Demultiplexen von Nachrichten aus einer Warteschlange zur Verarbeitung in parallelen Streams mit amqp?

9

Ich versuche herauszufinden, ob ich von einem blockierenden Szenario zu einem reaktiveren Muster wechseln kann.

Ich habe eingehende Update-Befehle, die in einer Warteschlange ankommen, und ich muss sie in der Reihenfolge behandeln, aber nur diejenigen, die dieselbe Entität betreffen. Im Wesentlichen kann ich so viele parallele Streams von Aktualisierungsereignissen erstellen, wie ich möchte, solange keine zwei Streams Ereignisse bezüglich derselben Entität enthalten.

Ich dachte, dass der Verbraucher der primären Warteschlange möglicherweise in der Lage wäre, die Routing-Mechanismen und temporären Warteschlangen von amqp zu nutzen, indem er für jede Entitäts-ID temporäre Warteschlangen erstellt und einen Verbraucher an sie anhängt. Sobald der Teilnehmer beendet ist und keine anderen Ereignisse in Bezug auf die fragliche Einheit in der Warteschlange sind, kann die Warteschlange entsorgt werden.

Wird dieses Szenario regelmäßig verwendet? Gibt es einen besseren Weg, dies zu erreichen? In unserem aktuellen System verwenden wir eine benannte Sperre, die auf der ID basiert, um gleichzeitige Aktualisierungen zu verhindern.

    
loteq 17.07.2014, 08:32
quelle

1 Antwort

1

Es gibt mindestens 2 Optionen:

Eine einzelne Warteschlange für jede Entität Und n Consumers in einer Entity-Queue.

Eine Warteschlange mit Nachrichten aller Entitäten . Wo die Nachricht Daten enthält, was es für eine Entität ist. Sie könnten dies in mehrere Warteschlangen aufteilen (Eine AMQP-Warteschlange für einen Entitätstyp) oder indem Sie eine BlockingQueue Implementierung.

Vorteile der Aufteilung der Entitäten in qmqp-queues

  • Sie könnten ein ha-Setup mit rabbitmq
  • erstellen
  • Sie können Nachrichten weiterleiten
  • Sie könnten vielleicht mehr als einen Consumer einer Entity Queue haben, wenn es sie gibt ist eines Tages notwendig (Skalierbarkeit)
  • Nachrichten können persistent sein und daher auf einem Anwendungsabsturz

Vorteile der Verwendung einer internen BlockingQueue-Implementierung

  • Es ist schneller (kein net-io offensichtlich)
  • Alles muss in einer JVM passieren

Wie auch immer es hängt davon ab, was Sie wollen, da beide Wege ihre Vorteile haben könnten.

UPDATE: Ich bin mir nicht sicher, ob ich Sie jetzt habe, aber lassen Sie mich Ihnen einige Ressourcen geben, um einige Dinge auszuprobieren. Es gibt spezielle rabbitmq Erweiterungen , von denen einige Ihnen vielleicht eine Idee geben können. Werfen Sie einen Blick auf lternate exchanges und Austausch zum Austausch Bindings.

Auch für grundlegende Tests bin ich nicht sicher, ob es alle rabbitmq-Features oder überhaupt alle amqp-Features abdeckt, aber das kann manchmal nützlich sein. Denken Sie daran, dass der Routing-Schlüssel in dieser Visualisierung der Herstellername ist, Sie können dort auch einige Beispiele finden. Importieren und exportieren Sie Ihre Konfiguration.

    
Zarathustra 24.07.2014 05:35
quelle