Kombinieren Sie Ergebnisse aus Batch-RDD mit Streaming-RDD in Apache Spark

9

Kontext: Ich verwende Apache Spark, um eine laufende Anzahl verschiedener Ereignistypen aus Protokollen zu aggregieren. Die Protokolle werden sowohl in Cassandra für historische Analysezwecke als auch in Kafka für Echtzeitanalysezwecke gespeichert. Jedes Protokoll hat einen Datums- und Ereignistyp. Aus Gründen der Einfachheit nehmen wir an, ich wollte die Anzahl der Protokolle eines einzelnen Typs für jeden Tag verfolgen.

Wir haben zwei RDDs, eine RDD von Batch-Daten von Cassandra und eine weitere Streaming-RDD von Kafka. Pseudocode:

%Vor%

Frage: Wie kombiniere ich die Ergebnisse von streamRDD mit dem BatchRDD? Nehmen wir an, dass batchRDD die folgenden Daten enthält und dieser Job am 16.10.2014 ausgeführt wurde:

%Vor%

Da die Cassandra-Abfrage nur alle Daten bis zur Startzeit der Stapelabfrage enthielt, müssen wir nach Abschluss der Abfrage von Kafka lesen, wobei wir nur Protokolle nach der Startzeit des Jobs berücksichtigen. Wir nehmen an, dass die Abfrage sehr lange dauert. Das bedeutet, dass ich die historischen Ergebnisse mit den Streaming-Ergebnissen kombinieren muss.

Zur Veranschaulichung:

%Vor%

Dann nehmen wir an, dass wir im ersten Stream-Batch diese Daten erhalten haben:

%Vor%

Dann möchte ich die Batch RDD mit diesem Stream RDD kombinieren, so dass der Stream RDD jetzt den Wert:

hat %Vor%

Dann nehmen wir an, dass wir im zweiten Stream-Batch diese Daten erhalten haben:

%Vor%

Dann sollte der Stream RDD aktualisiert werden, um den Wert zu haben:

%Vor%

Und so weiter ...

Es ist möglich, streamRDD.transformToPair(...) zu verwenden, um die streamRDD-Daten mit den batchRDD-Daten mit join zu kombinieren, aber wenn wir dies für jeden Stream-Chunk machen, würden wir die Anzahl aus dem batchRDD für jeden Stream-Chunk addieren der Statuswert "doppelt gezählt", wenn er nur dem ersten Stream-Chunk hinzugefügt werden sollte.

    
Bobby 23.10.2014, 02:59
quelle

2 Antworten

5

Um diesen Fall anzugehen, würde ich die Basis-rdd mit dem Ergebnis der aggregierten StateDStream verbinden, die die Gesamtsummen der Streaming-Daten enthält. Dies stellt effektiv eine Grundlinie für Daten bereit, die in jedem Streaming-Intervall berichtet werden, ohne die x-Zeiten der Grundlinie zu zählen.

Ich habe diese Idee mit dem Beispiel WordCount versucht und es funktioniert. Löschen Sie dies auf der REPL für ein Live-Beispiel:

(Verwenden Sie nc -lk 9876 in einer separaten Shell, um dem socketTextStream eine Eingabe zu geben)

%Vor%     
maasg 24.10.2014, 16:53
quelle
0

Sie könnten updateStateByKey ausprobieren:

%Vor%     
mithra 23.10.2014 11:14
quelle