Wie treten Sie großen Datenrahmen in Spark SQL? (Best Practices, Stabilität, Leistung)

9

Ich erhalte denselben Fehler wie Fehlende Ausgabe Speicherort für Shuffle , wenn Sie in Spark SQL großen Datenfeldern beitreten. Die Empfehlung besteht darin, MEMORY_AND_DISK und / oder spark.shuffle.memoryFraction 0 festzulegen. Allerdings ist spark.shuffle.memoryFraction in Spark & ​​gt; = 1.6.0 veraltet und die Einstellung MEMORY_AND_DISK sollte nicht helfen, wenn ich keine RDD oder Dataframe cache, oder? Außerdem bekomme ich viele andere WARN-Logs und Task-Wiederholungen, die mich glauben machen lassen, dass der Job nicht stabil ist.

Daher ist meine Frage:

  • Was sind Best Practices für die Verbindung riesiger Datenfelder in Spark SQL & gt; = 1.6.0?

Speziellere Fragen sind:

  • Wie kann Anzahl der Executoren und spark.sql.shuffle.partitions optimiert werden, um eine bessere Stabilität / Leistung zu erzielen?
  • Wie finden Sie die richtige Balance zwischen Parallelitätsebene (Anzahl der Executoren / Cores) und Anzahl der Partitionen ? Ich habe herausgefunden, dass das Erhöhen der Anzahl von Executoren nicht immer die Lösung ist, da sie aufgrund des Netzwerkverkehrs I / O-Lese-Timeout -Ausnahmen generieren kann.
  • Gibt es noch einen anderen relevanten Parameter für diesen Zweck?
  • Nach meinem Verständnis bietet die Verknüpfung von Daten, die als ORC oder Parkett gespeichert sind, eine bessere Leistung als Text oder Avro für Join-Vorgänge. Gibt es einen signifikanten Unterschied zwischen Parkett und ORC?
  • Gibt es einen Vorteil von SQLContext gegenüber HiveContext hinsichtlich Stabilität / Leistung für Join-Vorgänge?
  • Gibt es einen Unterschied in Bezug auf Leistung / Stabilität, wenn die an der Verknüpfung beteiligten Datenrahmen zuvor registerTempTable () oder saveAsTable () ?
  • sind

Bisher verwende ich dies ist die Antwort und dieses Kapitel als Ausgangspunkt. Und es gibt ein paar mehr stackoverflow-Seiten zu diesem Thema. Dennoch habe ich keine umfassende Antwort auf dieses beliebte Thema gefunden.

Vielen Dank im Voraus.

    
leo9r 23.06.2016, 09:34
quelle

1 Antwort

0

Das sind viele Fragen. Erlaube mir, diese eins nach dem anderen zu beantworten:

In einer Produktionsumgebung ist die Anzahl der Executoren meistens variabel. Dies hängt von den verfügbaren Ressourcen ab. Die Anzahl der Partitionen ist wichtig, wenn Sie Shuffles durchführen. Unter der Annahme, dass Ihre Daten jetzt verzerrt sind, können Sie die Belastung pro Aufgabe verringern, indem Sie die Anzahl der Partitionen erhöhen. Eine Aufgabe sollte idealerweise ein paar Minuszeichen enthalten. Wenn die Aufgabe zu lange dauert, ist es möglich, dass Ihr Container vorweggenommen wird und die Arbeit verloren geht. Wenn die Task nur wenige Millisekunden benötigt, wird der Overhead beim Starten der Task dominant.

Der Grad der Parallelität und die Optimierung Ihrer Executor-Größen, ich möchte auf den exzellenten Leitfaden von Cloudera verweisen: Ссылка

ORC und Parquet codieren nur die Daten im Ruhezustand. Wenn Sie den tatsächlichen Join ausführen, befinden sich die Daten in dem Speicherformat von Spark. Parkett wird immer populärer, seit Netflix und Facebook es angenommen haben und sich sehr viel Mühe gegeben haben. Parquet ermöglicht es Ihnen, die Daten effizienter zu speichern und hat einige Optimierungen (Prädikat Pushdown), die Spark verwendet.

Sie sollten den SQLContext anstelle des HiveContext verwenden, da der HiveContext veraltet ist. Der SQLContext ist allgemeiner und funktioniert nicht nur mit Hive.

Bei der Ausführung von registerTempTable werden die Daten in SparkSession gespeichert. Dies hat keinen Einfluss auf die Ausführung des Joins. Es speichert nur den Ausführungsplan, der aufgerufen wird, wenn eine Aktion ausgeführt wird (z. B. saveAsTable ). Bei der Ausführung von saveAsTable werden die Daten im verteilten Dateisystem gespeichert.

Hoffe, das hilft. Ich würde auch vorschlagen, dass wir unseren Vortrag auf dem Spark Summit über das Treffen von Joins verfolgen: Ссылка . Dies könnte Ihnen einige Einblicke geben.

Prost, Fokko

    
Fokko Driesprong 14.11.2017 09:29
quelle