Ich habe einen Sturmcluster eingerichtet, um Echtzeittrending und andere Statistiken zu berechnen, allerdings habe ich einige Probleme, die "Wiederherstellungs" -Funktion in dieses Projekt einzuführen, indem ich den Offset erlaubte, der zuletzt von kafka-spout
(der Quelle) gelesen wurde Code für kafka-spout
kommt von Ссылка ), um in Erinnerung zu bleiben. Ich starte meine kafka-spout
auf diese Weise:
Die Standardeinstellungen sollten dies tun, aber ich denke, dass es in meinem Fall nicht so ist. Jedes Mal, wenn ich mein Projekt starte, sucht PartitionManager
nach der Datei mit den Offsets, dann wird nichts gefunden:
Dann beginnt es mit dem letzten möglichen Offset. Was ist in Ordnung, wenn mein Projekt nie versagt, aber nicht genau das, was ich wollte.
Ich habe auch ein wenig mehr in die Klasse PartitionManager
geschaut, die Zkstate
class verwendet, um die Offsets aus diesem Code-Snippet zu schreiben:
PartitionManager
%Vor%ZkState
%Vor% Ich konnte sehen, dass für die erste Nachricht die writeBytes
-Methode in den if
-Block gelangt und versucht, einen Pfad zu erstellen, und dann für die zweite Nachricht in den else
-Block geht, was in Ordnung zu sein scheint . Aber wenn ich das Projekt erneut starte, erscheint dieselbe Nachricht wie oben erwähnt. Es kann keine partition information
gefunden werden.
Ich hatte das gleiche Problem. Es stellte sich heraus, dass ich im lokalen Modus lief, der einen In-Memory-Tierpfleger verwendet und nicht den Tierpfleger, den Kafka benutzt.
Um sicherzustellen, dass KafkaSpout Storms ZooKeeper nicht für die ZkState
verwendet, die den Versatz speichert, müssen Sie zusätzlich zum SpoutConfig.zkServers
SpoutConfig.zkPort
, SpoutConfig.zkRoot
und ZkHosts
festlegen. Zum Beispiel
Ich denke, Sie schlagen diesen Fehler:
Und der Kommentar des oben genannten Kollegen hat mein Problem behoben. Ich habe einige neuere Bibliotheken hinzugefügt.
Tags und Links apache-storm apache-kafka bigdata apache-zookeeper