Wie parallel (...) auf einem benutzerdefinierten Iterator mit einem Thread in Rust?

8

Ich habe eine MyReader , die Iterator implementiert und Buffer s mit Buffer : Send erzeugt. MyReader erzeugt sehr schnell eine Menge Buffer s, aber ich habe einen CPU-intensiven Job für jede Buffer ( .map(|buf| ...) ), die meinen Engpass darstellt, und sammle dann die Ergebnisse (geordnet). Ich möchte die CPU-intensive Arbeit parallelisieren - hoffentlich zu N Threads, die Arbeitsstehlen verwenden würden, um sie so schnell auszuführen, wie es die Anzahl der Kerne erlaubt.

Bearbeiten: Um genau zu sein. Ich arbeite an rdedup . MyStruct ist Chunker , liest io::Read (typischerweise stdio), findet Teile (Chunks) von Daten und gibt sie aus. Dann soll map() für jeden Chunk sha256 digest davon berechnen, komprimieren, verschlüsseln, speichern und den Digest als Ergebnis von map(...) zurückgeben. Digest der gespeicherten Daten wird verwendet, um index der Daten zu erstellen. Die Reihenfolge zwischen Stücken, die von map(...) verarbeitet werden, spielt keine Rolle, aber der von jedem map(...) zurückgegebene Digest muss in der gleichen Reihenfolge wie die Chunks gesammelt werden. Der eigentliche Schritt save to file wird in einen anderen Thread (writer thread) ausgelagert. aktueller PR-Code

Ich hoffe, ich kann rayon dafür verwenden, aber rayon erwartet einen Iterator, der bereits parallellierbar ist - z. a Vec<...> oder so ähnlich. Ich habe keine Möglichkeit gefunden, ein par_iter von MyReader zu erhalten - mein Leser ist sehr single-threaded in der Natur.

Es gibt simple_parallel , aber die Dokumentation sagt, dass es nicht für den allgemeinen Gebrauch empfohlen wird. Und ich möchte sicherstellen, dass alles funktioniert.

Ich könnte einfach eine spmc-Warteschlangenimplementierung und eine benutzerdefinierte thread_pool verwenden, aber ich habe nach einer bestehenden Lösung gesucht, die optimiert und getestet wurde.

Es gibt auch pipeliner , unterstützt jedoch noch keine geordnete Karte.

    
dpc.pw 27.02.2017, 01:23
quelle

1 Antwort

5

Im Allgemeinen ist die Aufrechterhaltung der Ordnung eine ziemlich schwierige Voraussetzung, was die Parallelisierung betrifft.

Sie könnten versuchen, es manuell mit einem typischen Fan-Out / Fan-In-Setup zu erstellen:

  • ein einzelner Produzent, der Eingaben mit einer sequentiell monoton steigenden ID markiert,
  • ein Thread-Pool, der von diesem Produzenten konsumiert und dann das Ergebnis an den Endverbraucher sendet,
  • ein Konsument, der Ergebnisse puffert und neu anordnet, um sie in der sequentiellen Reihenfolge zu behandeln.

Oder Sie könnten das Abstraktionsniveau erhöhen.

Von besonderem Interesse hier: Future .

A Future repräsentiert das Ergebnis einer Berechnung, die möglicherweise noch nicht geschehen ist. Ein Verbraucher, der eine geordnete Liste von Future erhält, kann einfach auf jeden warten und die Pufferung in der Warteschlange natürlich passieren lassen.

Wenn Sie bei Bonuspunkten eine Warteschlange mit fester Größe verwenden, erhalten Sie automatisch einen Staudruck auf den Verbraucher.

Und deshalb würde ich empfehlen, etwas von CpuPool zu erstellen.

Das Setup wird sein:

%Vor%     
Matthieu M. 27.02.2017, 09:38
quelle