Wir benutzen Storm mit der Kafka-Tülle. Wenn wir Nachrichten auslassen, möchten wir sie wiederholen, aber in einigen Fällen führen schlechte Daten oder Codefehler dazu, dass Nachrichten immer einen Bolt auslassen, so dass wir in einen unendlichen Wiederholungszyklus kommen. Offensichtlich beheben wir Fehler, wenn wir sie finden, möchten aber, dass unsere Topologie generell fehlertolerant ist. Wie können wir ein Tupel ack () nachdem es mehr als N mal wiederholt wurde?
Wenn ich mir den Code für den Kafka-Auslauf ansehe, sehe ich, dass er mit einem exponentiellen Backoff-Timer und den Kommentaren erneut versucht wird auf der PR Staat:
"Der Auslauf beendet den Wiederholungszyklus nicht (ich bin der Überzeugung, dass dies nicht der Fall sein sollte, weil er keinen Zusammenhang über den Fehlschlag melden kann), er behandelt nur das Verzögern der Wiederholungen Von der Topologie wird weiterhin erwartet, dass sie ack () anstelle von fail () aufruft, um den Zyklus zu stoppen. "
Ich habe StackOverflow-Antworten gesehen, die das Schreiben einer benutzerdefinierten Tülle empfehlen, aber ich möchte lieber nicht einen benutzerdefinierten Patch der Interna der Kafka-Tülle pflegen, wenn es eine empfohlene Methode dafür in einem Bolt gibt.
>Was ist der richtige Weg, dies in einem Bolzen zu tun? Ich sehe keinen Zustand im Tupel, der anzeigt, wie oft es wiederholt wurde.
Storm selbst bietet keine Unterstützung für Ihr Problem. Daher ist eine maßgeschneiderte Lösung der einzige Weg. Auch wenn Sie KafkaSpout
nicht patchen wollen, wäre es der beste Ansatz, einen Counter einzuführen und den Replay-Zyklus zu unterbrechen. Als Alternative könnten Sie auch von KafkaSpout
erben und einen Zähler in Ihre Unterklasse setzen. Dies ist natürlich etwas ähnlich wie ein Patch, aber möglicherweise weniger aufdringlich und einfacher zu implementieren.
Wenn Sie einen Bolzen verwenden möchten, könnten Sie Folgendes tun (was auch einige Änderungen an der KafkaSpout
oder einer Unterklasse erfordert).
KafkaSpout
via fieldsGrouping
für die ID ein (um sicherzustellen, dass ein Tupel, das wiedergegeben wird, an die gleiche Bolt-Instanz gestreamt wird.) HashMap<ID,Counter>
, das alle Tupel puffert und die Anzahl der (Wiederholungs-) Versuche zählt. Wenn der Zähler kleiner als Ihr Schwellenwert ist, leiten Sie das Eingabetupel weiter, damit es von der tatsächlichen Topologie verarbeitet wird, die folgt (Sie müssen das Tupel natürlich entsprechend verankern). Wenn die Anzahl größer als Ihr Schwellenwert ist, bestätigen Sie das Tupel, um den Zyklus zu unterbrechen und den Eintrag aus HashMap
zu entfernen (Sie können auch alle fehlerhaften Tupel protokollieren). HashMap
zu entfernen, müssen Sie jedes Mal, wenn ein Tupel in KafkaSpout
angekreuzt wird, die Tupel-ID an die Schraube weiterleiten, damit das Tupel aus HashMap
entfernt werden kann. Erklären Sie einfach einen zweiten Ausgabestream für Ihre KafkaSpout
-Unterklasse und überschreiben Sie Spout.ack(...)
(natürlich müssen Sie super.ack(...)
aufrufen, um sicherzustellen, dass KafkaSpout
auch die Bestätigung erhält). Dieser Ansatz kann jedoch viel Speicher verbrauchen. Als Alternative zu einem Eintrag für jedes Tupel in HashMap
könnten Sie auch einen dritten Stream verwenden (der mit den beiden anderen verbunden ist) und eine Tupel-ID weiterleiten, wenn ein Tupel fehlschlägt (zB in% co_de) %). Jedes Mal erhält der Riegel eine "Fail" -Nachricht von diesem dritten Strom, der Zähler wird erhöht. Solange sich kein Eintrag in Spout.fail(...)
befindet (oder der Schwellenwert nicht erreicht wird), leitet die Schraube das Tupel zur Verarbeitung einfach weiter. Dies sollte den verwendeten Speicher reduzieren, erfordert jedoch eine weitere Logik, die in Ihrer Tülle und Schraube implementiert wird.
Beide Ansätze haben den Nachteil, dass jedes ackerte Tupel zu einer zusätzlichen Nachricht zu Ihrer neu eingebrachten Schraube führt (wodurch der Netzwerkverkehr erhöht wird). Für den zweiten Ansatz könnte es so aussehen, als ob Sie nur eine "Ack" -Meldung an die Schraube für Tupel senden müssen, die zuvor fehlgeschlagen sind. Sie wissen jedoch nicht, welche Tupel fehlgeschlagen sind und welche nicht. Wenn Sie diesen Netzwerk-Overhead loswerden möchten, könnten Sie ein zweites HashMap
in HashMap
einfügen, das die IDs fehlgeschlagener Nachrichten puffert. Daher können Sie nur eine "ack" -Nachricht senden, wenn ein fehlgeschlagenes Tupel erfolgreich wiedergegeben wurde. Dieser dritte Ansatz macht die zu implementierende Logik natürlich noch komplexer.
Ohne die KafkaSpout
bis zu einem gewissen Grad zu ändern, sehe ich keine Lösung für Ihr Problem. Ich persönlich patch KafkaSpout
oder würde den dritten Ansatz mit einer KafkaSpout
in HashMap
Unterklasse und der Schraube verwenden (weil es wenig Speicher verbraucht und das Netzwerk nicht viel mehr belastet als die ersten beiden Lösungen ).
Grundsätzlich funktioniert das so:
ack
sein, Sie werden nie in der Lage sein, solche Dinge zu reparieren oder einzufügen es in die Datenbank. Wir sehen uns auch den ähnlichen Daten gegenüber, bei denen schlechte Daten eintreffen, die dazu führen, dass die Schraube unendlich versagt.
Um dies zur Laufzeit zu beheben, haben wir eine weitere Schraube eingeführt, die sie als "DebugBolt" bezeichnet. Die Tülle sendet also zuerst die Nachricht an diese Schraube und dann werden mit diesen Schrauben die erforderlichen Daten für die fehlerhaften Meldungen fixiert und anschließend an die erforderliche Schraube ausgegeben. Auf diese Weise kann man die Datenfehler im laufenden Betrieb beheben.
Auch wenn Sie einige Nachrichten löschen müssen, können Sie tatsächlich ein ignoreFlag von Ihrem DebugBolt an Ihren ursprünglichen Bolt übergeben und Ihr ursprünglicher Bolt sollte einfach eine Ack an Tülle senden ohne zu verarbeiten, wenn ignoreFlag True ist.
Wir hatten einfach unsere Schraube das schlechte Tupel in einem Fehlerstream ausstrahlen und achte darauf. Eine weitere Schraube hat den Fehler behoben, indem sie speziell für Fehler in ein Kafka-Thema zurückgeschrieben wurde. Dies ermöglicht es uns, den Datenfluss von normal zu fehlerhaft durch die Topologie zu leiten.
Der einzige Fall, in dem ein Tupel fehlschlägt, ist, dass eine erforderliche Ressource offline ist, z. B. eine Netzwerkverbindung, DB, ... Dies sind wiederholbare Fehler. Alles andere wird auf den Fehlerstrom gerichtet, der repariert oder behandelt wird, wie es angemessen ist.
Das alles setzt natürlich voraus, dass Sie keinen Datenverlust erleiden möchten. Wenn Sie nur einen Versuch machen und nach ein paar Wiederholungen ignorieren wollen, dann würde ich mir andere Optionen ansehen.
Tags und Links apache-storm apache-kafka