Spark - Serialisierung eines Objekts mit einem nicht serialisierbaren Element

8

Ich werde diese Frage im Kontext von Spark stellen, denn das ist es, womit ich konfrontiert bin, aber das könnte ein einfaches Java-Problem sein.

In unserem Spark-Job haben wir ein Resolver , das in allen unseren Mitarbeitern verwendet werden muss (es wird in einem udf verwendet). Das Problem ist, dass es nicht serialisierbar ist und wir es nicht ändern können. Die Lösung bestand darin, es als Mitglied einer anderen Klasse anzugeben, die serialisierbar ist.

Also haben wir uns entschieden:

%Vor%

Wir haben dann broadcast diese Klasse mit der Spark API:

%Vor%

(Weitere Informationen zur Spark-Übertragung finden Sie hier )

Wir verwenden dann analyzer in einer UDF, als Teil unseres Spark-Codes, mit etwas wie:

%Vor%

Das alles funktioniert wie erwartet, aber wir fragen uns.

Resolver implementiert nicht Serializable und ist daher als transient gekennzeichnet - was bedeutet, dass es nicht zusammen mit seinem Besitzerobjekt Analyzer serialisiert wird.

Wie Sie jedoch anhand des obigen Codes deutlich sehen können, verwendet die resolve() -Methode resolver und darf daher nicht null sein. Und in der Tat ist es nicht. Der Code funktioniert.

Wenn also das Feld nicht serialisiert wird, wie wird das resolver -Member instanziiert?

Mein erster Gedanke war, dass der Analyzer -Konstruktor auf der Empfängerseite aufgerufen wird (d. h. der Spark-Worker), aber dann würde ich erwarten, dass die Zeile "Initializing a Resolver..." mehrmals gedruckt wird. Aber es wird nur einmal gedruckt, was wahrscheinlich darauf hindeutet, dass es nur einmal aufgerufen wird, bevor es an die Broadcast-API übergeben wird. Warum ist resolver nicht null?

Fehle ich etwas über JVM-Serialisierung oder Spark-Serialisierung?

Wie funktioniert dieser Code überhaupt?

Spark läuft auf YARN, in cluster mode. spark.serializer wird auf org.apache.spark.serializer.KryoSerializer gesetzt.

    
summerbulb 21.01.2018, 19:53
quelle

1 Antwort

3
  

Wenn also das Feld nicht durch die Serialisierung geleitet wird, wie ist das?   Resolver Member instanziiert?

Es wird über den Konstruktoraufruf ( new Resolver ) instanziiert, wenn kryo.readObject :

aufgerufen wird %Vor%
  

Mein erster Gedanke war, dass vielleicht der Analyzer-Konstruktor aufgerufen wird   auf der Empfängerseite (d. h. der Funken Arbeiter), aber dann würde ich erwarten   Die Zeile "Initializing a Resolver ..." wird mehrmals angezeigt.   Aber es ist nur einmal gedruckt, was wahrscheinlich ein Hinweis auf die ist   Tatsache, dass es nur einmal aufgerufen wird

So funktioniert eine Broadcast-Variable nicht. Was passiert ist, wenn jeder Executor die Broadcast-Variable im Scope benötigt, prüft er zuerst, ob sich das Objekt in seinem BlockManager im Speicher befindet. Wenn dies nicht der Fall ist, fragt er entweder den Treiber oder die Nachbar-Executoren (wenn es mehrere gibt) Executoren auf dem gleichen Worker-Knoten) für ihre zwischengespeicherte Instanz, und sie serialisieren es und senden es ihm, und im Gegenzug erhält er die Instanz und speichert sie in seinem eigenen BlockManager .

Dies ist im Verhalten von TorrentBroadcast dokumentiert (was die Standard-Broadcasting-Implementierung ist):

%Vor%
  

Wenn wir die Transiente entfernen, schlägt es fehl, und die Stack-Spur führt zu Kryo

Das liegt daran, dass es wahrscheinlich ein Feld in Ihrer Resolver -Klasse gibt, das selbst Kryo unabhängig vom Serializable -Attribut nicht serialisieren kann.

    
Yuval Itzchakov 22.01.2018 15:38
quelle