Ich habe einen RX-Producer, der einen Stream von Strings wie folgt erzeugt (vereinfachte Version des realen Streams):
A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....
Der Strom ist endlos, aber geordnet. Nachdem die Strings, die mit A
ausgehen, beginnen, beginnt B
. Wenn B
abläuft, beginnt C
... Wenn Z
abgelaufen ist, gehen wir zu AA1
usw. Es gibt eine unbekannte Anzahl von A
's, B
' s usw., aber das ist typisch 10-30 Instanzen pro Buchstabe.
Ich suche nach einer Möglichkeit, diesen Stream in Blöcke aller A's aufzuteilen: A1 A2 A3
, alle B's: B1 B2
, alle C's: C1 C2 C3 C4 C5 C6
usw. Jeder Block kann entweder ein Observable sein (was ich ' Ich werde zu einer Liste) oder einfach zu einer Liste.
Ich habe mehrere verschiedene Ansätze mit RxJava ausprobiert, die alle fehlgeschlagen sind. Zu den Dingen, die nicht funktionierten, gehören:
Gruppierung nach : Da der Stream endlos ist, wird die beobachtbare Variable nicht abgeschlossen. Wenn also das A ausläuft und das B beginnt, wird A's Observable nicht abgeschlossen. Es gibt also eine ständig wachsende Anzahl von Observablen.
Fenster / Puffer mit distinctUntilChanged - Ich verwende "distinctUntilChanged" im ursprünglichen Stream, um das erste Element jeder Gruppe auszugeben (das erste A, das erste B usw.). Dann benutze ich diesen Stream als Eingabe für window
oder einen "Puffer" -Operator, der als Grenze zwischen Fenstern / Puffern verwendet wird. Das hat nicht funktioniert und alles, was ich bekam, waren leere Listen.
Was ist eine richtige Lösung mit RX? Ich würde eine Java-Lösung bevorzugen, aber Lösungen in anderen Implementierungen von RX, die leicht in Java konvertiert werden können, sind ebenfalls sehr willkommen.
Hier ist ein Weg, wie ich das lösen könnte:
%Vor%So funktioniert es:
Sie können rxjava-extras .toListWhile
:
Es tut was @akarnokd unter den Abdeckungen getan hat und ist Unit-getestet.
Angenommen, wir haben einen source
-Stream von string
und eine Funktion key
, die den Schlüssel für jedes string
extrahiert, wie zum Beispiel:
Dann können wir Buffer
mit einem benutzerdefinierten Abschlussselektor verwenden.
Jeder Puffer endet, wenn das erste Element im Puffer und das aktuelle Element, das wir dem Puffer hinzufügen möchten, nicht denselben Schlüssel haben.
Nach dem Betrachten von akarnokd und Dave 's Antworten, habe ich meine eigene Lösung gefunden, indem ich einen benutzerdefinierten Rx-Operator namens BufferWhile
implementiert habe. Es scheint genauso zu funktionieren wie die anderen Lösungen (jemand korrigiert mich bitte, wenn ich falsch liege), aber es scheint einfacher zu sein:
Ich kann diesen Operator auf das ursprüngliche Observable mit lift
anwenden und ein Observable erhalten, das Listen von Strings ausgibt.
Tags und Links java c# reactive-programming rx-java system.reactive