Ich möchte ein einzelnes Stream
in ein Stream
von Streams
basierend auf dem Inhalt von Streams
aufteilen. Die resultierende Stream
sollte einen Teil der Daten der ursprünglichen Datenströme enthalten.
Meine reale Anwendung ist komplexer (es gruppiert Logzeilen, die sich in einer Liste von Zeitintervallen befinden), aber mein Problem ist, wie man mit den Streams umgehen soll. Hier frage ich nach einem vereinfachten Beispiel.
Ich möchte ein Stream<Integer>
in ein Stream<Stream<Integer>>
teilen können, basierend auf der gleichen Anzahl, die wiederholt wird, wobei nur die Ströme mit ungeraden Zahlen belassen werden.
Zum Beispiel der folgende Stream, der enthält:
{1,1,1,2,2,2,3,6,7,7,1,1}
Müsste zu einem Stream von Streams führen, der Folgendes enthält:
{{1,1,1},{3},{7,7},{1,1}}
Lassen Sie die geraden Zahlen weg, indem Sie mit einem Filter beginnen (oder enden):
%Vor%Dies ist unerwünscht, da dies bedeuten würde, jeden Eingangswert zweimal zu bewerten, das ist akzeptabel, aber ich würde es vorziehen, dies zu vermeiden.
Meine aktuelle Lösung besteht darin, den Inhalt des Streams zu durchlaufen und ein List<List<Integer>>
zu erstellen und dieses in ein Stream<Stream<Integer>>
umzuwandeln. Dies bedeutet jedoch, dass das vollständige Ergebnis im Speicher gehalten wird (was für meine Anwendung unerwünscht ist).
Ich denke auch, dass ich das ziehen könnte, indem ich meine eigene Iterator
schreibe, die aus dem Stream liest, aber ich bin nicht sicher, wie das funktionieren würde.
Wie kann ich ein Stream
in ein Stream
von Streams
konvertieren, basierend auf dem Inhalt des ursprünglichen Stream
, ohne das vollständige Ergebnis als List
von Lists
zuerst zu speichern.
Vielleicht möchten Sie Ihren eigenen aggregierenden Spliterator implementieren um dies zu tun. Es gibt bereits etwas Ähnliches in der proton-pack -Bibliothek (der erste Link verweist auf den im proton-pack implementierten Link).
Beachten Sie, dass Sie Stream<List<Integer>>
erhalten (Sie können versuchen, die Implementierung so zu ändern, dass Stream<Stream<Integer>>
direkt angezeigt wird, aber Sie müssen immer eine kleine Menge an Elementen puffern; abhängig von der Fenstergröße; um zu testen, ob Sie erstellen sollen ein neues Fenster oder nicht). Also zum Beispiel:
Ausgänge:
%Vor% Sie können meine StreamEx
Bibliothek verwenden. Es hat groupRuns
Was macht die Arbeit:
Anwendungsbeispiel:
%Vor%Ausgabe:
%Vor%Ähnlich wie bei der protonpack-Bibliothek gibt es einen benutzerdefinierten Spliterator, aber mit StreamEx können Sie die Vorteile der Parallelverarbeitung nutzen (das Protonpack spaltet überhaupt nicht).
Bei der sequentiellen Verarbeitung befindet sich höchstens eine Zwischenliste gleichzeitig im Speicher (andere sind für GC geeignet). Wenn Sie sich immer noch Gedanken über Speicherverbrauch machen (zum Beispiel haben Sie sehr lange Gruppen), gibt es seit StreamEx 0.3.3 eine alternative Möglichkeit, diese Aufgabe zu lösen:
%Vor% Die Methode runLengths
gibt den Datenstrom zurück, wobei key der Wert des Elements ist und value die Anzahl der benachbarten Werte wiederholende Elemente. Danach wird StreamEx.constant
verwendet, was eine Verknüpfung für Stream.generate(() -> value).limit(length)
ist. So haben Sie auch bei sehr langen Gruppen einen konstanten Zwischenspeicherverbrauch. Natürlich ist diese Version auch parallel-freundlich.
Update: StreamEx 0.3.3 wird freigegeben, daher ist jetzt auch die zweite Lösung möglich.
Ich fürchte, es ist nicht machbar, zumindest nicht auf eine nette Art. Selbst wenn Sie die Elemente in Streams mappen und sie reduzieren, müssen diese internen Streams wissen, welche Elemente sie enthalten, damit sie etwas speichern müssen.
Die einfachste Lösung besteht darin, groupingBy
zu verwenden, jedoch werden alle Ergebnisse in der Karte gespeichert:
Sie könnten versuchen, reduce
operation zu verwenden, aber Sie müssten Ihren eigenen Stream von Streams implementieren, in dem Sie die Elemente speichern müssten, die jeder Stream sowieso enthält. Ganz zu schweigen davon, dass es viel Aufwand wäre, es umzusetzen.
Die beste Lösung, an die ich in Ihrem Fall denken kann, wäre, die Liste zweimal zu durchlaufen:
%Vor% Beachten Sie jedoch, dass es O(n^2)
Zeitkomplexität hat.
BEARBEITEN:
Diese Lösung hat nur lokale Elementgruppen. Es speichert nur die aktuelle lokale Gruppe.
%Vor%Ausgabe:
%Vor% Wie @Jaroslaw, habe ich auch Map verwendet, um die verschiedenen Streams zu halten. Es ist jedoch möglich, dass die Karte Streams enthält, die aus der Eingabe erstellt und nicht im Voraus erfasst werden. Mit Stream.concat
und Stream.of
können Sie ein Element zu einem Stream hinzufügen:
Ausgabe:
%Vor%Tags und Links java java-8 java-stream