Tatsächliches Design:
Für diejenigen, die auf diese Frage zurückkommen, hat mich die hilfreiche Antwort auf ein funktionierendes Design gebracht, das gut läuft. Drei Einsichten waren der Schlüssel:
recv()
oder beide send()
gleichzeitig aus dem gleichen Socket auszuprobieren, dann wird das zweite Greenlet mit Ausnahme von Eventlet elegant beendet. Dies ist großartig und bedeutet, dass einfache Ausnahmen und nicht unmöglich zu reproduzierende Daten Interleaving-Fehler ergeben, wenn amqplib
"Grüns" schlecht. amqplib
Methoden fallen grob in zwei Gruppen: wait()
Schleifen innerhalb von recv()
bis eine AMQP Nachricht zusammengefügt wird, während andere Methoden send()
Nachrichten zurückgeben und nicht ihre eigene recv()
versuchen. Das ist erstaunlich viel Glück, denn die amqplib
Autoren hatten keine Ahnung, dass jemand versuchen würde, ihre Bibliothek zu "grünen"! Dies bedeutet, dass das Senden von Nachrichten nicht nur vor dem Rückruf von wait()
sicher ist, sondern dass Nachrichten auch sicher von anderen Greenlets gesendet werden können, die sich vollständig außerhalb der Kontrolle der Schleife wait()
befinden. Diese sicheren Methoden - die von jedem Greenlet aufgerufen werden können, nicht nur vom wait()
Callback - sind:
basic_ack
basic_consume
mit nowait=True
basic_publish
basic_recover
basic_reject
exchange_declare
mit nowait=True
exchange_delete
mit nowait=True
queue_bind
mit nowait=True
queue_unbind
mit nowait=True
queue_declare
mit nowait=True
queue_delete
mit nowait=True
queue_purge
mit nowait=True
1
und dann mit acquire()
und release()
zum Sperren und Entsperren. Alle meine asynchronen Greenlets, die Nachrichten schreiben möchten, können eine solche Sperre verwenden, um zu vermeiden, dass ihre separaten send()
-Aufrufe verschachteln und das AMQP-Protokoll ruinieren. Also sieht mein Code ungefähr so aus:
%Vor%Viel Spaß!
Ursprüngliche Frage:
Stellen Sie sich Hunderte von AMQP-Nachrichten vor, die jede Minute in einer kleinen Python Eventlet -Anwendung ankommen, die jeweils verarbeitet und beantwortet werden müssen - wo der CPU-Overhead ist der Verarbeitung wird minimal sein, aber möglicherweise warten auf Antworten von anderen Diensten und Sockets.
Damit beispielsweise 100 Nachrichten gleichzeitig verarbeitet werden, könnte ich natürlich 100 separate TCP-Verbindungen zu RabbitMQ aufbauen und einen Mitarbeiter für jede Verbindung haben, die einzelne Nachrichten in Lock-Step empfängt, verarbeitet und beantwortet. Aber um TCP-Verbindungen zu erhalten, würde ich am liebsten nur eine AMQP-Verbindung erstellen, RabbitMQ erlauben, Nachrichten in voller Geschwindigkeit über die Pipe zu streamen, diese Aufgaben an die Arbeiter weiterzugeben und die Antworten zurückzuschicken, wenn jeder Arbeiter fertig ist:
%Vor%Beachten Sie Folgendes:
Ich hatte geplant, dies mit Eventlet und py-amqplib zu schreiben, nachdem ich diesen attraktiven Blogbeitrag darüber gesehen hatte leicht, dass die AMQP-Bibliothek in das Eventlet-Verarbeitungsmodell übernommen werden konnte:
Mein Problem ist, dass ich, nachdem ich die Dokumentation für beide Bibliotheken gelesen habe, den amqplib-Quellcode und einen Großteil des Eventlet-Quellcodes, nicht herausfinden kann, wie ich dem Eventlet, das die AMQP-Verbindung besitzt, das Eventlet namens% co_de beibringen kann % im Blog-Post - to auch wachen auf, wenn ein Worker seine Arbeit beendet und eine Antwort generiert. Die Methode connect_to_host()
in amqplib kann nur durch Aktivitäten am AMQP-Socket aktiviert werden. Obwohl es sich anfühlt, als ob ich in der Lage sein sollte, dass die Arbeiter ihre Antworten in eine Warteschlange schreiben und das wait()
eventlet aufwachen entweder wenn eine neue eingehende Nachricht eintrifft oder Wenn ein Mitarbeiter bereit ist, eine Antwort zu senden, kann ich keine Möglichkeit finden, ein Eventlet zu sagen: "weck mich auf, wenn entweder von diesen Dingen passiert."
Es kam mir in den Sinn, dass die Arbeiter versuchen könnten, das AMQP-Verbindungsobjekt - oder sogar den rohen Sockel - zu kommandieren und ihre eigenen Nachrichten über TCP zu schreiben; aber es scheint, als ob Sperren erforderlich wären, um zu verhindern, dass die ausgehenden Worker-Nachrichten miteinander oder mit ACK-Nachrichten verschachtelt werden, die vom Haupt-Listener-Eventlet geschrieben wurden, und ich kann auch nicht herausfinden, wo Sperren in Eventlet verfügbar sind.
All dies lässt mich fast sicher sein, dass ich versuche, dieses Problem irgendwie genau rückwärts anzugehen. Ist ein Problem wie dieses - eine sichere Verbindung zwischen einem Listener-Dispatcher und vielen Arbeitern - einfach nicht einem Coroutinemodell zuzuordnen und erfordert eine vollwertige Async-Bibliothek? (In welchem Fall: Gibt es einen, den Sie für dieses Problem empfehlen würden, und wie würde das Multiplexing zwischen eingehenden Nachrichten und ausgehenden Arbeiterantworten stattfinden? Ich habe heute keine saubere Lösung gefunden, die Kombinationen wie Pika + Ioloop versucht - obwohl ich gerade einen anderen gesehen habe Bibliothek, stormed_amqp , das könnte besser sein als Pika.) Oder muss ich wirklich auf echtes Leben zurückgreifen Python-Threads, wenn ich sauberen und wartbaren Code möchte, der dieses Modell ausführen kann? Ich bin offen für alle Optionen.
Danke für jede Hilfe oder Ideen! Ich denke immer wieder, dass ich die ganze Nebenläufigkeit-in-Python-Sache ziemlich niedergeschlagen habe, dann lerne ich noch einmal, dass ich es nicht tue. :) Und ich hoffe, dir hat die ASCII-Grafik oben jedenfalls gefallen.
Nachdem ich Ihren Beitrag gelesen und mit gevent eine ähnliche Bibliothek wie eventlet bearbeitet habe, sind mir einige Dinge klar geworden, weil ich gerade ein ähnliches Problem gelöst habe
Im Allgemeinen gibt es keine Notwendigkeit zum Sperren, da es immer nur ein Eventlet oder Greenlet gleichzeitig gibt, solange keiner von ihnen blockiert, scheint alles gleichzeitig zu laufen. ABER du willst keine Daten senden eine Steckdose, während ein anderes Greenlet sendet. Du hast Recht und brauchst in der Tat eine Sperre dafür.
Wenn ich solche Fragen in der Dokumentation habe, reicht das nicht aus ... geh in die Quelle! Es ist sowieso OpenSource, Sie lernen eine Menge mehr auf andere Menschen Code suchen.
Hier ist ein vereinfachter Beispielcode, der die Dinge für Sie klären könnte.
in Ihrem Dispatcher haben 2 Warteschlangen
%Vor%lassen Arbeiter ihr Ergebnis in die Serverwarteschlange stellen.
der Sende- und Empfangscode
%Vor%in der Sendefunktion Ihres Verbindungscodes
%Vor%Tags und Links python asynchronous amqp eventlet