Das Producer-Modul meiner Anwendung wird von Benutzern ausgeführt, die Arbeiten an einem kleinen Cluster vornehmen möchten. Er sendet die Abonnements im JSON-Format über den RabbitMQ-Nachrichtenbroker.
Ich habe mehrere Strategien ausprobiert, und das Beste bisher ist das Folgende, das immer noch nicht voll funktionsfähig ist:
Jeder Clustercomputer führt ein Consumer-Modul aus, das sich bei der AMQP-Warteschlange anmeldet und einen prefetch_count absetzt, um dem Broker mitzuteilen, wie viele Aufgaben gleichzeitig ausgeführt werden können.
Ich konnte es mit SelectConnection aus der Pika AMQP-Bibliothek arbeiten lassen. Sowohl der Verbraucher als auch der Produzent starten zwei Kanäle, von denen einer mit jeder Warteschlange verbunden ist. Der Produzent sendet Anfragen auf Kanal [A] und wartet auf Antworten in Kanal [B], und der Verbraucher wartet auf Anfragen auf Kanal [A] und sendet Antworten auf Kanal [B]. Es scheint jedoch, dass, wenn der Verbraucher den Rückruf ausführt, der die Antwort berechnet, er blockiert, so dass ich nur eine Aufgabe zu jedem Zeitpunkt bei jedem Verbraucher ausgeführt habe.
Was ich am Ende brauche:
Einschränkungen:
AKTUALISIEREN
Ich habe ein bisschen weiter studiert und mein eigentliches Problem scheint zu sein, dass ich eine einfache Funktion als Callback für die pika's SelectConnection.channel.basic_consume () -Funktion verwende. Meine letzte (nicht implementierte) Idee besteht darin, eine Threading-Funktion anstelle einer regulären Threading-Funktion zu übergeben, damit der Rückruf nicht blockiert wird und der Verbraucher weiterhören kann.
Wie Sie bemerkt haben, blockiert Ihr Prozess, wenn ein Callback ausgeführt wird. Je nachdem, was Ihr Callback tut, gibt es mehrere Möglichkeiten, damit umzugehen.
Wenn Ihr Callback IO-gebunden ist (viel Netzwerk- oder Festplatten-IO), können Sie entweder Threads oder eine Greenlet-basierte Lösung verwenden, z. B. gevent , eventlet oder Gewächshaus . Denken Sie jedoch daran, dass Python durch die GIL (Global Interpreter Lock) begrenzt ist, was bedeutet, dass nur ein Stück Python-Code jemals in einem einzigen Python-Prozess ausgeführt wird. Das heißt, wenn Sie viel mit Python-Code rechnen, sind diese Lösungen wahrscheinlich nicht viel schneller als das, was Sie bereits haben.
Eine weitere Option wäre, Ihren Consumer als mehrere Prozesse zu implementieren, indem Sie Multiprozessing verwenden. Ich habe herausgefunden, dass Multiprocessing sehr nützlich ist, wenn man parallel arbeitet. Sie können dies implementieren, indem Sie entweder eine Warteschlange mit dem übergeordneten Element verwenden Prozess, der der Verbraucher ist und die Arbeit an seine Kinder ausgibt, oder indem er einfach mehrere Prozesse startet, die jeder für sich selbst verbraucht. Ich würde vorschlagen, wenn Ihre Anwendung nicht sehr parallel ist (1000 Arbeiter), einfach mehrere Arbeiter zu starten, von denen jeder von seiner eigenen Verbindung konsumiert. Auf diese Weise können Sie die Bestätigungsfunktion von AMQP verwenden. Wenn ein Kunde stirbt, während er noch eine Aufgabe bearbeitet, wird die Nachricht automatisch an die Warteschlange zurückgesendet und von einem anderen Mitarbeiter abgeholt, anstatt die Anfrage einfach zu verlieren.
Eine letzte Option, wenn Sie den Erzeuger kontrollieren und es auch in Python geschrieben ist, ist die Verwendung einer Aufgabenbibliothek wie Sellerie um die Aufgaben / Queue-Abläufe für Sie zu abstrahieren. Ich habe Sellerie für mehrere große Projekte verwendet und finde es sehr gut geschrieben. Es wird auch die verschiedenen Consumer-Probleme für Sie mit der entsprechenden Konfiguration behandeln.
Dein Setup klingt gut für mich. Und Sie haben Recht: Sie können den Callback einfach so einrichten, dass er einen Thread startet und ihn mit einem separaten Callback verkettet, wenn der Thread die Queue der Antwort über Kanal B beendet hat.
Grundsätzlich sollten Ihre Kunden eine eigene Warteschlange haben (Größe von N, Parallelität, die sie unterstützen). Wenn eine Anfrage über Kanal A eingeht, sollte sie das Ergebnis in der Warteschlange zwischen dem Hauptthread mit Pika und den Worker-Threads im Thread-Pool speichern. Sobald es in die Warteschlange gestellt wird, sollte pika mit ACK antworten und Ihr Worker-Thread würde aufwachen und mit der Verarbeitung beginnen.
Sobald der Mitarbeiter mit seiner Arbeit fertig ist, wird das Ergebnis in eine separate Ergebniswarteschlange eingereiht und ein Rückruf an den Hauptthread gesendet, um es an den Verbraucher zurückzusenden.
Sie sollten darauf achten, dass sich die Worker-Threads nicht gegenseitig stören, wenn sie gemeinsam genutzte Ressourcen verwenden. Dies ist jedoch ein separates Thema.
Unerfahren im Threading, würde mein Setup mehrere Consumer-Prozesse (deren Anzahl im Grunde Ihre Prefetch-Anzahl ist) ausführen. Jeder würde sich mit den beiden Warteschlangen verbinden, und sie würden die Jobs glücklich verarbeiten, ohne die Existenz des anderen zu kennen.
Tags und Links python design-patterns rabbitmq amqp pika