Wie wird ein Thread-sicherer Collector implementiert?

9

Ich möchte etwas ähnlich wie Collectors.maxBy() haben, ein Kollektor, der die obersten Elemente in einer Sammlung erhält ( maxBy erhält nur eins).

Ich habe einen Stream von Possibility -Objekten, die mit einer Integer score(Possibility) -Methode bewertet werden können.

Zuerst habe ich versucht:

%Vor%

Aber wenn ich das tue, scanne ich die Sammlung dreimal. Einmal, um es zu bauen, ein zweites Mal, um die höchste Punktzahl zu erhalten, und ein drittes Mal, um es zu filtern, und das ist nicht optimal. Außerdem könnte die Anzahl der Möglichkeiten sehr groß sein (& gt; 10 12 ).

Der beste Weg sollte sein, direkt die besten Möglichkeiten in der ersten Sammlung zu bekommen, aber es scheint keinen eingebauten Sammler zu geben, um so etwas zu tun.

Also habe ich mein eigenes Collector :

implementiert %Vor%

Und dann:

%Vor%

Und das macht den Job in einem sequentiellen Modus (ohne .parallel() ), aber im parallelen Modus gibt es gelegentlich Ausnahmen an zwei Stellen:

  • A java.lang.IndexOutOfBoundsException Index: 0, Size: 0 in der Zeile:

    %Vor%

der Methode accumulator()

Ich verstehe, dass es passiert, wenn ein list.clear() zwischen list.isEmpty() und list.get(0) aufgerufen wird.

  • A java.lang.NullPointerException in der Methode score (Possibility), weil die Möglichkeit null ist. Auch hier ist die gleiche Zeile beteiligt: ​​

    %Vor%

Ich verstehe nicht, wie list.get(0) null ...

zurückgeben könnte

Im parallelen Modus erhöht list.get(0) manchmal IndexOutOfBoundsException und gibt manchmal null zurück.

Ich verstehe, dass mein Code nicht Thread-sicher ist, also habe ich mehrere Lösungen ausprobiert:

  • Fügen Sie synchronized in allen Methoden von BestCollector hinzu: public synchronized …
  • Verwenden Sie eine Thread-sichere Sammlung anstelle von ArrayList : java.util.concurrent.CopyOnWriteArrayList
  • Fügen Sie synchronized hinzu und verwenden Sie gleichzeitig CopyOnWriteArrayList
  • Entfernen Sie Characteristics.CONCURRENT aus der Set<Characteristics> der characteristics() Methode

    %Vor%

Aber ich weiß nicht, ob der Characteristics.CONCURRENT hier ist, um anzuzeigen, dass mein Code Thread-sicher ist oder dass mein Code in einer Parallelverarbeitung verwendet wird.

Aber keine dieser Lösungen löst das Problem tatsächlich.

Tatsächlich, wenn ich CONCURRENT aus den Eigenschaften entferne, gibt es manchmal java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 , aber in der Zeile:

%Vor%

der Methode combiner() .

Die durch die Methode accumulator() ausgelösten Ausnahmen scheinen jedoch nicht mehr aufzutreten.

@ Holgers Antwort ist richtig.

Die vollständige Lösung besteht darin, die Methoden combiner() und characteristics() zu ändern:

%Vor%     
kwisatz 28.04.2015, 10:27
quelle

1 Antwort

8

Ihr Code weist nur einen signifikanten Fehler auf: Wenn Ihr Collector nicht Thread-sicher ist, sollte er nicht Characteristics.CONCURRENT melden, da dieser genau angibt, dass er threadsicher war.

Der wichtige Punkt, den Sie verstehen müssen, ist, dass das Framework für nicht CONCURRENT -Kollektoren die notwendigen Schritte ausführt, um es auf threadsichere, aber dennoch effiziente Weise zu verwenden:

  • Für jeden Worker-Thread wird ein neuer Container über supplier() erworben.
  • Jeder Arbeiter verwendet die Funktion accumulator() zusammen mit seinem eigenen lokalen Container
  • combiner() wird verwendet, sobald zwei Arbeitsthreads ihre Arbeit beendet haben
  • Das finisher() wird verwendet, wenn alle Worker-Threads ihre Arbeit beendet haben und alle Container kombiniert wurden

Sie müssen also nur sicherstellen, dass Ihr Lieferant wirklich eine neue Instanz bei jedem Aufruf zurückgibt und dass alle Funktionen nicht störend und nebenwirkungsfrei sind (in Bezug auf alles andere als den Container, den sie als Argumente erhalten) und natürlich nicht Characteristics.CONCURRENT , wenn Ihr Kollektor kein gleichzeitiger Kollektor ist.

Sie benötigen weder das Schlüsselwort synchronized noch gleichzeitige Sammlungen.

Übrigens kann eine Comparator der Form (p1, p2) -> score(p1).compareTo(score(p2)) mit Comparator.comparing(p -> score(p)) implementiert werden, oder wenn der Score-Wert ein int : Comparator.comparingInt(p -> score(p)) ist.

Schließlich überprüft Ihre Combiner-Funktion nicht, ob eine der Listen leer ist. Dies erklärt perfekt IndexOutOfBoundsException in combiner , während IndexOutOfBoundsException in accumulator das Ergebnis Ihrer Sammelberichterstattung Characteristics.CONCURRENT ...

ist

Es ist auch wichtig zu verstehen, dass das Hinzufügen eines synchronized Schlüsselworts zu einer accumulator() oder combiner() Methode die über einen Lambda-Ausdruck konstruierte Funktion nicht schützt. Es schützt die Methode, die die Funktionsinstanz konstruiert, aber nicht den Code der Funktion selbst. Im Gegensatz zu einer inneren Klasse gibt es keine Möglichkeit, ein synchronized -Schlüsselwort zu der Implementierungsmethode der tatsächlichen Funktion hinzuzufügen.

    
Holger 28.04.2015, 13:04
quelle