Dynamische Erstellung von Warteschlangen pro Prozess in Python-Multiprozessing

8

Ich möchte mehrere Process es dynamisch erstellen, wobei jede Instanz eine Warteschlange für eingehende Nachrichten von anderen Instanzen hat und jede Instanz auch neue Instanzen erstellen kann. Wir enden also mit einem Netzwerk von Prozessen, die sich gegenseitig senden. Jede Instanz darf an jeden anderen senden.

Der Code unter würde tun, was ich möchte: Er verwendet eine Manager.dict() , um die Warteschlangen zu speichern, stellt sicher, dass Updates weitergegeben werden und eine Lock() , um den Schreibzugriff auf die Warteschlangen zu schützen. Beim Hinzufügen einer neuen Warteschlange wird jedoch "RuntimeError: Queue objects should only be shared between processes through inheritance" ausgegeben.

Das Problem ist, dass wir beim Starten nicht wissen, wie viele Warteschlangen letztendlich benötigt werden, also müssen wir sie dynamisch erstellen. Aber da wir die Warteschlangen nur während der Bauzeit teilen können, weiß ich nicht, wie das geht.

Ich weiß, dass eine Möglichkeit wäre, queues zu einer globalen Variable zu machen anstatt zu einer verwalteten, die an __init__ weitergegeben wurde: Das Problem, so wie ich es verstehe, ist, dass Additionen zu der Variable queues würden nicht an andere Prozesse weitergegeben werden.

BEARBEITEN Ich arbeite an evolutionären Algorithmen. EAs sind eine Art von maschinellem Lernen. Ein EA simuliert eine "Population", die sich durch Überleben des Stärkeren, Crossover und Mutation entwickelt. In parallelen EAs, wie hier, haben wir auch Migrationen zwischen Populationen, die der Interprozesskommunikation entsprechen. Inseln können auch neue Inseln hervorbringen, und deshalb brauchen wir eine Möglichkeit, Nachrichten zwischen dynamisch erzeugten Prozessen zu senden.

%Vor%     
jmmcd 16.08.2011, 02:04
quelle

3 Antworten

3

Ich bin mir nicht ganz sicher, was Ihr Anwendungsfall tatsächlich hier ist. Vielleicht, wenn Sie ein wenig mehr erklären, warum Sie möchten, dass jeder Prozess dynamisch ein Kind mit einer verbundenen Warteschlange hervorbringt, wird es ein bisschen klarer, was die richtige Lösung in dieser Situation wäre.

Wie auch immer, mit der Frage scheint es, dass es nicht wirklich eine gute Möglichkeit gibt, Pipes oder Queues mit Multiprocessing dynamisch zu erstellen.

Ich denke, wenn du bereit bist Threads in jedem deiner Prozesse zu erzeugen, kannst du multiprocessing.connection.Listener/Client verwenden, um hin und her zu kommunizieren. Anstatt Threads zu erzeugen, habe ich einen Ansatz gewählt, bei dem Netzwerk-Sockets verwendet werden, und die Kommunikation zwischen Threads aktiviert.

Dynamische Prozess-Launch- und Netzwerk-Sockets können immer noch flockig sein, je nachdem, wie multiprocessing Ihre Dateideskriptoren bereinigt, wenn Sie einen neuen Prozess erstellen / abzweigen, und Ihre Lösung wird wahrscheinlich einfacher auf * nix-Derivaten funktionieren. Wenn Sie Bedenken hinsichtlich des Socket-Overheads haben, könnten Sie Unix-Domain-Sockets verwenden, um ein wenig leichter zu sein, auf Kosten der zusätzlichen Komplexität, wenn Knoten auf mehreren Arbeitsmaschinen ausgeführt werden.

Wie auch immer, hier ist ein Beispiel, das Netzwerk-Sockets und eine globale Prozessliste verwendet, um dies zu erreichen, da ich keinen guten Weg gefunden habe, multiprocessing zu tun.

%Vor%

Mit viel Liebe zum Polieren und Testen könnte dies eine logische Erweiterung zu multiprocessing.Process und / oder multiprocessing.Pool sein, da dies etwas zu sein scheint, was Leute benutzen würden, wenn es in der Standardbibliothek verfügbar wäre. Es kann auch sinnvoll sein, eine DynamicQueue-Klasse zu erstellen, die Sockets verwendet, um für andere Warteschlangen erkennbar zu sein.

Wie auch immer, hoffe es hilft. Bitte aktualisieren Sie, wenn Sie einen besseren Weg finden, dies zu tun.

    
stderr 16.08.2011, 12:36
quelle
3

Dieser Code basiert auf der angenommenen Antwort. Es ist in Python 3, da OSX Snow Leopard bei einigen Verwendungen von Multiprocessing-Stuff defaultiert.

%Vor%     
jmmcd 16.08.2011 21:06
quelle
1

Der Socketserver der Standardbibliothek wird bereitgestellt, um zu vermeiden, dass select () manuell programmiert wird. In dieser Version starten wir einen Socketserver in einem separaten Thread, so dass jeder Prozess die Berechnung in seiner Hauptschleife durchführen kann.

%Vor%     
jmmcd 19.08.2011 13:46
quelle

Tags und Links