PySpark dataframe.foreach () mit dem HappyBase-Verbindungspool gibt 'TypeError: kann thread.lock-Objekte nicht picken'

8

Ich habe einen PySpark-Job, der einige Objekte in HBase aktualisiert (Spark v1.6.0; happybase v0.9).

Es funktioniert, wenn ich eine HBase-Verbindung für jede Zeile öffne / schließe:

%Vor%

Nach einigen tausend Upserts sehen wir Fehler wie folgt:

%Vor%

Offensichtlich ist es ineffizient, eine Verbindung für jedes Upsert zu öffnen / schließen. Diese Funktion ist wirklich nur ein Platzhalter für eine richtige Lösung.

Ich habe dann versucht, eine Version der Funktion process_row zu erstellen, die einen Verbindungspool verwendet:

%Vor%

Aus irgendeinem Grund gibt die Verbindungspoolversion dieser Funktion einen Fehler zurück (siehe vollständige Fehlermeldung ):

%Vor%

Können Sie sehen, was ich falsch mache?

Aktualisieren

Ich sah diesen Beitrag und vermute, dass ich es bin Dasselbe Problem tritt auf: Spark versucht, das Objekt pool zu serialisieren und es an alle Executoren zu verteilen, aber dieses Verbindungspoolobjekt kann nicht für mehrere Executoren freigegeben werden.

Es klingt, als müsste ich das Dataset in Partitionen aufteilen und eine Verbindung pro Partition verwenden (siehe Entwurfsmuster für die Verwendung von foreachrdd ). Ich habe das anhand eines Beispiels in der Dokumentation versucht:

%Vor%

Leider gibt es immer noch einen "can not pickel thread.lock objects" Fehler.

    
Alex Woolford 06.04.2016, 15:28
quelle

1 Antwort

1

in der Zeile happybase-Verbindungen sind nur TCP-Verbindungen, so dass sie nicht zwischen Prozessen ausgetauscht werden können. Ein Verbindungspool ist hauptsächlich für Multithread-Anwendungen nützlich und erweist sich auch für Singlethread-Anwendungen als nützlich, die den Pool als globale "Verbindungsfabrik" mit Verbindungswiederverwendung verwenden können, was den Code vereinfachen kann, da keine Verbindungsobjekte übergeben werden müssen um. Es macht auch die Fehlerbehebung ein bisschen einfacher.

In jedem Fall kann ein Pool (der nur eine Gruppe von Verbindungen darstellt) nicht zwischen Prozessen geteilt werden. zu versuchen, es zu serialisieren, macht aus diesem Grund keinen Sinn. (Pools verwenden Sperren, die dazu führen, dass die Serialisierung fehlschlägt, aber das ist nur ein Symptom.)

Vielleicht können Sie einen Helfer verwenden, der bedingt einen Pool (oder eine Verbindung) erstellt und ihn als modullokale Variable speichert, anstatt ihn beim Import zu instantiieren, z. B.

%Vor%

Dies instanziiert den Pool / die Verbindung bei der ersten Verwendung anstelle der Importzeit.

    
wouter bolsterlee 14.12.2016 19:12
quelle