Reaktive Streams - Batch mit Timeout

8

Ich bin dabei, eine selbst erstellte Bibliothek für die Protokollverarbeitung zu ersetzen, die ReactiveStreams mit io.projectreactor sehr nahe kommt. Das Ziel besteht darin, den von uns gepflegten Code zu reduzieren und neue Funktionen zu nutzen, die von der Community hinzugefügt wurden ("Eyeing Operator Fusion").

Als Erstes muss ich das stdio konsumieren und die mehrzeiligen Log-Einträge in Text-Blobs zusammenführen, die in der Pipeline fließen. Der Anwendungsfall wird ausführlich in mehrzeiligen Protokolleinträgen erläutert Dateibeats-Dokumente (außer wir wollen es in Bearbeitung).

Bisher ist der Code, den ich habe:

%Vor%

Dies sorgt für die mehrzeilige Zusammenführung, wenn ein neuer Protokollkopf erkannt wird, aber in der vorhandenen Bibliothek werden auch die akkumulierten Zeilen nach einer Zeitüberschreitung gelöscht (dh wenn innerhalb von 5 Sekunden kein Text empfangen wird, wird der Datensatz geleert).

Was wäre der richtige Weg, dies in Reactor zu modellieren? Muss ich meinen eigenen Operator schreiben, oder kann ich einen der vorhandenen anpassen?

Alle Hinweise auf relevante Beispiele und Dokumente zum Erreichen dieses Anwendungsfalls in Project Reactor oder RxJava wären sehr willkommen.

    
ddimitrov 12.07.2017, 10:48
quelle

2 Antworten

3

Es hängt davon ab, wie Sie den Anfang und das Ende jedes Puffers identifizieren, daher ist der folgende RxJava 2-Code als Hinweis darauf gedacht, den Wert der Hauptquelle zum Öffnen und Schließen des Puffers zu verwenden:

%Vor%

Drucke:

%Vor%

Es funktioniert, indem die Quelle über publish freigegeben wird, was die Wiederverwendung desselben Werts aus der Upstream-Umgebung ermöglicht, ohne dass mehrere Quellkopien gleichzeitig ausgeführt werden müssen. Die Öffnung wird durch die Erkennung eines "Start" -Strings auf der Linie bestimmt. Das Schließen wird entweder durch die Erkennung der Zeichenfolge "Ende" oder durch einen Timer ausgelöst, der nach einer Toleranzperiode ausgelöst wird.

Bearbeiten:

Wenn "Start" auch der Indikator für den nächsten Stapel ist, können Sie die Prüfung "Ende" durch "Start" ersetzen und den Inhalt des Puffers ändern, da der neue Header im vorherigen Puffer enthalten ist, andernfalls:

%Vor%     
akarnokd 19.07.2017, 23:27
quelle
1

buffer operator scheint am besten geeignet und einfache Lösung für mich.

Es hat größen- und zeitbasierte Strategien. Sie haben log, also denke ich, Sie können Zeilenanzahl als Puffergröße interpretieren.

Hier - Beispiel - Wie emittieren Elemente gruppiert nach 4 oder 5 Sekunden Zeitspanne:

%Vor%     
zella 19.07.2017 12:03
quelle