Ich habe einen Ordner mit 150 G txt-Dateien (ca. 700 Dateien, im Durchschnitt je 200 MB).
Ich benutze scala, um die Dateien zu verarbeiten und am Ende einige zusammengefasste Statistiken zu berechnen. Ich sehe zwei mögliche Ansätze dafür:
Ich lehne mich dem zweiten Ansatz zu, da er sauberer erscheint (keine Notwendigkeit für parallelisierungsspezifischen Code), aber ich frage mich, ob mein Szenario den Einschränkungen meiner Hardware und Daten entspricht. Ich habe eine Workstation mit 16 Threads und 64 GB RAM zur Verfügung (so wird die Parallelisierung zwischen verschiedenen Prozessorkernen streng lokal sein). Ich könnte später die Infrastruktur mit mehr Maschinen skalieren, aber im Moment möchte ich nur die Einstellungen für dieses eine Workstation-Szenario tunen.
Der Code, den ich verwende: - liest TSV-Dateien und extrahiert aussagekräftige Daten in Triplets (String, String, String) - Anschließend wird eine Filterung, Zuordnung und Gruppierung durchgeführt - Schließlich werden die Daten reduziert und einige Aggregate berechnet.
Ich konnte diesen Code mit einer einzigen Datei ausführen (~ 200 MB Daten), aber ich bekomme einen java.lang.OutOfMemoryError: GC Overhead-Grenzwert überschritten und / oder eine Java Out-of-Heap-Ausnahme, wenn mehr Daten hinzugefügt werden (die Anwendung bricht mit 6 GB Daten, aber ich möchte sie mit 150 GB Daten verwenden).
Ich denke, ich müsste einige Parameter einstellen, damit dies funktioniert. Ich würde mich über Tipps freuen, wie Sie dieses Problem angehen (wie Sie nach Speicheranforderungen suchen). Ich habe versucht, die 'spark.executor.memory' zu erhöhen und eine kleinere Anzahl von Kernen zu verwenden (das rationale Wesen, dass jeder Kern etwas Heap-Speicherplatz benötigt), aber das hat meine Probleme nicht gelöst.
Ich brauche die Lösung nicht, um sehr schnell zu sein (sie kann bei Bedarf leicht für ein paar Stunden sogar Tage laufen). Ich speichere auch keine Daten, sondern speichere sie am Ende nur im Dateisystem. Wenn Sie der Meinung sind, dass es einfacher wäre, den manuellen Parallelisierungsansatz zu wählen, könnte ich das auch tun.
Ich und mein Team hatten eine csv-Datengröße von über 1 TB über 5 Rechner bei jeweils 32 GB RAM erfolgreich verarbeitet. Es hängt stark davon ab, welche Art von Verarbeitung Sie tun und wie.
Wenn Sie eine RDD neu partitionieren, erfordert dies zusätzliche Berechnungen
hat Overhead über Ihre Heap-Größe, versuchen Sie, die Datei mit mehr zu laden
Paralelismus durch abnehmende Split-Size in
TextInputFormat.SPLIT_MINSIZE
und TextInputFormat.SPLIT_MAXSIZE
(wenn Sie TextInputFormat verwenden), um die Stufe von
Paralelismus.
Verwenden Sie mapPartition anstelle von map, damit Sie die Berechnung innerhalb einer Partition. Wenn die Berechnung ein temporäres verwendet Variable oder Instanz und Sie haben immer noch nicht genügend Speicher, versuchen Sie es Verringern der Anzahl der Daten pro Partition (Erhöhen der Partition Nummer)
Erhöhen Sie den Speicher des Treibers und den Executor-Speicher mit "spark.executor.memory" und "spark.driver.memory" in Spark Konfiguration vor dem Erstellen von Spark Context
Beachten Sie, dass Spark ein Allzweck-Cluster-Computersystem ist, so dass es uneffizient ist (IMHO), Spark in einer einzelnen Maschine zu verwenden
So fügen Sie eine andere Perspektive basierend auf Code hinzu (im Gegensatz zur Konfiguration): Manchmal ist es am besten herauszufinden, in welcher Phase Ihre Spark-Anwendung den Speicher überschreitet und ob Sie Änderungen zur Behebung des Problems vornehmen können. Als ich Spark lernte, hatte ich eine Python Spark-Anwendung, die mit OOM-Fehlern abstürzte. Der Grund war, weil ich alle Ergebnisse zurück in den Master sammelte, anstatt die Aufgaben die Ausgabe speichern zu lassen.
z.
%Vor% processed_data.saveAsTextFile(output_dir)
Tags und Links scala apache-spark