Ich werde diese Frage im Kontext von Spark stellen, denn das ist es, womit ich konfrontiert bin, aber das könnte ein einfaches Java-Problem sein.
In unserem Spark-Job haben wir ein Resolver
, das in allen unseren Mitarbeitern verwendet werden muss (es wird in einem udf verwendet). Das Problem ist, dass es nicht serialisierbar ist und wir es nicht ändern können. Die Lösung bestand darin, es als Mitglied einer anderen Klasse anzugeben, die serialisierbar ist.
Also haben wir uns entschieden:
%Vor% Wir haben dann broadcast
diese Klasse mit der Spark API:
(Weitere Informationen zur Spark-Übertragung finden Sie hier )
Wir verwenden dann analyzer
in einer UDF, als Teil unseres Spark-Codes, mit etwas wie:
Das alles funktioniert wie erwartet, aber wir fragen uns.
Resolver
implementiert nicht Serializable
und ist daher als transient
gekennzeichnet - was bedeutet, dass es nicht zusammen mit seinem Besitzerobjekt Analyzer
serialisiert wird.
Wie Sie jedoch anhand des obigen Codes deutlich sehen können, verwendet die resolve()
-Methode resolver
und darf daher nicht null sein. Und in der Tat ist es nicht. Der Code funktioniert.
Wenn also das Feld nicht serialisiert wird, wie wird das resolver
-Member instanziiert?
Mein erster Gedanke war, dass der Analyzer
-Konstruktor auf der Empfängerseite aufgerufen wird (d. h. der Spark-Worker), aber dann würde ich erwarten, dass die Zeile "Initializing a Resolver..."
mehrmals gedruckt wird. Aber es wird nur einmal gedruckt, was wahrscheinlich darauf hindeutet, dass es nur einmal aufgerufen wird, bevor es an die Broadcast-API übergeben wird. Warum ist resolver
nicht null?
Fehle ich etwas über JVM-Serialisierung oder Spark-Serialisierung?
Wie funktioniert dieser Code überhaupt?
Spark läuft auf YARN, in cluster
mode.
spark.serializer
wird auf org.apache.spark.serializer.KryoSerializer
gesetzt.
Wenn also das Feld nicht durch die Serialisierung geleitet wird, wie ist das? Resolver Member instanziiert?
Es wird über den Konstruktoraufruf ( new Resolver
) instanziiert, wenn kryo.readObject
:
Mein erster Gedanke war, dass vielleicht der Analyzer-Konstruktor aufgerufen wird auf der Empfängerseite (d. h. der Funken Arbeiter), aber dann würde ich erwarten Die Zeile "Initializing a Resolver ..." wird mehrmals angezeigt. Aber es ist nur einmal gedruckt, was wahrscheinlich ein Hinweis auf die ist Tatsache, dass es nur einmal aufgerufen wird
So funktioniert eine Broadcast-Variable nicht. Was passiert ist, wenn jeder Executor die Broadcast-Variable im Scope benötigt, prüft er zuerst, ob sich das Objekt in seinem BlockManager
im Speicher befindet. Wenn dies nicht der Fall ist, fragt er entweder den Treiber oder die Nachbar-Executoren (wenn es mehrere gibt) Executoren auf dem gleichen Worker-Knoten) für ihre zwischengespeicherte Instanz, und sie serialisieren es und senden es ihm, und im Gegenzug erhält er die Instanz und speichert sie in seinem eigenen BlockManager
.
Dies ist im Verhalten von TorrentBroadcast
dokumentiert (was die Standard-Broadcasting-Implementierung ist):
Wenn wir die Transiente entfernen, schlägt es fehl, und die Stack-Spur führt zu Kryo
Das liegt daran, dass es wahrscheinlich ein Feld in Ihrer Resolver
-Klasse gibt, das selbst Kryo unabhängig vom Serializable
-Attribut nicht serialisieren kann.
Tags und Links scala java serialization apache-spark kryo