Shuffle Manager in Spark verstehen

8

Lassen Sie mich klären, wie shuffle depth funktioniert und wie Spark shuffle manager verwendet. Ich melde einige sehr hilfreiche Ressourcen:

Ссылка

Ссылка

Ссылка

Wenn ich sie lese, habe ich verstanden, dass es verschiedene Shuffle-Manager gibt. Ich möchte mich auf zwei von ihnen konzentrieren: hash manager und sort manager (was der Standardmanager ist).

Um meine Frage zu enthüllen, möchte ich von einer sehr häufigen Transformation ausgehen:

%Vor%

Diese Umwandlung bewirkt map-side aggregation und dann shuffle , um alle Schlüssel in dieselbe Partition zu bringen.

Meine Fragen sind:

  • Wird Map-Side-Aggregation intern durch eine mapPartition-Transformation implementiert und somit alle die gleichen Schlüssel mit der Combiner-Funktion zusammengefasst oder wird sie mit einem AppendOnlyMap oder ExternalAppendOnlyMap ?

  • implementiert?
  • Wenn AppendOnlyMap oder ExternalAppendOnlyMap Maps für die Aggregation verwendet werden, werden sie auch für Seitenaggregation reduzieren in ResultTask verwendet?

  • Was genau ist der Zweck dieser beiden Arten von Karten ( AppendOnlyMap oder ExternalAppendOnlyMap )?

  • Werden AppendOnlyMap oder ExternalAppendOnlyMap von allen Shuffle-Managern oder nur vom sortManager verwendet?

  • Ich habe gelesen, dass nachdem AppendOnlyMap oder ExternalAppendOnlyMap voll sind, in eine Datei ausgelaufen ist , wie genau geschieht das?

  • Mit dem Sortier-Shuffle-Manager verwenden wir eine appendOnlyMap zum Zusammenfassen und Kombinieren von Partitionsdatensätzen, richtig? Dann, wenn der Ausführungsspeicher voll ist, fangen wir an, die Karte zu sortieren, sie auf die Festplatte zu schreiben und dann die Karte zu bereinigen. Meine Frage ist: Was ist der Unterschied zwischen "spill to disk" und shuffle write? Sie bestehen im Grunde genommen darin, eine Datei auf dem lokalen Dateisystem zu erstellen, aber sie sind anders behandelt, Shuffle-Schreib-Datensätze werden nicht in die appendOnlyMap geschrieben.

  • Können Sie ausführlich erklären was passiert, wenn reduceByKey ausgeführt wird, und mir alle Schritte erklären, um das zu erreichen? Wie zum Beispiel alle Schritte für die Kartenseitenaggregation, das Shuffling und so weiter.

Giorgio 11.01.2017, 08:09
quelle

1 Antwort

1

Es folgt die Beschreibung von reduceByKey Schritt-für-Schritt:

  1. reduceByKey ruft combineByKeyWithTag auf, mit Identity Combiner und identischem Merge-Wert und erzeugt den Wert
  2. combineByKeyWithClassTag erstellt ein Aggregator und gibt ShuffledRDD zurück. Sowohl "map" als auch "reduce" Seitenaggregationen verwenden einen internen Mechanismus und verwenden nicht mapPartitions .
  3. Agregator verwendet ExternalAppendOnlyMap für beide combineValuesByKey ("Kartenseitenreduktion") und combineCombinersByKey ("Seitenreduktion reduzieren")
  4. Beide Methoden verwenden ExternalAppendOnlyMap.insertAllMethod
  5. ExternalAppendOnlyMap hält Track von ausgelaufenen Teilen und die aktuelle In-Memory-Map ( SizeTrackingAppendOnlyMap )
  6. insertAll Methode aktualisiert In-Memory-Map und prüft den Einsatz , wenn die geschätzte Größe der aktuellen Karte die Schwelle überschreitet. Es verwendet die vererbte Methode Spillable.maybeSpill . Wenn der Schwellenwert überschritten wird, ruft diese Methode spill als Nebeneffekt und insertAll initialisiert clean SizeTrackingAppendOnlyMap
  7. spill ruft auf spillMemoryIteratorToDisk , das das Objekt DiskBlockObjectWriter vom Block-Manager erhält.

insertAll Schritte werden sowohl für das Zuordnen als auch für das Reduzieren von Seitenaggregationen mit entsprechenden Aggregator -Funktionen mit Shuffle-Phase dazwischen angewendet.

Ab Spark 2.0 gibt es nur einen sortierten Manager: SPARK-14667

    
user7337271 11.01.2017 15:06
quelle