Worker-Pools und Multi-Tenant-Warteschlangen mit RabbitMQ

10

Ich arbeite an einer Web-Anwendung, die eine mandantenbasierte cloudbasierte Anwendung ist (viele Clients, jede mit ihrer eigenen "Umgebung", aber alle auf gemeinsam genutzten Hardware-Sets), und wir stellen die Möglichkeit für einen Benutzer vor um die Arbeit für die spätere Verarbeitung aufzuladen. Die Arten der Batch-Arbeit ist wirklich nicht wichtig, es ist nur von ausreichender Menge, dass es ohne eine Arbeits-Warteschlange nicht wirklich praktisch ist. Wir haben RabbitMQ als unser zugrunde liegendes Warteschlangen-Framework ausgewählt.

Da es sich um eine mandantenfähige Anwendung handelt, möchten Clients nicht unbedingt lange Warteschlangenprozesszeiten für einen anderen Client verursachen. Eine Idee, die wir uns vorgestellt haben, ist das Erstellen einer Warteschlange pro Client Basis und mit einem gemeinsamen Arbeitspool auf alle unsere Client-Warteschlangen hingewiesen. Das Problem ist, dass die Mitarbeiter, soweit ich das beurteilen kann, direkt an eine bestimmte Warteschlange gebunden sind, nicht an einen Austausch. In unserer idealen Welt werden unsere Client-Warteschlangen weiterhin verarbeitet, ohne dass ein Client einen anderen blockiert, aus einem gemeinsamen Arbeitspool, den wir bei Bedarf erweitern oder verkleinern können, indem wir mehr Arbeiter starten oder untätige schließen. Mitarbeiter an eine bestimmte Warteschlange gebunden zu haben, verhindert uns dies in einem praktischen Sinn, da wir häufig viele Mitarbeiter haben, die sich ohne Aktivität in einer Warteschlange aufhalten.

Ist das relativ einfach, um das zu erreichen? Ich bin ziemlich neu bei RabbitMQ und konnte nicht wirklich erreichen, wonach wir suchen. Wir möchten auch keine sehr komplexe Multithread-Anwendung schreiben müssen, das ist eine Zeitverschwendung in der Entwicklungs- und Testzeit, die wir uns wahrscheinlich nicht leisten können. Unser Stack ist Windows / .Net / C # -basiert, wenn das der Fall ist, aber ich denke nicht, dass dies eine wichtige Rolle in der vorliegenden Frage spielen sollte.

    
bakasan 28.11.2011, 20:24
quelle

4 Antworten

2

Sie können sich die Implementierung der Prioritätswarteschlange anschauen (die nicht implementiert wurde, als diese Frage ursprünglich gestellt wurde): Ссылка

Wenn das für Sie nicht funktioniert, können Sie einige andere Hacks versuchen, um zu erreichen, was Sie wollen (was mit älteren Versionen von RabbitMQ funktionieren sollte):

Sie könnten 100 Warteschlangen an einen Themenaustausch gebunden haben und den Routing-Schlüssel auf einen Hash der Benutzer-ID% 100 setzen, dh jede Aufgabe hat einen Schlüssel zwischen 1 und 100 und Aufgaben für denselben Benutzer haben denselben Schlüssel . Jede Warteschlange ist mit einem eindeutigen Muster zwischen 1 und 100 gebunden. Jetzt haben Sie eine Flotte von Arbeitern, die mit einer zufälligen Warteschlangennummer beginnen und dann diese Warteschlangennummer nach jedem Auftrag erhöhen, wiederum% 100, um nach Warteschlange 100 wieder in Warteschlange 1 zu wechseln.

Jetzt kann Ihre Worker-Flotte bis zu 100 einzelne Benutzer parallel bearbeiten, oder alle Mitarbeiter können sich auf einen einzelnen Benutzer konzentrieren, wenn keine anderen Aufgaben zu erledigen sind. Wenn die Mitarbeiter alle 100 Warteschlangen zwischen den einzelnen Jobs durchlaufen müssen, haben Sie in dem Szenario, in dem nur ein einziger Benutzer viele Jobs in einer einzigen Warteschlange hat, einen Overhead zwischen den einzelnen Jobs. Eine kleinere Anzahl von Warteschlangen ist eine Möglichkeit, damit umzugehen. Sie können auch festlegen, dass jeder Mitarbeiter eine Verbindung zu jeder der Warteschlangen hält und bis zu einer nicht bestätigten Nachricht von jeder der Warteschlangen erhält. Der Arbeiter kann dann die ausstehenden Nachrichten im Speicher viel schneller durchlaufen, vorausgesetzt, das Zeitlimit für nicht bestätigte Nachrichten ist ausreichend hoch eingestellt.

Alternativ können Sie zwei Austauschvorgänge mit jeweils einer gebundenen Warteschlange erstellen. Alle Arbeit geht an den ersten Austausch und die Schlange, die ein Pool von Arbeitern konsumiert. Wenn eine Arbeitseinheit zu lange dauert, kann der Arbeiter sie abbrechen und in die zweite Warteschlange schieben. Arbeiter verarbeiten nur die zweite Warteschlange, wenn sich in der ersten Warteschlange nichts befindet. Möglicherweise möchten Sie auch ein paar Mitarbeiter mit der umgekehrten Warteschlangenpriorisierung, um sicherzustellen, dass lange laufende Aufgaben noch verarbeitet werden, wenn ein nie endender Strom von kurzen Aufgaben eintrifft, so dass ein Benutzerstapel immer verarbeitet wird. Dies wird Ihre Arbeitsflotte nicht wirklich auf alle Aufgaben verteilen, aber es wird lang andauernde Aufgaben von einem Benutzer stoppen, die Ihre Arbeiter davon abhalten, kurz laufende Aufgaben für denselben oder einen anderen Benutzer auszuführen. Außerdem wird davon ausgegangen, dass Sie einen Job abbrechen und später ohne Probleme erneut ausführen können. Es bedeutet auch, dass Ressourcen von Tasks verschwendet werden, die eine Zeitüberschreitung aufweisen und erneut mit niedriger Priorität ausgeführt werden müssen. Es sei denn, Sie können schnelle und langsame Aufgaben im Voraus erkennen

Der erste Vorschlag mit den 100 Warteschlangen könnte auch ein Problem haben, wenn es 100 langsame Aufgaben für einen einzelnen Benutzer gibt, dann bucht ein anderer Benutzer einen Stapel von Aufgaben. Diese Aufgaben werden erst untersucht, wenn eine der langsamen Aufgaben beendet ist. Wenn sich dies als ein berechtigtes Problem herausstellt, können Sie die beiden Lösungen möglicherweise kombinieren.

    
LaserJesus 27.09.2016 22:45
quelle
1

Sie können Ihren Pool von Arbeitern alle die gleiche eindeutige Warteschlange verwenden. Die Arbeit wird dann über sie verteilt und Sie können Ihren Pool vergrößern / verkleinern, um Ihre Arbeitsverarbeitungskapazität zu erhöhen / verringern.

    
David Dossot 28.11.2011 20:36
quelle
1

Ich verstehe nicht, warum Sie die Vhosts von RabbitMQ nicht verwenden und sich Ihre App bei RabbitMQ anmelden und sich bei einer separaten Verbindung für jeden Benutzer authentifizieren.

Dies bedeutet nicht, dass Sie keinen Arbeiter-Supervisor haben können, der Arbeiter einem oder anderen Benutzer zuweist. Aber es bedeutet, dass alle Nachrichten für jeden Benutzer von völlig separaten Austauschen und Warteschlangen verarbeitet werden.

    
Michael Dillon 29.01.2012 06:34
quelle
0

Arbeitern werden 0+ Warteschlangen zugewiesen, keine Austauschvorgänge.

Die Logik, für die Aufgaben aus welchen Warteschlangen für jeden Worker ausgeführt werden, wird in der Klasse CELERYD_CONSUMER implementiert, die standardmäßig celery.worker.consumer.Consumer ist.

Sie können eine benutzerdefinierte Verbraucherklasse erstellen, die beliebige Logik implementiert. Der schwierige Teil wird die Details des "Fairness" -Algorithmus bestimmen, den Sie verwenden möchten; Sobald Sie dies jedoch entschieden haben, können Sie es implementieren, indem Sie eine benutzerdefinierte Verbraucherklasse erstellen und diese den entsprechenden Mitarbeitern zuweisen.

    
Chris Johnson 29.08.2013 18:57
quelle

Tags und Links