Kafka 0.9 Wie man eine Nachricht wieder aufnimmt, wenn man den Offset mit einem Kafka Consumer manuell festlegt

8

Ich schreibe einen Verbraucher, der den Offset manuell festlegt, sobald eine Reihe von Aufzeichnungen an Mongo übergeben wurde.
Im Falle eines Mongo-Fehlers oder eines anderen Fehlers wird versucht, den Datensatz in eine Fehlerbehandlungssammlung zu übernehmen für die Wiedergabe zu einem späteren Zeitpunkt. Wenn Mongo down ist, möchte ich, dass der Konsument die Verarbeitung für eine gewisse Zeit stoppt, bevor er versucht, die Datensätze aus dem nicht aufgegebenen Offset von Kakfa zu lesen.
Das folgende Beispiel funktioniert, aber ich würde gerne wissen, was die beste Vorgehensweise für dieses Szenario ist?

%Vor%

Wenn ich den Verbraucher nicht neu erstellen, erhalte ich den folgenden Fehler.

%Vor%     
Michael Freeman 20.01.2016, 13:46
quelle

3 Antworten

5

Wenn Sie den Offset nicht festgelegt haben und die Eigenschaft auto.commit.enable false ist, dann warten Sie, wenn der Aufruf von Mongo fehlschlägt, einfach die Zeit, die Sie für notwendig halten, und wiederholen Sie die Abfrage ().

Das Problem, das Sie sehen, ist, dass der neue Consumer den poll () als Heartbeat-Mechanismus verwendet. Wenn Sie also länger auf die Timeout-Anforderung warten, wird der Coordinator für das Thema den Consumer kicken, weil er denkt, dass er tot ist und es wird die Gruppe wieder ins Gleichgewicht bringen. Warten Sie also auf Mongo, aber Sie können ab und zu () abholen.

EDIT: Als Workaround können Sie diese Eigenschaft höher request.timeout.ms

setzen

Ich hoffe, es hilft!

    
Nautilus 21.01.2016, 22:44
quelle
6

Hier ist mein Code mit Client-Version 0.10.0.

Seem ist in Ordnung für Sie fordern.

%Vor%     
simonchen 16.06.2016 07:10
quelle
1

Wie ich es verstehe, ist der (neue) Kunde derjenige, der die verbrauchten Offsets beibehält. Das Commit sendet die Offsets an den Server, aber es hat keine Auswirkung auf die nächste Abfrage von diesem Client, da der Client zum Server sagt "gib mir nächste Nachrichten auf diesem Offset". Warum wird dann der Offset an den Server gesendet? Für das nächste Rebalancing. Der einzige Situation-Server, der die festgeschriebenen Offsets verwendet, ist, wenn ein Client stirbt / trennt - dann werden die Partitionen neu gewichtet und mit diesen Neubalances erhalten die Clients die Offsets vom Server.

Wenn Sie also keinen Offset festlegen und dann poll () aufrufen, können Sie nicht erwarten, dass die Nachricht erneut gelesen wird. Dazu müsste es eine Möglichkeit geben, den Offset im Client zurückzusetzen. Ich habe es nicht versucht, aber ich denke, KafkaConsumer.seek zum Offset der fehlgeschlagenen Nachricht aufzurufen, sollte den Trick machen.

Ссылка

BTW, auf diese Weise können Sie sogar die letzte erfolgreich verarbeitete Nachricht festschreiben und nach der ersten suchen, die fehlgeschlagen ist, so dass Sie nicht die gesamte Datensatzliste wiederholen müssen, wenn ein Fehler für eine Nachricht in der Mitte aufgetreten ist.

    
Jakub 27.04.2016 09:56
quelle