Erzeuge Stream von Streams aus einem langen Stream

8

Ich möchte ein einzelnes Stream in ein Stream von Streams basierend auf dem Inhalt von Streams aufteilen. Die resultierende Stream sollte einen Teil der Daten der ursprünglichen Datenströme enthalten.

Meine reale Anwendung ist komplexer (es gruppiert Logzeilen, die sich in einer Liste von Zeitintervallen befinden), aber mein Problem ist, wie man mit den Streams umgehen soll. Hier frage ich nach einem vereinfachten Beispiel.

Beispielproblem

Ich möchte ein Stream<Integer> in ein Stream<Stream<Integer>> teilen können, basierend auf der gleichen Anzahl, die wiederholt wird, wobei nur die Ströme mit ungeraden Zahlen belassen werden.

Zum Beispiel der folgende Stream, der enthält:

{1,1,1,2,2,2,3,6,7,7,1,1}

Müsste zu einem Stream von Streams führen, der Folgendes enthält:

{{1,1,1},{3},{7,7},{1,1}}

Lassen Sie die geraden Zahlen weg, indem Sie mit einem Filter beginnen (oder enden):

%Vor%

Dies ist unerwünscht, da dies bedeuten würde, jeden Eingangswert zweimal zu bewerten, das ist akzeptabel, aber ich würde es vorziehen, dies zu vermeiden.

Ideen für Lösungen

Meine aktuelle Lösung besteht darin, den Inhalt des Streams zu durchlaufen und ein List<List<Integer>> zu erstellen und dieses in ein Stream<Stream<Integer>> umzuwandeln. Dies bedeutet jedoch, dass das vollständige Ergebnis im Speicher gehalten wird (was für meine Anwendung unerwünscht ist).

Ich denke auch, dass ich das ziehen könnte, indem ich meine eigene Iterator schreibe, die aus dem Stream liest, aber ich bin nicht sicher, wie das funktionieren würde.

Frage

Wie kann ich ein Stream in ein Stream von Streams konvertieren, basierend auf dem Inhalt des ursprünglichen Stream , ohne das vollständige Ergebnis als List von Lists zuerst zu speichern.

    
Thirler 24.06.2015, 08:43
quelle

4 Antworten

5

Vielleicht möchten Sie Ihren eigenen aggregierenden Spliterator implementieren um dies zu tun. Es gibt bereits etwas Ähnliches in der proton-pack -Bibliothek (der erste Link verweist auf den im proton-pack implementierten Link).

Beachten Sie, dass Sie Stream<List<Integer>> erhalten (Sie können versuchen, die Implementierung so zu ändern, dass Stream<Stream<Integer>> direkt angezeigt wird, aber Sie müssen immer eine kleine Menge an Elementen puffern; abhängig von der Fenstergröße; um zu testen, ob Sie erstellen sollen ein neues Fenster oder nicht). Also zum Beispiel:

%Vor%

Ausgänge:

%Vor%     
Alexis C. 24.06.2015, 09:34
quelle
3

Sie können meine StreamEx Bibliothek verwenden. Es hat groupRuns Was macht die Arbeit:

%Vor%

Anwendungsbeispiel:

%Vor%

Ausgabe:

%Vor%

Ähnlich wie bei der protonpack-Bibliothek gibt es einen benutzerdefinierten Spliterator, aber mit StreamEx können Sie die Vorteile der Parallelverarbeitung nutzen (das Protonpack spaltet überhaupt nicht).

Bei der sequentiellen Verarbeitung befindet sich höchstens eine Zwischenliste gleichzeitig im Speicher (andere sind für GC geeignet). Wenn Sie sich immer noch Gedanken über Speicherverbrauch machen (zum Beispiel haben Sie sehr lange Gruppen), gibt es seit StreamEx 0.3.3 eine alternative Möglichkeit, diese Aufgabe zu lösen:

%Vor%

Die Methode runLengths gibt den Datenstrom zurück, wobei key der Wert des Elements ist und value die Anzahl der benachbarten Werte wiederholende Elemente. Danach wird StreamEx.constant verwendet, was eine Verknüpfung für Stream.generate(() -> value).limit(length) ist. So haben Sie auch bei sehr langen Gruppen einen konstanten Zwischenspeicherverbrauch. Natürlich ist diese Version auch parallel-freundlich.

Update: StreamEx 0.3.3 wird freigegeben, daher ist jetzt auch die zweite Lösung möglich.

    
Tagir Valeev 24.06.2015 12:29
quelle
2

Ich fürchte, es ist nicht machbar, zumindest nicht auf eine nette Art. Selbst wenn Sie die Elemente in Streams mappen und sie reduzieren, müssen diese internen Streams wissen, welche Elemente sie enthalten, damit sie etwas speichern müssen.

Die einfachste Lösung besteht darin, groupingBy zu verwenden, jedoch werden alle Ergebnisse in der Karte gespeichert:

%Vor%

Sie könnten versuchen, reduce operation zu verwenden, aber Sie müssten Ihren eigenen Stream von Streams implementieren, in dem Sie die Elemente speichern müssten, die jeder Stream sowieso enthält. Ganz zu schweigen davon, dass es viel Aufwand wäre, es umzusetzen.

Die beste Lösung, an die ich in Ihrem Fall denken kann, wäre, die Liste zweimal zu durchlaufen:

%Vor%

Beachten Sie jedoch, dass es O(n^2) Zeitkomplexität hat.

BEARBEITEN:

Diese Lösung hat nur lokale Elementgruppen. Es speichert nur die aktuelle lokale Gruppe.

%Vor%

Ausgabe:

%Vor%     
Jaroslaw Pawlak 24.06.2015 09:17
quelle
-1

Wie @Jaroslaw, habe ich auch Map verwendet, um die verschiedenen Streams zu halten. Es ist jedoch möglich, dass die Karte Streams enthält, die aus der Eingabe erstellt und nicht im Voraus erfasst werden. Mit Stream.concat und Stream.of können Sie ein Element zu einem Stream hinzufügen:

%Vor%

Ausgabe:

%Vor%     
Sharon Ben Asher 24.06.2015 09:29
quelle

Tags und Links