Wie kann man den Status eines Datensatzes aus einer Datenbank in Apache Flink nachschlagen und aktualisieren?

10

Ich arbeite an einer Daten-Streaming-Anwendung und untersuche die Möglichkeit, Apache Flink für dieses Projekt zu verwenden. Der Hauptgrund dafür ist, dass es nette High-Level-Streaming-Konstrukte unterstützt, die der Stream-API von Java 8 sehr ähnlich sind.

Ich werde Ereignisse empfangen, die einem bestimmten Datensatz in einer Datenbank entsprechen, und ich möchte diese Ereignisse (die von einem Nachrichtenbroker wie RabbitMQ oder Kafka kommen) verarbeiten und schließlich die Datensätze in der Datenbank aktualisieren und pushen können die verarbeiteten / transformierten Ereignisse in eine andere Senke (wahrscheinlich ein anderer Nachrichten-Broker).

Ereignisse, die sich auf einen bestimmten Datensatz beziehen, müssen idealerweise in der FIFO-Reihenfolge verarbeitet werden (obwohl es einen Zeitstempel geben wird, der auch Ereignisse außerhalb der Reihenfolge erkennt), aber Ereignisse in Bezug auf verschiedene Datensätze können parallel verarbeitet werden. Ich plante, das keyBy() -Konstrukt zu verwenden, um den Strom durch Aufzeichnung zu teilen.

Die Verarbeitung, die ausgeführt werden muss, hängt von den aktuellen Informationen in der Datenbank über den Datensatz ab. Ich bin jedoch nicht in der Lage, ein Beispiel oder einen empfohlenen Ansatz für die Abfrage einer Datenbank nach solchen Datensätzen zu finden, um das Ereignis, das verarbeitet wird, mit den zusätzlichen Informationen anzureichern, die ich für die Verarbeitung benötige.

Die Pipeline, an die ich denke, ist folgende:

- & gt; keyBy () für die ID empfangen - & gt; den Datensatz aus der Datenbank abrufen, der der ID entspricht - & gt; Führen Sie Verarbeitungsschritte für den Datensatz aus - & gt; schiebt das verarbeitete Ereignis in eine externe Warteschlange und aktualisiert den Datenbankeintrag

Der Datenbankeintrag muss aktualisiert werden, da eine andere Anwendung die Daten abfragt.

Es könnte zusätzliche Optimierungen geben, die man machen könnte, nachdem diese Pipeline erreicht wurde. Zum Beispiel könnte man den (aktualisierten) Datensatz in einem verwalteten Zustand zwischenspeichern, so dass das nächste Ereignis im selben Datensatz keine weitere Datenbankabfrage benötigt. Wenn die Anwendung jedoch einen bestimmten Datensatz nicht kennt, muss sie ihn aus der Datenbank abrufen.

Was ist der beste Ansatz für diese Art von Szenario in Apache Flink?

    
jbx 10.08.2016, 06:45
quelle

1 Antwort

5

Sie können eine Datenbanksuche durchführen, indem Sie eine reiche Funktion für z. a RichFlatMap function, initialisiere die Datenbankverbindung einmal in ihrer Methode open() und bearbeite dann jedes Ereignis in der Methode flatMap() :

%Vor%

Und dann können Sie DatabaseMapper wie folgt verwenden:

%Vor%

Sie finden hier ein Beispiel mit zwischengespeicherten Daten von Redis.

    
Yassine Marzougui 15.08.2016, 16:15
quelle

Tags und Links