So kombinieren Sie Streaming-Daten mit großen Verlaufsdaten in Dataflow / Beam

10

Ich untersuche Verarbeitungsprotokolle von Web-Benutzersitzungen über Google Dataflow / Apache Beam und muss die Logs der Benutzer beim Eintreffen (Streaming) mit dem Verlauf einer Benutzersitzung aus dem letzten Monat kombinieren.

Ich habe mir die folgenden Ansätze angeschaut:

  1. Verwenden Sie ein festes 30-Tage-Fenster: Wahrscheinlich passt ein großes Fenster in den Speicher, und ich muss den Verlauf des Benutzers nicht aktualisieren, sondern einfach darauf verweisen
  2. Verwenden Sie CoGroupByKey, um zwei Datensätze zu verbinden, aber die beiden Datensätze müssen die gleiche Fenstergröße haben ( Ссылка ), was in meinem Fall nicht stimmt (24h vs. 30 Tage)
  3. Verwenden Sie Side Input, um den Sitzungsverlauf des Benutzers für eine bestimmte element in processElement(ProcessContext processContext) abzurufen.

Nach meinem Verständnis müssen die über .withSideInputs(pCollectionView) geladenen Daten in den Speicher passen. Ich weiß, dass ich den gesamten Sitzungsverlauf eines einzelnen Benutzers in den Speicher aufnehmen kann, aber nicht alle Sitzungsverläufe.

Meine Frage ist, ob es eine Möglichkeit gibt, Daten von einer Nebeneingabe zu laden / zu streamen, die nur für die aktuelle Benutzersitzung relevant ist.

Ich stelle mir eine ParDo-Funktion vor, die die History-Sitzung des Benutzers von der Seiteneingabe lädt, indem sie die ID des Benutzers angibt. Aber nur die History-Sitzung des aktuellen Benutzers würde in den Speicher passen; Das Laden von all Verlaufssitzungen über die Seiteneingabe wäre zu groß.

Einige Pseudo-Code zur Veranschaulichung:

%Vor%     
Florian 29.04.2016, 00:21
quelle

1 Antwort

1

Es gibt derzeit keine Möglichkeit, auf die Eingaben per Key-Seite im Streaming zuzugreifen, aber es wäre definitiv genau so nützlich, wie Sie es beschreiben, und wir denken darüber nach, es zu implementieren.

Eine mögliche Problemumgehung besteht darin, die Seiteneingaben zu verwenden, um Zeiger auf den tatsächlichen Sitzungsverlauf zu verteilen. Der Code, der die 24h-Sitzungshistorien generiert, könnte sie in GCS / BigQuery / etc hochladen und dann die Orte als Nebeneingabe an den Fügecode senden.

    
danielm 30.04.2016, 00:58
quelle