Kann das Eventlet eine AMQP-Verbindung mit Nachrichten verwalten, die asynchron ein- und ausgehen?

8

Tatsächliches Design:

Für diejenigen, die auf diese Frage zurückkommen, hat mich die hilfreiche Antwort auf ein funktionierendes Design gebracht, das gut läuft. Drei Einsichten waren der Schlüssel:

  1. Eventlet ist eine sehr sichere Umgebung - wenn zwei Greenlets versuchen recv() oder beide send() gleichzeitig aus dem gleichen Socket auszuprobieren, dann wird das zweite Greenlet mit Ausnahme von Eventlet elegant beendet. Dies ist großartig und bedeutet, dass einfache Ausnahmen und nicht unmöglich zu reproduzierende Daten Interleaving-Fehler ergeben, wenn amqplib "Grüns" schlecht.
  2. Die amqplib Methoden fallen grob in zwei Gruppen: wait() Schleifen innerhalb von recv() bis eine AMQP Nachricht zusammengefügt wird, während andere Methoden send() Nachrichten zurückgeben und nicht ihre eigene recv() versuchen. Das ist erstaunlich viel Glück, denn die amqplib Autoren hatten keine Ahnung, dass jemand versuchen würde, ihre Bibliothek zu "grünen"! Dies bedeutet, dass das Senden von Nachrichten nicht nur vor dem Rückruf von wait() sicher ist, sondern dass Nachrichten auch sicher von anderen Greenlets gesendet werden können, die sich vollständig außerhalb der Kontrolle der Schleife wait() befinden. Diese sicheren Methoden - die von jedem Greenlet aufgerufen werden können, nicht nur vom wait() Callback - sind:
    1. basic_ack
    2. basic_consume mit nowait=True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declare mit nowait=True
    7. exchange_delete mit nowait=True
    8. queue_bind mit nowait=True
    9. queue_unbind mit nowait=True
    10. queue_declare mit nowait=True
    11. queue_delete mit nowait=True
    12. queue_purge mit nowait=True
  3. Semaphore können als Sperren verwendet werden: initialisiere den Semaphor mit dem count 1 und dann mit acquire() und release() zum Sperren und Entsperren. Alle meine asynchronen Greenlets, die Nachrichten schreiben möchten, können eine solche Sperre verwenden, um zu vermeiden, dass ihre separaten send() -Aufrufe verschachteln und das AMQP-Protokoll ruinieren.

Also sieht mein Code ungefähr so ​​aus:

%Vor%

Viel Spaß!

Ursprüngliche Frage:

Stellen Sie sich Hunderte von AMQP-Nachrichten vor, die jede Minute in einer kleinen Python Eventlet -Anwendung ankommen, die jeweils verarbeitet und beantwortet werden müssen - wo der CPU-Overhead ist der Verarbeitung wird minimal sein, aber möglicherweise warten auf Antworten von anderen Diensten und Sockets.

Damit beispielsweise 100 Nachrichten gleichzeitig verarbeitet werden, könnte ich natürlich 100 separate TCP-Verbindungen zu RabbitMQ aufbauen und einen Mitarbeiter für jede Verbindung haben, die einzelne Nachrichten in Lock-Step empfängt, verarbeitet und beantwortet. Aber um TCP-Verbindungen zu erhalten, würde ich am liebsten nur eine AMQP-Verbindung erstellen, RabbitMQ erlauben, Nachrichten in voller Geschwindigkeit über die Pipe zu streamen, diese Aufgaben an die Arbeiter weiterzugeben und die Antworten zurückzuschicken, wenn jeder Arbeiter fertig ist:

%Vor%

Beachten Sie Folgendes:

  • Eine Eventlet-Warteschlange kann eingehende Arbeit unter den Arbeitern elegant verteilen, wenn sie für mehr Arbeit verfügbar sind. li>
  • Die Flusssteuerung von RabbitMQ ist möglicherweise sogar möglich: Ich kann Nachrichten nur bestätigen, bis meine Mitarbeiter alle beschäftigt sind, und dann warten, bis weitere ACKs gesendet werden, bis die Warteschlange leer wird.
  • Die Arbeit wird mit ziemlicher Sicherheit out-of-order abgeschlossen: Eine Anfrage wird möglicherweise schnell abgeschlossen, während eine andere, die früher eintraf, viel länger dauert; und einige Anfragen werden vielleicht nie abgeschlossen; Die Arbeiter werden die Antworten in einer unvorhersehbaren und asynchronen Reihenfolge zurückgeben.

Ich hatte geplant, dies mit Eventlet und py-amqplib zu schreiben, nachdem ich diesen attraktiven Blogbeitrag darüber gesehen hatte leicht, dass die AMQP-Bibliothek in das Eventlet-Verarbeitungsmodell übernommen werden konnte:

Ссылка

Mein Problem ist, dass ich, nachdem ich die Dokumentation für beide Bibliotheken gelesen habe, den amqplib-Quellcode und einen Großteil des Eventlet-Quellcodes, nicht herausfinden kann, wie ich dem Eventlet, das die AMQP-Verbindung besitzt, das Eventlet namens% co_de beibringen kann % im Blog-Post - to auch wachen auf, wenn ein Worker seine Arbeit beendet und eine Antwort generiert. Die Methode connect_to_host() in amqplib kann nur durch Aktivitäten am AMQP-Socket aktiviert werden. Obwohl es sich anfühlt, als ob ich in der Lage sein sollte, dass die Arbeiter ihre Antworten in eine Warteschlange schreiben und das wait() eventlet aufwachen entweder wenn eine neue eingehende Nachricht eintrifft oder Wenn ein Mitarbeiter bereit ist, eine Antwort zu senden, kann ich keine Möglichkeit finden, ein Eventlet zu sagen: "weck mich auf, wenn entweder von diesen Dingen passiert."

Es kam mir in den Sinn, dass die Arbeiter versuchen könnten, das AMQP-Verbindungsobjekt - oder sogar den rohen Sockel - zu kommandieren und ihre eigenen Nachrichten über TCP zu schreiben; aber es scheint, als ob Sperren erforderlich wären, um zu verhindern, dass die ausgehenden Worker-Nachrichten miteinander oder mit ACK-Nachrichten verschachtelt werden, die vom Haupt-Listener-Eventlet geschrieben wurden, und ich kann auch nicht herausfinden, wo Sperren in Eventlet verfügbar sind.

All dies lässt mich fast sicher sein, dass ich versuche, dieses Problem irgendwie genau rückwärts anzugehen. Ist ein Problem wie dieses - eine sichere Verbindung zwischen einem Listener-Dispatcher und vielen Arbeitern - einfach nicht einem Coroutinemodell zuzuordnen und erfordert eine vollwertige Async-Bibliothek? (In welchem ​​Fall: Gibt es einen, den Sie für dieses Problem empfehlen würden, und wie würde das Multiplexing zwischen eingehenden Nachrichten und ausgehenden Arbeiterantworten stattfinden? Ich habe heute keine saubere Lösung gefunden, die Kombinationen wie Pika + Ioloop versucht - obwohl ich gerade einen anderen gesehen habe Bibliothek, stormed_amqp , das könnte besser sein als Pika.) Oder muss ich wirklich auf echtes Leben zurückgreifen Python-Threads, wenn ich sauberen und wartbaren Code möchte, der dieses Modell ausführen kann? Ich bin offen für alle Optionen.

Danke für jede Hilfe oder Ideen! Ich denke immer wieder, dass ich die ganze Nebenläufigkeit-in-Python-Sache ziemlich niedergeschlagen habe, dann lerne ich noch einmal, dass ich es nicht tue. :) Und ich hoffe, dir hat die ASCII-Grafik oben jedenfalls gefallen.

    
Brandon Rhodes 02.11.2011, 00:46
quelle

1 Antwort

5

Nachdem ich Ihren Beitrag gelesen und mit gevent eine ähnliche Bibliothek wie eventlet bearbeitet habe, sind mir einige Dinge klar geworden, weil ich gerade ein ähnliches Problem gelöst habe

Im Allgemeinen gibt es keine Notwendigkeit zum Sperren, da es immer nur ein Eventlet oder Greenlet gleichzeitig gibt, solange keiner von ihnen blockiert, scheint alles gleichzeitig zu laufen. ABER du willst keine Daten senden eine Steckdose, während ein anderes Greenlet sendet. Du hast Recht und brauchst in der Tat eine Sperre dafür.

Wenn ich solche Fragen in der Dokumentation habe, reicht das nicht aus ... geh in die Quelle! Es ist sowieso OpenSource, Sie lernen eine Menge mehr auf andere Menschen Code suchen.

Hier ist ein vereinfachter Beispielcode, der die Dinge für Sie klären könnte.

in Ihrem Dispatcher haben 2 Warteschlangen

%Vor%

lassen Arbeiter ihr Ergebnis in die Serverwarteschlange stellen.

der Sende- und Empfangscode

%Vor%

in der Sendefunktion Ihres Verbindungscodes

%Vor%     
Stephan 02.11.2011, 10:02
quelle

Tags und Links