Ich schreibe einen Verbraucher, der den Offset manuell festlegt, sobald eine Reihe von Aufzeichnungen an Mongo übergeben wurde.
Im Falle eines Mongo-Fehlers oder eines anderen Fehlers wird versucht, den Datensatz in eine Fehlerbehandlungssammlung zu übernehmen
für die Wiedergabe zu einem späteren Zeitpunkt.
Wenn Mongo down ist, möchte ich, dass der Konsument die Verarbeitung für eine gewisse Zeit stoppt, bevor er versucht, die Datensätze aus dem nicht aufgegebenen Offset von Kakfa zu lesen.
Das folgende Beispiel funktioniert, aber ich würde gerne wissen, was die beste Vorgehensweise für dieses Szenario ist?
Wenn ich den Verbraucher nicht neu erstellen, erhalte ich den folgenden Fehler.
%Vor%Wenn Sie den Offset nicht festgelegt haben und die Eigenschaft auto.commit.enable false ist, dann warten Sie, wenn der Aufruf von Mongo fehlschlägt, einfach die Zeit, die Sie für notwendig halten, und wiederholen Sie die Abfrage ().
Das Problem, das Sie sehen, ist, dass der neue Consumer den poll () als Heartbeat-Mechanismus verwendet. Wenn Sie also länger auf die Timeout-Anforderung warten, wird der Coordinator für das Thema den Consumer kicken, weil er denkt, dass er tot ist und es wird die Gruppe wieder ins Gleichgewicht bringen. Warten Sie also auf Mongo, aber Sie können ab und zu () abholen.
EDIT: Als Workaround können Sie diese Eigenschaft höher request.timeout.ms
setzenIch hoffe, es hilft!
Wie ich es verstehe, ist der (neue) Kunde derjenige, der die verbrauchten Offsets beibehält. Das Commit sendet die Offsets an den Server, aber es hat keine Auswirkung auf die nächste Abfrage von diesem Client, da der Client zum Server sagt "gib mir nächste Nachrichten auf diesem Offset". Warum wird dann der Offset an den Server gesendet? Für das nächste Rebalancing. Der einzige Situation-Server, der die festgeschriebenen Offsets verwendet, ist, wenn ein Client stirbt / trennt - dann werden die Partitionen neu gewichtet und mit diesen Neubalances erhalten die Clients die Offsets vom Server.
Wenn Sie also keinen Offset festlegen und dann poll () aufrufen, können Sie nicht erwarten, dass die Nachricht erneut gelesen wird. Dazu müsste es eine Möglichkeit geben, den Offset im Client zurückzusetzen. Ich habe es nicht versucht, aber ich denke, KafkaConsumer.seek zum Offset der fehlgeschlagenen Nachricht aufzurufen, sollte den Trick machen.
BTW, auf diese Weise können Sie sogar die letzte erfolgreich verarbeitete Nachricht festschreiben und nach der ersten suchen, die fehlgeschlagen ist, so dass Sie nicht die gesamte Datensatzliste wiederholen müssen, wenn ein Fehler für eine Nachricht in der Mitte aufgetreten ist.
Tags und Links apache-kafka kafka-consumer-api