Spark: Effizientere Aggregation zum Verknüpfen von Strings aus verschiedenen Zeilen

8

Ich arbeite gerade mit DNA-Sequenzdaten und bin in einen Performance-Roadblock geraten.

Ich habe zwei Nachschlagewörterbücher / Hashes (als RDDs) mit DNA-Wörtern (kurze Sequenzen) als Schlüssel und einer Liste von Indexpositionen als Wert. Einer ist für eine kürzere Abfragesequenz und der andere für eine Datenbanksequenz. Das Erstellen der Tabellen ist selbst bei sehr, sehr großen Sequenzen ziemlich schnell.

Für den nächsten Schritt muss ich diese paarweise zusammenstellen und "Treffer" (Paare von Indexpositionen für jedes gebräuchliche Wort) finden.

Ich schließe mich zuerst den Nachschlag-Wörterbüchern an, was ziemlich schnell ist. Allerdings brauche ich jetzt die Paare, also muss ich zweimal flatmap, einmal um die Liste der Indizes aus der Abfrage zu erweitern und das zweite Mal, um die Liste der Indizes aus der Datenbank zu erweitern. Das ist nicht ideal, aber ich sehe keinen anderen Weg. Zumindest ist es ok.

Die Ausgabe an diesem Punkt ist: (query_index, (word_length, diagonal_offset)) , wobei der diagonale Offset der database_sequence_index minus dem Abfragefolgenindex ist.

Allerdings muss ich nun Indexpaare mit demselben Diagonalversatz (db_index - query_index) finden, die einigermaßen dicht beieinander liegen und sie verbinden (so dass ich die Länge des Wortes vergrößere), aber nur als Paare (dh einmal verbinden Sie einen Index mit einem anderen, ich möchte nichts anderes damit verschmelzen).

Ich mache das mit einer aggregateByKey-Operation, die ein spezielles Objekt namens Seed () verwendet.

%Vor%

Seed ():

%Vor%

Dies ist der Punkt, an dem die Performance für Sequenzen moderater Länge zusammenbricht.

Gibt es einen besseren Weg, diese Aggregation zu machen? Mein Bauchgefühl sagt ja, aber ich kann nicht darauf kommen.

Ich weiß, dass dies eine sehr langatmige und technische Frage ist, und ich würde wirklich jede Einsicht schätzen, auch wenn es keine einfache Lösung gibt.

Edit: So mache ich die Lookup-Tabellen:

%Vor%

Und hier ist die Funktion, die den gesamten Vorgang ausführt:

%Vor%

Edit 2: Ich habe die Dinge ein wenig optimiert und die Leistung leicht verbessert, indem ich Folgendes ersetzte:

%Vor%

in hits_rdd mit:

%Vor%

Wenigstens verbrenne ich nicht so viel Speicher mit Zwischendatenstrukturen.

    
Chris Chambers 19.12.2015, 20:57
quelle

1 Antwort

1

Vergessen wir die technischen Details, was Sie tun, und denken Sie "funktional" über die einzelnen Schritte, vergessen Sie dabei die Einzelheiten der Implementierung. Funktionales Denken wie dieses ist ein wichtiger Teil der parallelen Datenanalyse; Idealerweise können wir, wenn wir das Problem so lösen können, klarer über die einzelnen Schritte sprechen und klarer und oft prägnanter werden. Im Sinne eines tabellarischen Datenmodells würde ich Ihr Problem aus folgenden Schritten betrachten:

  1. Verknüpfen Sie Ihre beiden Datensätze in der Sequenzspalte.
  2. Erstellen Sie eine neue Spalte delta , die die Differenz zwischen den Indizes enthält.
  3. Sortiert nach (entweder) Index, um sicherzustellen, dass die Untersequenzen in der richtigen Reihenfolge sind.
  4. Gruppieren Sie nach delta und verketten Sie die Strings in der Sequenzspalte, um die vollständigen Übereinstimmungen zwischen Ihren Datasets zu erhalten.

Für die ersten drei Schritte halte ich es für sinnvoll, DataFrames zu verwenden, da dieses Datenmodell in meinem Kopf der Art der Verarbeitung, die wir machen, Sinn macht. (Eigentlich könnte ich DataFrames auch für Schritt 4 verwenden, außer dass pyspark derzeit keine benutzerdefinierten Aggregatfunktionen für DataFrames unterstützt, obwohl Scala dies tut).

Für den vierten Schritt (was ist, wenn ich richtig verstehe, worum es in Ihrer Frage wirklich geht), ist es ein wenig schwierig darüber nachzudenken, wie dies funktional funktioniert, aber ich denke, eine elegante und effiziente Lösung ist zu verwenden ein Reduzieren (auch eine Rechtsfalte genannt); Dieses Muster kann auf jedes Problem angewandt werden, das man im Sinne einer iterativen Anwendung einer assoziativen Binärfunktion formulieren kann, das ist eine Funktion, bei der die "Gruppierung" von 3 Argumenten keine Rolle spielt (obwohl die Reihenfolge sicherlich von Belang sein kann). Dies ist eine Funktion x,y -> f(x,y) wo f(x, f(y, z)) = f(f(x, y), z) . Die Verkettung von Zeichenketten (oder allgemeiner Listen) ist eine solche Funktion.

Hier ist ein Beispiel, wie das aussehen könnte in pyspark ; hoffentlich können Sie dies an die Details Ihrer Daten anpassen:

%Vor%     
maxymoo 23.12.2015 00:17
quelle

Tags und Links