Ich frage mich, ob es eine Möglichkeit gibt, RabbitMQ oder Redis so einzurichten, dass sie mit Sellery arbeiten, so dass, wenn ich eine Aufgabe an die Warteschlange sende, diese nicht in eine Aufgabenliste, sondern in eine Reihe von Aufgaben geht basierend auf der Nutzlast meiner Aufgabe, um Duplikate zu vermeiden.
Hier ist mein Setup für mehr Kontext: Python + Sellerie. Ich habe RabbitMQ als Backend ausprobiert, jetzt benutze ich Redis als Backend, weil ich nicht die 100% ige Zuverlässigkeit brauche, einfacher zu bedienen, kleiner Speicherbedarf, etc.
Ich habe ungefähr 1000 IDs, die wiederholt ausgeführt werden müssen. Stufe 1 meiner Datenpipeline wird von einem Scheduler ausgelöst und gibt Tasks für Stufe 2 aus. Die Tasks enthalten nur die ID, für die Arbeit geleistet werden muss, und die eigentlichen Daten werden in der Datenbank gespeichert. Ich kann jede Kombination oder Reihenfolge von Stufe 1 und Stufe 2 ohne Schaden ausführen.
Wenn Stufe 2 nicht genügend Verarbeitungsleistung hat, um mit dem Volumen der von Stufe 1 ausgegebenen Aufgaben fertig zu werden, wächst und wächst meine Aufgabenwarteschlange. Dies müsste nicht der Fall sein, wenn die verwendete Task-Queue anstelle von Listen die zugrunde liegende Datenstruktur setzt.
Gibt es eine Standardlösung für den Wechsel von Listen zu Sets als verteilte Aufgabenwarteschlangen? Ist Sellerie dazu in der Lage? Ich habe kürzlich gesehen, dass Redis gerade eine Alpha-Version eines Warteschlangensystems veröffentlicht hat, so dass es noch nicht für die Produktion bereit ist.
Soll ich meine Pipeline anders gestalten?
Sie können eine externe Datenstruktur verwenden, um den aktuellen Status Ihrer Sellerie-Warteschlange zu speichern und zu überwachen. 1. Nehmen wir zum Beispiel einen redis-Schlüsselwert. Immer wenn Sie eine Aufgabe in Sellerie schieben, markieren Sie einen Schlüssel mit Ihrem 'id' Feld als wahr in redis.
Bevor Sie versuchen, eine neue Aufgabe mit einer 'id' zu pushen, würden Sie überprüfen, ob der Schlüssel mit 'id' in redis wahr ist oder nicht, wenn ja, überspringen Sie die Aufgabe.
Um die Schlüssel zur richtigen Zeit zu löschen, können Sie after_return verwenden Handler von Sellerie, der läuft, wenn die Aufgabe zurückgekehrt ist. Dieser Handler hebt die Schlüssel-ID in redis auf und löscht somit die Sperre für den nächsten Task-Push.
Diese Methode stellt sicher, dass Sie nur EINE Instanz pro ID der Aufgabe haben, die in der Sellerie-Warteschlange ausgeführt wird. Sie können es auch so erweitern, dass nur N Tasks pro ID zulässig sind, indem Sie die INCR- und DECR-Befehle auf der Redis-Taste verwenden, wenn die Task gedrückt wird und die Task nachher zurückkehrt.
Können Ihre Aufgaben in Stufe 2 prüfen, ob die Arbeit bereits erledigt wurde und, wenn dies der Fall ist, die Arbeit nicht erneut ausführen? Auf diese Weise, auch wenn Ihre Aufgabenliste wächst, wird die Menge an Arbeit, die Sie tun müssen, nicht.
Ich habe keine Lösung für die Sets / Listen gefunden, und ich würde denken, dass es viele andere Möglichkeiten gibt, dieses Problem zu umgehen.
Verwenden Sie ein SortedSet innerhalb von Redis für Ihre Jobwarteschlange. Es ist in der Tat ein Set, wenn Sie die exakt gleichen Daten darin einfügen, wird es keinen neuen Wert hinzufügen (es müssen unbedingt genau die gleichen Daten sein, Sie können die in SortedSet in Redis verwendete Hash-Funktion nicht überschreiben).
Sie benötigen einen Punktwert, um mit SortedSet zu verwenden, Sie können einen Zeitstempel (Wert als ein Doppel, zum Beispiel unixtime) verwenden, mit dem Sie die neuesten Elemente / ältesten Elemente abrufen können, wenn Sie möchten. ZRANGBYSCORE ist wahrscheinlich der Befehl, nach dem Sie suchen werden. Ссылка
Wenn Sie zusätzliche Verhaltensweisen benötigen, können Sie bei Bedarf auch alles innerhalb eines Lua-Skripts für atomistisches Verhalten und benutzerdefinierte Räumungsstrategie umschließen. Zum Beispiel das Aufrufen eines "get" -Skripts, das den Auftrag erhält und atomar aus der Warteschlange löscht oder die Daten entfernt, wenn zu viel Gegendruck besteht, usw.
Tags und Links python celery rabbitmq redis message-queue