Rx - Teilen Sie den Strom in Segmente (Listen) nach Bedingung

8

Ich habe einen RX-Producer, der einen Stream von Strings wie folgt erzeugt (vereinfachte Version des realen Streams):

A1 A2 A3 B1 B2 C1 C2 C3 C4 C5 C6....

Der Strom ist endlos, aber geordnet. Nachdem die Strings, die mit A ausgehen, beginnen, beginnt B . Wenn B abläuft, beginnt C ... Wenn Z abgelaufen ist, gehen wir zu AA1 usw. Es gibt eine unbekannte Anzahl von A 's, B ' s usw., aber das ist typisch 10-30 Instanzen pro Buchstabe.

Ich suche nach einer Möglichkeit, diesen Stream in Blöcke aller A's aufzuteilen: A1 A2 A3 , alle B's: B1 B2 , alle C's: C1 C2 C3 C4 C5 C6 usw. Jeder Block kann entweder ein Observable sein (was ich ' Ich werde zu einer Liste) oder einfach zu einer Liste.

Ich habe mehrere verschiedene Ansätze mit RxJava ausprobiert, die alle fehlgeschlagen sind. Zu den Dingen, die nicht funktionierten, gehören:

  • Gruppierung nach : Da der Stream endlos ist, wird die beobachtbare Variable nicht abgeschlossen. Wenn also das A ausläuft und das B beginnt, wird A's Observable nicht abgeschlossen. Es gibt also eine ständig wachsende Anzahl von Observablen.

  • Fenster / Puffer mit distinctUntilChanged - Ich verwende "distinctUntilChanged" im ursprünglichen Stream, um das erste Element jeder Gruppe auszugeben (das erste A, das erste B usw.). Dann benutze ich diesen Stream als Eingabe für window oder einen "Puffer" -Operator, der als Grenze zwischen Fenstern / Puffern verwendet wird. Das hat nicht funktioniert und alles, was ich bekam, waren leere Listen.

Was ist eine richtige Lösung mit RX? Ich würde eine Java-Lösung bevorzugen, aber Lösungen in anderen Implementierungen von RX, die leicht in Java konvertiert werden können, sind ebenfalls sehr willkommen.

    
Malt 17.09.2015, 07:58
quelle

4 Antworten

3

Hier ist ein Weg, wie ich das lösen könnte:

%Vor%

So funktioniert es:

  • Ich benutze Defer, weil wir einen Puffer pro Teilnehmer brauchen, keinen globalen
  • Ich verkette die Pufferung mit der Emission des letzten Puffers, wenn die Quelle endlich ist
  • Ich verwende concatMap und füge einen Puffer hinzu, bis sich der Schlüssel ändert. Bis dahin gebe ich leere Observables aus. Sobald sich der Schlüssel geändert hat, gebe ich den Inhalt des Puffers aus und starte einen neuen.
akarnokd 17.09.2015, 08:35
quelle
5

Sie können rxjava-extras .toListWhile :

verwenden %Vor%

Es tut was @akarnokd unter den Abdeckungen getan hat und ist Unit-getestet.

    
Dave Moten 17.09.2015 09:59
quelle
1

Angenommen, wir haben einen source -Stream von string und eine Funktion key , die den Schlüssel für jedes string extrahiert, wie zum Beispiel:

%Vor%

Dann können wir Buffer mit einem benutzerdefinierten Abschlussselektor verwenden.

%Vor%

Jeder Puffer endet, wenn das erste Element im Puffer und das aktuelle Element, das wir dem Puffer hinzufügen möchten, nicht denselben Schlüssel haben.

    
Timothy Shields 17.09.2015 14:35
quelle
0

Nach dem Betrachten von akarnokd und Dave 's Antworten, habe ich meine eigene Lösung gefunden, indem ich einen benutzerdefinierten Rx-Operator namens BufferWhile implementiert habe. Es scheint genauso zu funktionieren wie die anderen Lösungen (jemand korrigiert mich bitte, wenn ich falsch liege), aber es scheint einfacher zu sein:

%Vor%

Ich kann diesen Operator auf das ursprüngliche Observable mit lift anwenden und ein Observable erhalten, das Listen von Strings ausgibt.

    
Malt 17.09.2015 12:19
quelle