Abonniere eine Warteschlange, empfange 1 Nachricht und melde sich dann ab

8

Ich habe ein Szenario, in dem ich Jobs extrem schnell verteilen und bearbeiten muss. Ich werde ungefähr 45 Jobs in der Warteschlange schnell haben und ich kann ungefähr 20 gleichzeitig bearbeiten (5 Maschinen, 4 Kerne). Jeder Job benötigt eine variable Zeit, und um die Angelegenheit zu komplizieren, ist die Garbage Collection ein Problem. Daher muss ich in der Lage sein, einen Consumer offline zur Garbage Collection zu nehmen.

Momentan habe ich alles mit Pop (jeder Verbraucher knallt alle 5ms). Dies scheint unerwünscht zu sein, da es zu rabbitmq zu 600 Pop-Anfragen pro Sekunde führt.

Ich würde es lieben, wenn es einen pop-Befehl geben würde, der sich wie subscribe verhält, aber für nur eine Nachricht. (Der Prozess würde blockieren und auf Eingabe von der rabbitMQ-Verbindung warten, über etwas, das Kernel.select ähnelt)

Ich habe versucht, das AMQP-Juwel dazu zu bringen, so etwas zu tun, aber es funktioniert nicht: Ich kann mich anscheinend nicht abmelden, bis die Warteschlange leer ist und keine weiteren Nachrichten an den Verbraucher gesendet werden. Andere Methoden der Abmeldung befürchte ich werde Nachrichten verlieren.

consume_1.rb:

%Vor%

consumer_many.rb:

%Vor%

producer.rb:

%Vor%

Ich starte consume_many.rb und producer.rb. Nachrichten werden wie erwartet fließen.

Wenn ich consume_1.rb starte, erhält es (wie erwartet) jede zweite Nachricht. Aber es kündigt NIEMALS ab, weil es nie alle seine Nachrichten verarbeitet ... so weiter geht es.

Wie bekomme ich consume_1.rb, um die Warteschlange zu abonnieren, eine einzelne Nachricht zu erhalten und sich dann selbst aus dem Load-Balancer-Ring herauszuholen, damit es seine Arbeit erledigen kann, ohne zusätzliche ausstehende Jobs zu verlieren, die sich möglicherweise in der Warteschlange befinden Warteschlange und würde sonst für den Prozess gesendet werden?

Tim

    
Tim Harper 15.04.2011, 23:13
quelle

2 Antworten

13

Dies ist ein einfaches, aber sehr schlecht dokumentiertes Feature des AMQP-Edelsteins. Was Sie brauchen, ist dies:

In Ihrem Verbraucher:

%Vor%

Und dann mit Ihrem Subskriptionsblock:

%Vor%

Was dies bedeutet, teilt es RabbitMQ mit, dass es nur Ihre Consumer 1-Nachricht sendet und keine weitere Nachricht sendet, bis Sie ack auf der Warteschlangenkopfzeile aufrufen, nachdem an einer lang laufenden Task für eine Weile.

Ich muss hier vielleicht korrigiert werden, aber ich glaube, ein direct exchange wäre für diese Aufgabe besser geeignet.

    
Ivan 16.04.2011, 03:19
quelle
1

Mit der Umgebung, die ich habe,

RabbitMQ Version: 3.3.3

amqp gem Version: 1.5.0

Die Lösung von Ivan führte immer noch dazu, dass alle Nachrichten aus der Warteschlange abgerufen wurden.

Stattdessen kann die Anzahl nicht quittierter Nachrichten durch das Abonnieren einer Warteschlange durch Festlegen der QoS eines Kanals begrenzt werden.

Gemäß dem API-Dokument von AMQP :: Channel,

%Vor%

Ein Hinweis für die Methode, dass prefetch_size veraltet ist, wenn Sie RabbitMQ-Server nach Version 2.3.6 ausführen.

%Vor%

Hoffe, die Lösung hilft jemandem aus.

Prost.

    
Jack Wu 27.03.2015 20:44
quelle

Tags und Links