Spark Streaming mit einer dynamischen Nachschlagetabelle

8

Ich bin derzeit dabei, Spark-Streaming zu verwenden, um Logfile-ähnliche Einträge aufzunehmen und aus statistischen Gründen etwas zu berechnen.

Es gibt Datensätze auf HDFS, auf die HBase und Hive gerade zugreifen können, die benötigt werden, um einige Daten nachzuschlagen und zu transformieren, z. B. Mappings zwischen IPs und Maschinennamen und Maschinenbesitzern.

Es wird erwartet, dass die Funkenanwendung Tag für Tag auf unserem Cluster läuft, ohne Neustart. Diese Referenztabellen werden jedoch alle paar Stunden aktualisiert.

Es ist in Ordnung, wenn die verwendeten Daten etwas alt sind, aber es ist nicht in Ordnung, wenn die Daten zwei Wochen alt sind. Daher möchte ich wissen, wie ich Daten für Transformationen und Anreicherungen in meiner Map suchen und Phasen reduzieren kann. Ich hatte ein paar Ideen.

  1. Broadcast-Variablen können den Datensatz einlesen und ihn effizient weiterleiten. Sobald jedoch eine Broadcast-Variable festgelegt wurde, kann sie nicht mehr geändert werden, und das erneute Abrufen der Daten in der Treiberklasse führt nicht dazu, dass das Nichtauftreten und Übertragen der neuen Variable funktioniert, da die Worker-Zeiger alle auf das alte Dataset zeigen. Ich weiß nicht, ob es eine Möglichkeit gibt, das zu umgehen.

  2. HBase get () Abfragen können gemacht werden. Wenn die Daten basierend auf dem Schlüssel der Suche zu Reduzierern geleitet werden, kann jeder Reduzierer einen Cache einer Teilmenge des Gesamtdatensatzes halten und kann seinen eigenen lokalen Cache halten. HBase sollte minimale Latenz beim Abrufen einzelner Datensätze haben.

  3. Noch etwas?

Simon Hollingshead 01.02.2015, 16:13
quelle

1 Antwort

3

Sie haben hier zwei Möglichkeiten.

Zuerst muss foreachRDD transformation auf deinem DStream verwendet werden. foreachRDD wird auf der Treiberseite ausgeführt, dh Sie können dort eine neue RDD erstellen. Sie können den Zeitzähler speichern und die Datei alle 10 bis 15 Minuten erneut aus HDFS lesen.

Zweitens müssen Sie eine Datei in der transform -Umwandlung über den DStream lesen und die Ergebnisse im Speicher ablegen. Bei diesem Ansatz müssen Sie die gesamte Nachschlagetabelle von jedem der Executoren lesen, was nicht effizient ist

Ich würde Ihnen empfehlen, den ersten Ansatz zu verwenden. Um noch genauer zu sein, können Sie das Flag bei der letzten Aktualisierung der Daten speichern und in Ihrer Spark-Anwendung speichern. Bei jeder Iteration prüfen Sie den Wert dieses Flags (z. B. in HBase oder Zookeeper gespeichert) und vergleichen ihn mit dem lokal gespeicherten - wenn es anders ist, dann lesen Sie die Nachschlagetabelle erneut, falls nicht - führen Sie den Vorgang mit der alte

    
0x0FFF 03.02.2015 16:41
quelle