So bereinigen Sie andere Ressourcen, wenn der Funke gestoppt wird

9

In meiner Spark-Anwendung gibt es ein object ResourceFactory , das ein akka ActorSystem für die Bereitstellung von Ressourcenclients enthält. Wenn ich diese Spark-Anwendung ausführe, erstellt jeder Arbeiterknoten ein ActorSystem . Das Problem ist, dass wenn die Funkenanwendung ihre Arbeit beendet und heruntergefahren wird. Das ActorSystem bleibt auf jedem Arbeiterknoten am Leben und verhindert, dass die gesamte Anwendung beendet wird, es hängt einfach.

Gibt es eine Möglichkeit, einen Listener bei SparkContext zu registrieren, sodass, wenn sc heruntergefahren wird, der ActorSystem auf jedem Arbeiterknoten benachrichtigt wird, dass er sich selbst herunterfährt?

UPDATE:

Es folgt das vereinfachte Skelett:

Es gibt ein ResourceFactory , was ein object ist und ein actor system enthält. Und es bietet auch eine fetchData -Methode.

%Vor%

Und dann gibt es eine Klasse user-defined RDD , in ihrer Methode compute muss sie Daten von ResourceFactory holen.

%Vor%

Also wird es auf jedem Knoten ein ActorSystem namens "resource-akka-system" geben, und diese MyRDD Instanzen, die auf diesen Arbeiterknoten verteilt sind, können Daten vom "resource-akka-system" bekommen.

Das Problem ist, dass, wenn die SparkContext heruntergefahren wird, diese "resource-akka-systems" nicht benötigt werden, ich aber nicht weiß, wie ich die ResourceFactory benachrichtigen kann, um die "resource- akka-system "wenn die SparkContext heruntergefahren wird. Das "resouce-akka-system" bleibt also auf jedem Arbeiterknoten am Leben und verhindert, dass das gesamte Programm beendet wird.

UPDATE2:

Mit einigen weiteren Experimenten finde ich, dass das Programm im lokalen Modus hängt, aber im yarn-cluster -Modus wird das Programm erfolgreich beendet. Kann sein, dass yarn die Threads auf den Worker-Knoten beendet, wenn% code_de% heruntergefahren wird?

UPDATE3:

Um zu überprüfen, ob jeder Knoten ein sc enthält, ändere ich den Code wie folgt (folgendes ist das reale Skelett, wenn ich eine weitere Klassendefinition hinzufüge):

%Vor%

Nachdem ich diese ActorSystem s hinzugefügt habe, starte ich den Code auf Spark im Garn-Cluster-Modus. Ich finde das auf dem Treiber habe ich folgende Drucke:

%Vor%

Während ich bei einigen Arbeitern bin, habe ich folgende Abzüge:

%Vor%

Und einige der Arbeiter, es druckt nichts (und allen von ihnen sind keine Aufgaben zugewiesen).

Basierend auf dem oben genannten, denke ich, dass der println im Treiber eifrig initialisiert wird, da er object auf dem Treiber ausgibt, auch wenn sich nichts darauf bezieht, und creating resource factory wird in fuzzy initialisiert, weil er% druckt co_de% nach dem Drucken von object , da die Ressourcenfactory vom ersten erstellten RDDIterator abgerufen wird.

Und ich finde, dass in meinem Anwendungsfall die Klasse creating resource factory nur im Treiber erstellt wird.

Ich bin nicht sehr sicher über die Faulheit der Initialisierung des creating rdd iterator auf Treiber und Worker, es ist meine Vermutung, weil es vielleicht durch andere Teile des Programms verursacht wird, damit es so aussieht. Aber ich denke, es sollte richtig sein, dass es auf jedem Arbeiterknoten ein Akteursystem gibt, wenn es notwendig ist.

    
宇宙人 13.04.2016, 10:00
quelle

1 Antwort

2

Ich glaube nicht, dass es einen Weg gibt, jeden Worker -Lebenszyklus anzuzapfen.

Ich habe auch einige Fragen bezüglich Ihrer Implementierung:

  1. Wenn Sie object haben, das val enthält, wird das von der Funktion run für worker verwendet. Ich verstehe, dass diese val serialisiert und an worker übertragen wird. Können Sie bestätigen, dass pro Arbeiter ein ActorSystem läuft?

  2. Actor System wird normalerweise sofort beendet, wenn Sie nicht explizit auf die Beendigung warten. Rufen Sie etwas wie system.awaitTermination oder blockieren auf system.whenTerminated ?

  3. auf



Wie auch immer, es gibt eine andere Möglichkeit, wie man Akteursysteme auf Remote-Workern herunterfahren kann:

  1. Machen Sie Ihr ActorSystem auf jedem Knoten des akka-Clusters. Hier finden Sie einige Dokumente , wie Sie das programmatisch durchführen können.
  2. Haben Sie die Adresse Ihres Koordinators "Actor" auf dem Treiberknoten (wo Ihr sc ist) an jeden Mitarbeiter gesendet. In einfachen Worten, haben Sie einfach val mit dieser Adresse.
  3. Wenn Ihr akka-System für jeden Mitarbeiter gestartet wird, verwenden Sie die Koordinator-Adresse "actor", um dieses bestimmte Akteursystem zu registrieren (senden Sie die entsprechende Nachricht an den Koordinator Actor).
  4. Coordination Actor überwacht alle registrierten "worker" -Aktoren
  5. Wenn Ihre Berechnung abgeschlossen ist und Sie das Akka-System bei jedem Arbeiter herunterfahren wollen, senden Sie Nachrichten an alle registrierten Akteure vom Koordinationsaktor auf dem Treiberknoten.
  6. Herunterfahren auf Arbeiter-Akka-Systemen, wenn die Nachricht "shutdown" empfangen wird.
Aivean 15.04.2016, 22:01
quelle

Tags und Links