Ich habe eine MyReader
, die Iterator
implementiert und Buffer
s mit Buffer : Send
erzeugt. MyReader
erzeugt sehr schnell eine Menge Buffer
s, aber ich habe einen CPU-intensiven Job für jede Buffer
( .map(|buf| ...)
), die meinen Engpass darstellt, und sammle dann die Ergebnisse (geordnet). Ich möchte die CPU-intensive Arbeit parallelisieren - hoffentlich zu N Threads, die Arbeitsstehlen verwenden würden, um sie so schnell auszuführen, wie es die Anzahl der Kerne erlaubt.
Bearbeiten: Um genau zu sein. Ich arbeite an rdedup
. MyStruct
ist Chunker
, liest io::Read
(typischerweise stdio), findet Teile (Chunks) von Daten und gibt sie aus. Dann soll map()
für jeden Chunk sha256 digest davon berechnen, komprimieren, verschlüsseln, speichern und den Digest als Ergebnis von map(...)
zurückgeben. Digest der gespeicherten Daten wird verwendet, um index
der Daten zu erstellen. Die Reihenfolge zwischen Stücken, die von map(...)
verarbeitet werden, spielt keine Rolle, aber der von jedem map(...)
zurückgegebene Digest muss in der gleichen Reihenfolge wie die Chunks gesammelt werden. Der eigentliche Schritt save
to file wird in einen anderen Thread (writer thread) ausgelagert. aktueller PR-Code
Ich hoffe, ich kann rayon
dafür verwenden, aber rayon
erwartet einen Iterator, der bereits parallellierbar ist - z. a Vec<...>
oder so ähnlich. Ich habe keine Möglichkeit gefunden, ein par_iter
von MyReader
zu erhalten - mein Leser ist sehr single-threaded in der Natur.
Es gibt simple_parallel
, aber die Dokumentation sagt, dass es nicht für den allgemeinen Gebrauch empfohlen wird. Und ich möchte sicherstellen, dass alles funktioniert.
Ich könnte einfach eine spmc-Warteschlangenimplementierung und eine benutzerdefinierte thread_pool
verwenden, aber ich habe nach einer bestehenden Lösung gesucht, die optimiert und getestet wurde.
Es gibt auch pipeliner
, unterstützt jedoch noch keine geordnete Karte.
Im Allgemeinen ist die Aufrechterhaltung der Ordnung eine ziemlich schwierige Voraussetzung, was die Parallelisierung betrifft.
Sie könnten versuchen, es manuell mit einem typischen Fan-Out / Fan-In-Setup zu erstellen:
Oder Sie könnten das Abstraktionsniveau erhöhen.
Von besonderem Interesse hier: Future
.
A Future
repräsentiert das Ergebnis einer Berechnung, die möglicherweise noch nicht geschehen ist. Ein Verbraucher, der eine geordnete Liste von Future
erhält, kann einfach auf jeden warten und die Pufferung in der Warteschlange natürlich passieren lassen.
Wenn Sie bei Bonuspunkten eine Warteschlange mit fester Größe verwenden, erhalten Sie automatisch einen Staudruck auf den Verbraucher.
Und deshalb würde ich empfehlen, etwas von CpuPool
zu erstellen.
Das Setup wird sein:
%Vor%Tags und Links multithreading parallel-processing rust