Wie kann ich eine große rdd mit einer sehr großen rdd in spark verbinden?

8

Ich habe zwei RDDs. Ein RDD ist zwischen 5-10 Millionen Einträge und der andere RDD ist zwischen 500 Millionen - 750 Millionen Einträge. Irgendwann muss ich diese beiden rdds mit einem gemeinsamen Schlüssel verbinden.

%Vor%

Wenn Spark entscheidet, diesen Beitritt zu machen, entscheidet er sich für einen ShuffledHashJoin. Dies führt dazu, dass viele der Elemente in rddB im Netzwerk gemischt werden. In ähnlicher Weise werden auch einige von rddA im Netzwerk gemischt. In diesem Fall ist rddA zu "groß", um als Broadcast-Variable verwendet zu werden, aber es scheint, als ob BroadcastHashJoin effizienter wäre. Gibt es einen Hinweis darauf, einen BroadcastHashJoin zu verwenden? (Apache Flink unterstützt diese Through Join-Hinweise.)

Wenn nicht, ist die einzige Option, um den autoBroadcastJoinThreshold zu erhöhen?

Update 7/14

Mein Leistungsproblem scheint direkt bei der Neupartitionierung zu liegen. Normalerweise würde ein RDD-Read von HDFS blockweise partitioniert werden, aber in diesem Fall war die Quelle eine Parkett-Datenquelle [die ich gemacht habe]. Wenn Spark (Databricks) die Parkettdatei schreibt, schreibt sie eine Datei pro Partition und liest identisch eine Partition pro Datei. Also, die beste Antwort, die ich gefunden habe, ist, dass während der Produktion der Datenquelle, um es per Schlüssel zu partitionieren, schreibe die Parkett-Senke (die dann natürlich co-partitioniert ist) und verwenden Sie das als rddB.

Die Antwort ist korrekt, aber ich denke, dass die Details über Parkett Datenquelle für jemand anderen nützlich sein können.

    
Ajaxx 13.07.2015, 19:55
quelle

1 Antwort

16

Sie können RDDs mit dem gleichen Partitionierer partitionieren, in diesem Fall werden Partitionen mit dem gleichen Schlüssel auf demselben Executor angeordnet.

In diesem Fall vermeiden Sie das Mischen für Join-Operationen.

Shuffle passiert nur einmal, wenn Sie den Partitoner aktualisieren, und wenn Sie RDDs alle Joins cachen, sollte dies lokal für Executors sein

%Vor%

Sie können auch versuchen, die Übertragungsschwellengröße zu aktualisieren, vielleicht kann rddA gesendet werden:

%Vor%

Wir verwenden 400mb für Broadcast-Joins und es funktioniert gut.

    
Eugene Zhulenev 13.07.2015, 20:04
quelle

Tags und Links