Hinzufügen von Elementen zu Java 8 parallelen Streams on-the-fly

8

Ziel ist es, einen kontinuierlichen Strom von Elementen mit Hilfe von Java 8 Streams zu verarbeiten. Daher werden während der Verarbeitung dieses Streams Elemente zur Datenquelle eines parallelen Streams hinzugefügt.

Das Javadoc von Streams beschreibt Folgendes Eigenschaften in Abschnitt "Nicht-Interferenz":

  

Bei den meisten Datenquellen bedeutet die Vermeidung von Interferenzen, dass sichergestellt wird, dass die Datenquelle während der Ausführung der Stream-Pipeline überhaupt nicht geändert wird. Die wesentliche Ausnahme sind Streams, deren Quellen gleichzeitige Collections sind, die speziell für die gleichzeitige Bearbeitung entwickelt wurden. Gleichzeitige Stream-Quellen sind diejenigen, deren Spliterator das CONCURRENT-Merkmal meldet.

Aus diesem Grund wird in unseren Versuchen eine ConcurrentLinkedQueue verwendet, die für

den Wert true zurückgibt %Vor%

Es wird nicht explizit gesagt, dass die Datenquelle nicht modifiziert werden darf, wenn sie in parallelen Streams verwendet wird.

In unserem Beispiel wird für jedes der Elemente im Stream der inkrementierte Zählerwert zur Warteschlange hinzugefügt, die die Datenquelle des Datenstroms ist, bis der Zähler größer als N ist. Mit dem Aufruf von queue.stream () funktioniert alles gut während der sequentiellen Ausführung:

%Vor%

Als zweiten Versuch ist der Datenstrom parallel und löst einen java.lang.AssertionError aus, weil die Prüfung kleiner als N ist und nicht jedes Element in der Warteschlange verarbeitet wurde. Der Stream wurde möglicherweise vorzeitig beendet, da die Warteschlange möglicherweise zu einem bestimmten Zeitpunkt leer geworden ist.

%Vor%

Der nächste Versuch bestand darin, den Haupt-Thread zu signalisieren, sobald der kontinuierliche Stream "wirklich" endete (die Warteschlange ist leer) und das Stream-Objekt danach zu schließen. Hier ist das Problem, dass das Stream-Objekt Elemente aus der Warteschlange nur einmal oder zumindest nicht kontinuierlich zu lesen scheint und niemals das "echte" Ende des Streams erreicht.

%Vor%

Die sich daraus ergebenden Fragen sind:

  • Haben wir etwas falsch gemacht?
  • Ist es eine nicht spezifizierte oder sogar falsche Verwendung der Stream-API?
  • Wie können wir sonst das gewünschte Verhalten erreichen?
L.M. Hackl 17.10.2016, 11:37
quelle

2 Antworten

2

Es besteht ein signifikanter Unterschied zwischen "die Quelle von Stream wird nicht geändert" und Ihre Annahme "Änderungen werden von der laufenden Operation Stream widergespiegelt".

Die CONCURRENT -Eigenschaft impliziert, dass die Änderung der Quelle erlaubt ist, dh dass sie niemals ConcurrentModificationException liefert, aber es bedeutet nicht, dass Sie sich auf ein bestimmtes Verhalten verlassen können ob diese Änderungen reflektiert werden oder nicht.

Die Dokumentation des CONCURRENT -Flagers selbst sagt:

  

Die meisten gleichzeitigen Sammlungen behalten eine Konsistenz-Richtlinie bei, die Genauigkeit in Bezug auf Elemente garantiert, die am Punkt der Spliterator-Konstruktion vorhanden sind, aber möglicherweise spätere Ergänzungen oder Entfernungen nicht widerspiegeln.

Dieses Stream-Verhalten stimmt mit dem bereits bekannten Verhalten von % überein. co_de% :

  

Iteratoren sind schwach konsistent und geben Elemente zurück, die den Status der Warteschlange an irgendeinem Punkt bei oder seit der Erstellung des Iterators widerspiegeln. Sie werfen nicht ConcurrentLinkedQueue und kann gleichzeitig mit anderen Operationen fortgesetzt werden. Elemente, die seit der Erstellung des Iterators in der Warteschlange enthalten sind, werden genau einmal zurückgegeben.

Es ist schwer zu sagen, wie man "das gewünschte Verhalten sonst erreicht", da Sie das "gewünschte Verhalten" in keiner anderen Form als Code beschrieben haben, die einfach durch

ersetzt werden kann %Vor%

das ist der einzige beobachtbare Effekt ... Betrachten Sie Ihr Problem neu zu definieren ...

    
Holger 17.10.2016 14:47
quelle
0

Stream kann kontinuierlich generiert werden oder aus einer Sammlung, die modifiziert wurde, und ist nicht dafür ausgelegt, kontinuierlich ausgeführt zu werden. Es wurde entwickelt, um die beim Start des Streams verfügbaren Elemente zu verarbeiten und nach der Verarbeitung zurückzugeben. Sobald das Ende erreicht ist, stoppt es.

  

Wie können wir sonst das gewünschte Verhalten erreichen?

Sie müssen einen anderen Ansatz verwenden. Ich würde ein ExecutorService verwenden, wo Sie übergeben Aufgabe, die Sie ausführen möchten.

Eine Alternative wäre, einen kontinuierlichen Strom zu verwenden, der blockiert, wenn kein Ergebnis verfügbar ist. Hinweis: Dadurch wird die vom parallelen Stream verwendete Common ForkJoinPool gesperrt und kein anderer Code kann sie verwenden.

    
Peter Lawrey 17.10.2016 11:58
quelle