Ein ThreadPoolExecutor in einem ProcessPoolExecutor

8

Ich bin neu im Futures-Modul und habe eine Aufgabe, die von der Parallelisierung profitieren könnte. aber ich bin anscheinend nicht in der Lage, genau herauszufinden, wie man die Funktion für einen Thread und die Funktion für einen Prozess einrichtet. Ich würde jede Hilfe schätzen, die jemand in der Sache werfen kann.

Ich führe eine Partikelschwarmoptimierung (PSO) durch. Ohne zu sehr auf PSO selbst einzugehen, hier ist das grundlegende Layout meines Codes:

Es gibt eine Particle -Klasse mit einer getFitness(self) -Methode (die einige Metrik berechnet und sie in self.fitness speichert). Eine PSO-Simulation hat mehrere Partikelinstanzen (leicht über 10, 100 oder sogar 1000 für einige Simulationen).
Von Zeit zu Zeit muss ich die Fitness der Partikel berechnen. Derzeit mache ich das in for-Schleife:

%Vor%

Allerdings bemerke ich, dass die Fitness jedes Teilchens unabhängig voneinander berechnet werden kann. Dies macht diese Fitnessrechnung zu einem idealen Kandidaten für die Parallelisierung. Tatsächlich könnte ich map(lambda p: p.getFitness(args), listOfParticles) machen.

Jetzt kann ich das leicht mit futures.ProcessPoolExecutor :

machen %Vor%

Da die Nebenwirkungen des Aufrufs von p.getFitness in jedem Partikel selbst gespeichert sind, muss ich mir keine Sorgen darüber machen, ob ich eine Rückgabe von futures.ProcessPoolExecutor() bekomme.

So weit, so gut. Aber jetzt merke ich, dass ProcessPoolExecutor neue Prozesse erstellt, was bedeutet, dass es Speicher kopiert, was langsam ist. Ich möchte Speicher teilen können - also sollte ich Threads verwenden. Das ist gut und gut, bis ich merke, dass das Ausführen mehrerer Prozesse mit mehreren Threads in jedem Prozess wahrscheinlich schneller ist, da mehrere Threads immer noch nur auf einem Prozessor meiner süßen 8-Core-Maschine laufen.

Hier komme ich in Schwierigkeiten:
Basierend auf den Beispielen, die ich gesehen habe, operiert ThreadPoolExecutor auf einem list . So auch ProcessPoolExecutor . Also kann ich nichts in ProcessPoolExecutor iterativ machen, um nach ThreadPoolExecutor zu farmen, denn dann wird ThreadPoolExecutor ein einzelnes Objekt zur Bearbeitung bekommen (siehe mein Versuch, unten veröffentlicht).
Auf der anderen Seite kann ich listOfParticles nicht selbst schneiden, weil ich ThreadPoolExecutor mag, um herauszufinden, wie viele Threads benötigt werden.

Also, die große Frage (endlich) :
Wie sollte ich meinen Code so strukturieren, dass ich effektiv die folgenden beiden Prozesse UND-Threads parallelisieren kann:

%Vor%

Das ist, was ich versucht habe, aber ich würde es nicht wagen, es zu versuchen, denn ich weiß, dass es nicht funktionieren wird:

%Vor%

Ich würde mich freuen, wenn ich darüber nachdenke, wie ich das beheben oder sogar meinen Ansatz verbessern kann

Wenn es darauf ankommt, bin ich auf python3.3.2

    
inspectorG4dget 15.11.2013, 05:58
quelle

2 Antworten

10

Ich gebe dir Arbeitscode, der Prozesse mit Threads mischt, um das Problem zu lösen, aber das ist nicht das, was du erwartest ;-) Das erste ist, ein Programm zu erstellen, das deine realen Daten nicht gefährdet. Experimentiere mit etwas Harmlosem. Also hier ist der Anfang:

%Vor%

Jetzt haben wir etwas zum Spielen. Als nächstes einige Konstanten:

%Vor%

Fiddle diejenigen nach Geschmack. CHUNKSIZE wird später erklärt.

Die erste Überraschung für Sie ist, was meine unterste Arbeiterfunktion tut. Das liegt daran, dass Sie hier zu optimistisch sind:

  

Da die Nebenwirkungen des Aufrufs von p.getFitness in gespeichert sind   jedes Teilchen selbst, muss ich mir keine Sorgen machen   Rückgabe von futures.ProcessPoolExecutor ().

Leider kann nothing in einem Worker-Prozess keine Auswirkungen auf die Particle -Instanzen in Ihrem Hauptprogramm haben. Ein Worker-Prozess funktioniert auf Kopien von Particle -Instanzen, ob über eine Copy-on-Write-Implementierung von fork() oder weil er an einer Kopie arbeitet, die durch das Entfernen einer Particle pickle erstellt wurde .

Wenn Sie möchten, dass Ihr Hauptprogramm die Fitnessergebnisse sieht , müssen Sie veranlassen, dass Informationen an das Hauptprogramm gesendet werden. Da ich nicht genug über Ihr aktuelles Programm weiß, gehe ich hier davon aus, dass Particle().i eine eindeutige Ganzzahl ist und dass das Hauptprogramm Integer ganz einfach auf Particle -Instanzen zurückbilden kann. Vor diesem Hintergrund muss die Worker-Funktion auf der untersten Ebene ein Paar zurückgeben: die eindeutige Ganzzahl und das Fitness-Ergebnis:

%Vor%

Da ist es leicht, eine Liste von Particle s über Threads zu verteilen und eine Liste von (particle_id, fitness) results zurückzuliefern:

%Vor%

Anmerkungen:

  1. Das ist die Funktion, die jeder Arbeitsprozess ausführt.
  2. Ich verwende Python 3, also verwende list() , um e.map() zu zwingen, alle Ergebnisse in einer Liste zu materialisieren.
  3. Wie in einem Kommentar erwähnt, ist es unter CPython so, dass CPU-gebundene Tasks über Threads hinweg langsamer sind als alle in einem einzigen Thread.

Es bleibt nur noch Code zu schreiben, um eine Liste von Particle s auf Prozesse zu verteilen und die Ergebnisse abzurufen. Das ist kinderleicht mit multiprocessing , also werde ich das verwenden. Ich habe keine Ahnung, ob concurrent.futures es kann (da wir auch Threads mischen), aber das ist mir egal. Aber weil ich dir Arbeitscode gebe, kannst du damit spielen und zurückmelden ;-)

%Vor%

Anmerkungen:

  1. Ich bringe die Liste von Particle s in Stücke "von Hand". Dafür steht CHUNKSIZE . Das liegt daran, dass ein Worker-Prozess eine Liste von Particle s benötigt, um daran zu arbeiten, und das wiederum deshalb, weil das die futures map() -Funktion ist. Es ist eine gute Idee, die Arbeit unabhängig voneinander zu erledigen, so dass Sie als Gegenleistung für die pro-invokativen Interprozess-Overheads einen echten Knall für das Geld bekommen.
  2. imap_unordered() gibt keine Garantie für die Reihenfolge aus, in der die Ergebnisse zurückgegeben werden. Dies gibt der Implementierung mehr Freiheit, die Arbeit so effizient wie möglich zu gestalten. Und wir kümmern uns nicht um die Reihenfolge hier, also ist das in Ordnung.
  3. Beachten Sie, dass die Schleife die (particle_id, fitness) -Ergebnisse abruft und die Particle -Instanzen entsprechend ändert. Vielleicht macht dein reales .getfitness andere Mutationen in Particle Instanzen - kann nicht raten. Unabhängig davon, das Hauptprogramm wird nie irgendwelche Mutationen in den Arbeitern "durch Magie" sehen - Sie müssen dafür ausdrücklich sorgen. Im Limit könnten Sie stattdessen (particle_id, particle_instance) -Paare und ersetzen die Particle -Instanzen im Hauptprogramm zurückgeben. Dann würden sie alle Mutationen widerspiegeln, die in Arbeiterprozessen gemacht wurden.

Viel Spaß: -)

Futures ganz nach unten

Es stellte sich heraus, dass es sehr einfach war, multiprocessing zu ersetzen. Hier sind die Änderungen. Dies ersetzt auch (wie bereits erwähnt) die ursprünglichen Particle Instanzen, um alle Mutationen zu erfassen. Es gibt hier allerdings einen Kompromiss: Bei der Beizung einer Instanz werden "viel mehr" Bytes benötigt als bei einem einzelnen "Fitness" -Ergebnis. Mehr Netzwerkverkehr Wähle dein Gift aus; -)

Zum Zurückgeben der mutierten Instanz muss nur die letzte Zeile von thread_worker() ersetzt werden, etwa so:

%Vor%

Ersetzen Sie dann alle " main " Blöcke durch folgende:

%Vor%

Der Code ist dem multiprocessor dance sehr ähnlich. Persönlich würde ich die multiprocessing Version verwenden, weil imap_unordered wertvoll ist. Das ist ein Problem mit vereinfachten Schnittstellen: Sie kaufen oft Einfachheit auf Kosten von versteckten nützlichen Möglichkeiten.

    
Tim Peters 25.11.2013, 01:01
quelle
4

Erstens, können Sie sicher sein, dass Sie mehrere Threads ausführen, während Sie alle Ihre Kerne mit Prozessen laden? Wenn es CPU-gebunden ist, kaum ja . Zumindest einige Tests müssen gemacht werden.

Wenn das Hinzufügen von Threads zu Ihrer Leistung beiträgt, lautet die nächste Frage, ob Sie mit manuellem Lastenausgleich oder automatisch eine bessere Leistung erzielen können. Von Hand gemacht, meine ich sorgfältige Workload-Partitionierung in Chunks mit ähnlicher Rechenkomplexität und die Einführung eines neuen Task-Prozessors pro Chunk, Ihrer ursprünglichen, aber zweifelhaften Lösung. Durch automatisches Erstellen eines Pools von Prozessen / Threads und Kommunikation in der Arbeitswarteschlange für neue Aufgaben, die Sie anstreben. Der erste Ansatz ist meines Erachtens ein Apache-Hadoop-Paradigma, der zweite wird von Arbeitswarteschlangenprozessoren wie Sellery implementiert. Der erste Ansatz kann unter einigen Aufgaben-Chunks leiden, die langsamer sind und ausgeführt werden, während andere abgeschlossen sind, zweitens werden Kommutierungs- und Warte-auf-Task-Overheads hinzugefügt, und dies ist der zweite Punkt der Leistungstests, die durchgeführt werden müssen.

Wenn Sie eine statische Sammlung von Prozessen mit Multithreads innerhalb von AFAIK haben möchten, können Sie sie nicht mit concurrent.futures wie gewünscht ändern und müssen sie etwas modifizieren. Ich weiß nicht, ob es für diese Aufgabe existierende Lösungen gibt, aber da concurrent eine reine Python-Lösung (ohne C-Code) ist, kann es leicht gemacht werden. Der Workprozessor ist in der _adjust_process_count Routine von% co_de definiert % class, und Unterklassen und Überschreiben mit Multi-Threading-Ansatz ist eher straightward, müssen Sie nur Ihre benutzerdefinierte ProcessPoolExecutor , basierend auf _process_worker

Original concurrent.features.thread als Referenz:

%Vor%     
alko 15.11.2013 06:56
quelle