In meiner Spark-Anwendung gibt es ein object ResourceFactory
, das ein akka ActorSystem
für die Bereitstellung von Ressourcenclients enthält. Wenn ich diese Spark-Anwendung ausführe, erstellt jeder Arbeiterknoten ein ActorSystem
. Das Problem ist, dass wenn die Funkenanwendung ihre Arbeit beendet und heruntergefahren wird. Das ActorSystem
bleibt auf jedem Arbeiterknoten am Leben und verhindert, dass die gesamte Anwendung beendet wird, es hängt einfach.
Gibt es eine Möglichkeit, einen Listener bei SparkContext
zu registrieren, sodass, wenn sc
heruntergefahren wird, der ActorSystem
auf jedem Arbeiterknoten benachrichtigt wird, dass er sich selbst herunterfährt?
UPDATE:
Es folgt das vereinfachte Skelett:
Es gibt ein ResourceFactory
, was ein object
ist und ein actor system
enthält. Und es bietet auch eine fetchData
-Methode.
Und dann gibt es eine Klasse user-defined RDD
, in ihrer Methode compute
muss sie Daten von ResourceFactory
holen.
Also wird es auf jedem Knoten ein ActorSystem
namens "resource-akka-system" geben, und diese MyRDD
Instanzen, die auf diesen Arbeiterknoten verteilt sind, können Daten vom "resource-akka-system" bekommen.
Das Problem ist, dass, wenn die SparkContext
heruntergefahren wird, diese "resource-akka-systems" nicht benötigt werden, ich aber nicht weiß, wie ich die ResourceFactory
benachrichtigen kann, um die "resource- akka-system "wenn die SparkContext
heruntergefahren wird. Das "resouce-akka-system" bleibt also auf jedem Arbeiterknoten am Leben und verhindert, dass das gesamte Programm beendet wird.
UPDATE2:
Mit einigen weiteren Experimenten finde ich, dass das Programm im lokalen Modus hängt, aber im yarn-cluster
-Modus wird das Programm erfolgreich beendet. Kann sein, dass yarn
die Threads auf den Worker-Knoten beendet, wenn% code_de% heruntergefahren wird?
UPDATE3:
Um zu überprüfen, ob jeder Knoten ein sc
enthält, ändere ich den Code wie folgt (folgendes ist das reale Skelett, wenn ich eine weitere Klassendefinition hinzufüge):
Nachdem ich diese ActorSystem
s hinzugefügt habe, starte ich den Code auf Spark im Garn-Cluster-Modus. Ich finde das auf dem Treiber habe ich folgende Drucke:
Während ich bei einigen Arbeitern bin, habe ich folgende Abzüge:
%Vor%Und einige der Arbeiter, es druckt nichts (und allen von ihnen sind keine Aufgaben zugewiesen).
Basierend auf dem oben genannten, denke ich, dass der println
im Treiber eifrig initialisiert wird, da er object
auf dem Treiber ausgibt, auch wenn sich nichts darauf bezieht, und creating resource factory
wird in fuzzy initialisiert, weil er% druckt co_de% nach dem Drucken von object
, da die Ressourcenfactory vom ersten erstellten RDDIterator abgerufen wird.
Und ich finde, dass in meinem Anwendungsfall die Klasse creating resource factory
nur im Treiber erstellt wird.
Ich bin nicht sehr sicher über die Faulheit der Initialisierung des creating rdd iterator
auf Treiber und Worker, es ist meine Vermutung, weil es vielleicht durch andere Teile des Programms verursacht wird, damit es so aussieht. Aber ich denke, es sollte richtig sein, dass es auf jedem Arbeiterknoten ein Akteursystem gibt, wenn es notwendig ist.
Ich glaube nicht, dass es einen Weg gibt, jeden Worker
-Lebenszyklus anzuzapfen.
Ich habe auch einige Fragen bezüglich Ihrer Implementierung:
Wenn Sie object
haben, das val
enthält, wird das von der Funktion run für worker verwendet. Ich verstehe, dass diese val
serialisiert und an worker übertragen wird. Können Sie bestätigen, dass pro Arbeiter ein ActorSystem läuft?
Actor System wird normalerweise sofort beendet, wenn Sie nicht explizit auf die Beendigung warten. Rufen Sie etwas wie system.awaitTermination
oder blockieren auf system.whenTerminated
?
Wie auch immer, es gibt eine andere Möglichkeit, wie man Akteursysteme auf Remote-Workern herunterfahren kann:
sc
ist) an jeden Mitarbeiter gesendet. In einfachen Worten, haben Sie einfach val
mit dieser Adresse. Tags und Links scala akka apache-spark