Maximale Anzahl von Tupelwiederholungen auf Storm Kafka Tülle

8

Wir benutzen Storm mit der Kafka-Tülle. Wenn wir Nachrichten auslassen, möchten wir sie wiederholen, aber in einigen Fällen führen schlechte Daten oder Codefehler dazu, dass Nachrichten immer einen Bolt auslassen, so dass wir in einen unendlichen Wiederholungszyklus kommen. Offensichtlich beheben wir Fehler, wenn wir sie finden, möchten aber, dass unsere Topologie generell fehlertolerant ist. Wie können wir ein Tupel ack () nachdem es mehr als N mal wiederholt wurde?

Wenn ich mir den Code für den Kafka-Auslauf ansehe, sehe ich, dass er mit einem exponentiellen Backoff-Timer und den Kommentaren erneut versucht wird auf der PR Staat:

"Der Auslauf beendet den Wiederholungszyklus nicht (ich bin der Überzeugung, dass dies nicht der Fall sein sollte, weil er keinen Zusammenhang über den Fehlschlag melden kann), er behandelt nur das Verzögern der Wiederholungen Von der Topologie wird weiterhin erwartet, dass sie ack () anstelle von fail () aufruft, um den Zyklus zu stoppen. "

Ich habe StackOverflow-Antworten gesehen, die das Schreiben einer benutzerdefinierten Tülle empfehlen, aber ich möchte lieber nicht einen benutzerdefinierten Patch der Interna der Kafka-Tülle pflegen, wenn es eine empfohlene Methode dafür in einem Bolt gibt.

>

Was ist der richtige Weg, dies in einem Bolzen zu tun? Ich sehe keinen Zustand im Tupel, der anzeigt, wie oft es wiederholt wurde.

    
kleinsch 02.10.2015, 16:49
quelle

4 Antworten

5

Storm selbst bietet keine Unterstützung für Ihr Problem. Daher ist eine maßgeschneiderte Lösung der einzige Weg. Auch wenn Sie KafkaSpout nicht patchen wollen, wäre es der beste Ansatz, einen Counter einzuführen und den Replay-Zyklus zu unterbrechen. Als Alternative könnten Sie auch von KafkaSpout erben und einen Zähler in Ihre Unterklasse setzen. Dies ist natürlich etwas ähnlich wie ein Patch, aber möglicherweise weniger aufdringlich und einfacher zu implementieren.

Wenn Sie einen Bolzen verwenden möchten, könnten Sie Folgendes tun (was auch einige Änderungen an der KafkaSpout oder einer Unterklasse erfordert).

  • Ordnen Sie jedem Tupel eine eindeutige ID als zusätzliches Attribut zu (vielleicht steht bereits eine eindeutige ID zur Verfügung), andernfalls könnten Sie eine "Zähler-ID" oder nur das ganze Tupel, dh alle Attribute, eingeben, um jedes zu identifizieren Tupel).
  • Fügen Sie eine Schraube nach KafkaSpout via fieldsGrouping für die ID ein (um sicherzustellen, dass ein Tupel, das wiedergegeben wird, an die gleiche Bolt-Instanz gestreamt wird.)
  • Verwenden Sie in Ihrer Schraube ein HashMap<ID,Counter> , das alle Tupel puffert und die Anzahl der (Wiederholungs-) Versuche zählt. Wenn der Zähler kleiner als Ihr Schwellenwert ist, leiten Sie das Eingabetupel weiter, damit es von der tatsächlichen Topologie verarbeitet wird, die folgt (Sie müssen das Tupel natürlich entsprechend verankern). Wenn die Anzahl größer als Ihr Schwellenwert ist, bestätigen Sie das Tupel, um den Zyklus zu unterbrechen und den Eintrag aus HashMap zu entfernen (Sie können auch alle fehlerhaften Tupel protokollieren).
  • Um erfolgreich verarbeitete Tupel aus HashMap zu entfernen, müssen Sie jedes Mal, wenn ein Tupel in KafkaSpout angekreuzt wird, die Tupel-ID an die Schraube weiterleiten, damit das Tupel aus HashMap entfernt werden kann. Erklären Sie einfach einen zweiten Ausgabestream für Ihre KafkaSpout -Unterklasse und überschreiben Sie Spout.ack(...) (natürlich müssen Sie super.ack(...) aufrufen, um sicherzustellen, dass KafkaSpout auch die Bestätigung erhält).

Dieser Ansatz kann jedoch viel Speicher verbrauchen. Als Alternative zu einem Eintrag für jedes Tupel in HashMap könnten Sie auch einen dritten Stream verwenden (der mit den beiden anderen verbunden ist) und eine Tupel-ID weiterleiten, wenn ein Tupel fehlschlägt (zB in% co_de) %). Jedes Mal erhält der Riegel eine "Fail" -Nachricht von diesem dritten Strom, der Zähler wird erhöht. Solange sich kein Eintrag in Spout.fail(...) befindet (oder der Schwellenwert nicht erreicht wird), leitet die Schraube das Tupel zur Verarbeitung einfach weiter. Dies sollte den verwendeten Speicher reduzieren, erfordert jedoch eine weitere Logik, die in Ihrer Tülle und Schraube implementiert wird.

Beide Ansätze haben den Nachteil, dass jedes ackerte Tupel zu einer zusätzlichen Nachricht zu Ihrer neu eingebrachten Schraube führt (wodurch der Netzwerkverkehr erhöht wird). Für den zweiten Ansatz könnte es so aussehen, als ob Sie nur eine "Ack" -Meldung an die Schraube für Tupel senden müssen, die zuvor fehlgeschlagen sind. Sie wissen jedoch nicht, welche Tupel fehlgeschlagen sind und welche nicht. Wenn Sie diesen Netzwerk-Overhead loswerden möchten, könnten Sie ein zweites HashMap in HashMap einfügen, das die IDs fehlgeschlagener Nachrichten puffert. Daher können Sie nur eine "ack" -Nachricht senden, wenn ein fehlgeschlagenes Tupel erfolgreich wiedergegeben wurde. Dieser dritte Ansatz macht die zu implementierende Logik natürlich noch komplexer.

Ohne die KafkaSpout bis zu einem gewissen Grad zu ändern, sehe ich keine Lösung für Ihr Problem. Ich persönlich patch KafkaSpout oder würde den dritten Ansatz mit einer KafkaSpout in HashMap Unterklasse und der Schraube verwenden (weil es wenig Speicher verbraucht und das Netzwerk nicht viel mehr belastet als die ersten beiden Lösungen ).

    
Matthias J. Sax 02.10.2015 22:13
quelle
0

Grundsätzlich funktioniert das so:

  1. Wenn Sie Topologien bereitstellen, sollten sie eine Produktionsqualität aufweisen (das heißt, ein bestimmtes Qualitätsniveau wird erwartet, und die Anzahl der Tupel ist gering).
  2. Wenn ein Tupel fehlschlägt, überprüfen Sie, ob das Tupel tatsächlich gültig ist.
  3. Wenn ein Tupel gültig ist (z. B. konnte es nicht eingefügt werden, weil es nicht möglich ist, eine Verbindung zu einer externen Datenbank herzustellen, oder etwas Ähnliches), antworten Sie darauf.
  4. Wenn ein Tupel fehlgeformt ist und niemals gehandhabt werden kann (zum Beispiel eine Datenbank-ID, die Text ist und die Datenbank eine ganze Zahl erwartet), sollte es ack sein, Sie werden nie in der Lage sein, solche Dinge zu reparieren oder einzufügen es in die Datenbank.
  5. Neue Arten von Ausnahmen sollten protokolliert werden (ebenso wie der Tupelinhalt selbst). Sie sollten diese Protokolle überprüfen und die Regel generieren, um Tupel zukünftig zu validieren. Und fügen Sie schließlich Code hinzu, um sie (ETL) in der Zukunft richtig zu verarbeiten.
  6. Protokollieren Sie nicht alles, sonst sind Ihre Log-Dateien riesig, sehr selektiv was Sie loggen. Der Inhalt der Protokolldateien sollte nützlich sein und kein Müllhaufen.
  7. Machen Sie das weiter und schließlich werden Sie nur alle Fälle abdecken.
SQL.injection 06.10.2015 09:14
quelle
0

Wir sehen uns auch den ähnlichen Daten gegenüber, bei denen schlechte Daten eintreffen, die dazu führen, dass die Schraube unendlich versagt.

Um dies zur Laufzeit zu beheben, haben wir eine weitere Schraube eingeführt, die sie als "DebugBolt" bezeichnet. Die Tülle sendet also zuerst die Nachricht an diese Schraube und dann werden mit diesen Schrauben die erforderlichen Daten für die fehlerhaften Meldungen fixiert und anschließend an die erforderliche Schraube ausgegeben. Auf diese Weise kann man die Datenfehler im laufenden Betrieb beheben.

Auch wenn Sie einige Nachrichten löschen müssen, können Sie tatsächlich ein ignoreFlag von Ihrem DebugBolt an Ihren ursprünglichen Bolt übergeben und Ihr ursprünglicher Bolt sollte einfach eine Ack an Tülle senden ohne zu verarbeiten, wenn ignoreFlag True ist.

    
arpiagar 13.07.2016 05:46
quelle
0

Wir hatten einfach unsere Schraube das schlechte Tupel in einem Fehlerstream ausstrahlen und achte darauf. Eine weitere Schraube hat den Fehler behoben, indem sie speziell für Fehler in ein Kafka-Thema zurückgeschrieben wurde. Dies ermöglicht es uns, den Datenfluss von normal zu fehlerhaft durch die Topologie zu leiten.

Der einzige Fall, in dem ein Tupel fehlschlägt, ist, dass eine erforderliche Ressource offline ist, z. B. eine Netzwerkverbindung, DB, ... Dies sind wiederholbare Fehler. Alles andere wird auf den Fehlerstrom gerichtet, der repariert oder behandelt wird, wie es angemessen ist.

Das alles setzt natürlich voraus, dass Sie keinen Datenverlust erleiden möchten. Wenn Sie nur einen Versuch machen und nach ein paar Wiederholungen ignorieren wollen, dann würde ich mir andere Optionen ansehen.

    
Robin 26.07.2016 18:56
quelle

Tags und Links