Sicherstellen, dass eine Nachricht, die in einer Themenbörse veröffentlicht wurde, von mindestens einem Verbraucher empfangen wird

8

TLDR; Im Kontext eines Themenaustauschs und von Warteschlangen, die von den Kunden im Handumdrehen erstellt werden, wie kann eine Nachricht erneut übermittelt werden / der Produzent wird benachrichtigt, wenn kein Verbraucher die Nachricht konsumiert?

Ich habe folgende Komponenten:

  • ein Hauptdienst, der Dateien erzeugt. Jede Datei hat eine bestimmte Kategorie (z. B. Bilder.Profil , Bilder.Galerie )
  • eine Gruppe von Arbeitern, die Dateien konsumieren und eine Textausgabe von ihnen erzeugen (z. B. die Größe der Datei)

Ich habe momentan einen einzigen RabbitMQ-Themenaustausch.

  • Der Produzent sendet Nachrichten an den Austausch mit routing_key = file_category .
  • Jeder Verbraucher erstellt eine Warteschlange und bindet den Austausch an diese Warteschlange für eine Reihe von Routingschlüsseln (z. B. Bilder. * und videos.trending ).
  • Wenn ein Verbraucher eine Datei verarbeitet hat, wird das Ergebnis in eine Warteschlange processing_results verschoben.

Nun - das funktioniert richtig, aber es hat immer noch ein großes Problem. Wenn der Publisher derzeit eine Nachricht mit einem Routingschlüssel sendet, an den kein Verbraucher gebunden ist, geht die Nachricht verloren. Dies ist , denn selbst wenn die von den Konsumenten erstellte Warteschlange dauerhaft ist, wird sie zerstört, sobald der Kunde die Verbindung trennt, da er für diesen Kunden eindeutig ist .

Verbrauchercode (Python):

%Vor%

Das Löschen einer Nachricht in diesem Zustand ist in meinem Anwendungsfall nicht akzeptabel.

Lösungsversuch

Das "Verlieren" einer Nachricht wird jedoch akzeptabel, wenn der Hersteller benachrichtigt wird, dass kein Verbraucher die Nachricht erhalten hat () (in diesem Fall kann er sie später erneut senden). Ich habe herausgefunden, dass das obligatorische Feld helfen könnte, da die Spezifikation von AMQP besagt:

  

Dieses Flag teilt dem Server mit, wie er reagieren soll, wenn die Nachricht nicht an eine Warteschlange weitergeleitet werden kann. Wenn dieses Flag gesetzt ist, gibt der Server eine nicht umsetzbare Nachricht mit einer Return-Methode zurück.

Das funktioniert tatsächlich - im Producer kann ich ein ReturnListener :

registrieren %Vor%

Dies wird wie erwartet gedruckt A message was returned by the broker , wenn eine Nachricht mit einem Routing-Schlüssel gesendet wird, an den kein Verbraucher gebunden ist.

Nun möchte ich auch wissen, wann die Nachricht von einem Verbraucher richtig empfangen wurde. Also habe ich versucht, auch ein ConfirmListener zu registrieren:

%Vor%

Das Problem hier ist, dass das ACK vom Broker gesendet wird und nicht vom Kunden selbst. Wenn der Produzent also eine Nachricht mit einem Routing-Schlüssel sendet K :

  • Wenn ein Verbraucher an diesen Routing-Schlüssel gebunden ist, sendet der Broker nur ein ACK
  • Andernfalls sendet der Broker eine basic.return gefolgt von einer ACK

Siehe die Dokumentation:

  

Bei nicht routbaren Nachrichten gibt der Broker eine Bestätigung aus, sobald der Austausch bestätigt, dass eine Nachricht nicht an eine Warteschlange weitergeleitet wird (gibt eine leere Liste von Warteschlangen zurück). Wenn die Nachricht auch als obligatorisch veröffentlicht wird, wird basic.return vor basic.ack an den Client gesendet. Das gleiche gilt für negative Bestätigungen (basic.nack).

Während also mein Problem theoretisch lösbar ist, würde es die Logik machen zu wissen, ob eine Nachricht korrekt konsumiert wurde (besonders im Zusammenhang mit Multi-Threading, Persistenz in einer Datenbank, etc.):

%Vor%

Mögliche andere Lösungen

  • Halten Sie eine Warteschlange für jede Dateikategorie bereit, d. h. die Warteschlangen pictures_profile, pictures_gallery, usw. Nicht gut, da sie den Kunden viel Flexibilität ersparen

  • Habe eine "response timeout" -Logik im Producer. Der Produzent sendet eine Nachricht. Er erwartet eine "Antwort" für diese Nachricht in der Warteschlange processing_results . Eine Lösung wäre, die Nachricht erneut zu senden, wenn sie nach X Sekunden nicht beantwortet wurde. Ich mag es aber nicht, es würde eine zusätzliche knifflige Logik im Produzenten schaffen.

  • Produzieren Sie die Nachrichten mit einer TTL von 0, und lassen Sie den Produzenten einen Dead-Letter-Austausch anhören. Dies ist die offizielle vorgeschlagene Lösung , um die zu ersetzen "Sofort" -Flag wurde in RabbitMQ 3.0 entfernt (siehe Abschnitt Entfernung des "Sofort" -Flags ). Laut den Dokumenten des Dead Letter Exchanges kann ein Dead Letter Exchange nur pro Warteschlange konfiguriert werden. Also würde es hier nicht funktionieren

  • [edit] Eine letzte Lösung, die ich sehe, ist, dass jeder Konsument eine dauerhafte Warteschlange erstellt, die nicht zerstört wird, wenn er die Verbindung unterbricht und sie darauf hört. Beispiel: consumer1 erstellt queue-consumer-1 , das an die Nachricht von myExchange mit einem Routingschlüssel abcd gebunden ist. Das Problem, das ich vorhersehe, ist, dass es impliziert, eine eindeutige Kennung für jede Verbraucheranwendungsinstanz zu finden (z. B. den Hostnamen der Maschine, auf der sie ausgeführt wird).

Ich hätte gerne etwas dazu - Danke!

Verwandt mit:

[Bearbeiten] Lösung

Ich habe etwas implementiert, das wie bereits erwähnt eine basic.return verwendet. Es ist eigentlich nicht so schwierig zu implementieren, Sie müssen nur sicherstellen, dass Ihre Methode die Nachrichten und die Methode mit den grundlegenden Rückgaben zu synchronisieren synchronisiert (oder eine gemeinsame Sperre haben, wenn nicht in der gleichen Klasse), sonst können Sie am Ende Interleaved Execution Flows, die Ihre Geschäftslogik durcheinander bringen.

    
christophetd 21.03.2017, 16:27
quelle

1 Antwort

2

Ich glaube, ein alternativer Austausch wäre die beste Lösung für Ihren Anwendungsfall für den Teil, der die Identifizierung nicht betrifft geroutete Nachrichten.

  

Wenn ein Austausch mit einer konfigurierten AE keine Nachricht an eine Warteschlange weiterleiten kann, wird stattdessen die Nachricht an die angegebene AE gesendet.

Grundsätzlich bei der Erstellung des "Haupt" Exchange konfigurieren Sie einen alternativen Austausch dafür. Für den referenzierten alternativen Austausch tendiere ich dazu, mit einem Fanout zu gehen und dann eine Warteschlange zu erstellen (notroutedq), die daran gebunden ist. Dies bedeutet, dass jede Nachricht, die nicht in mindestens einer der Warteschlangen, die an Ihre "Haupt" -Vermittlung gebunden sind, veröffentlicht wird, in der nicht-geroutet-Gruppe endet

Nun zu Ihrer Aussage:

  

Auch wenn die von den Konsumenten erstellte Warteschlange dauerhaft ist, wird sie zerstört, sobald der Kunde die Verbindung trennt, da er für diesen Verbraucher eindeutig ist.

Scheint, dass Sie Ihre Warteschlangen so konfiguriert haben, dass die automatische Löschung auf "true" gesetzt ist. Wenn dies der Fall ist, im Falle einer Trennung, wie Sie angegeben haben, wird die Warteschlange zerstört und die in der Warteschlange verbliebenen Nachrichten gehen verloren, ein Fall, der nicht durch die alternative Austauschkonfiguration abgedeckt ist .

Aus Ihrer Anwendungsfallbeschreibung geht nicht klar hervor, ob Sie in einigen Fällen erwarten, dass eine Nachricht in mehr als einer Warteschlange endet. Es scheint mehr eine Warteschlange pro Art der Verarbeitung zu geben (während die Gruppierung flexibel bleibt). . Wenn die Warteschlangenaufteilung in der Tat mit der Art der Verarbeitung zusammenhängt, sehe ich keinen Vorteil darin, die Warteschlange automatisch löschen zu lassen, erwarte, dass Sie möglicherweise keine Säuberungswartung durchführen müssen, wenn Sie die Bindungen ändern wollen .

Vorausgesetzt, Sie können mit dauerhaften Warteschlangen gehen, dann eine Dead Letter Austausch (würde wieder mit Fanout gehen) mit einer Bindung zu einem dlq würde die fehlenden Fälle abdecken.

  • nicht weitergeleitet durch alternativen Austausch
  • abgedeckt
  • korrekte Verarbeitung, die bereits von Ihrer Warteschlange "processing_result" verarbeitet wurde
  • problematische Verarbeitung oder zu lange verarbeitet werden durch den Dead Letter Exchange abgedeckt, in welchem ​​Fall die zusätzlichen Header bei totem Beschriften der Nachricht hinzugefügt kann sogar dazu beitragen, die Art der Maßnahmen zu identifizieren
Olivier 24.03.2017 08:34
quelle