Ich habe zwei Arten von Aufgaben. Aufgabe A wird jede Stunde durch Selleriebeat erzeugt. Es wird sofort ausgeführt und generiert tausend (oder viele tausend) Instanzen von Task B, von denen jede eine ETA von einem Tag in der Zukunft hat.
Beim Start wird eine Instanz von Task A ausgeführt und generiert tausend Bs. Und von da an passiert nichts mehr. Ich sollte jede Stunde ein anderes A mit weiteren tausend Bs sehen. Aber in Wirklichkeit sehe ich nichts.
Beim Einfrieren zeigt rabbitmqctl 1000 Nachrichten an, mit 968 bereit und 32 unbestätigt. Eine Stunde später gibt es 1001 Nachrichten, 969 bereit und 32 unbestätigt. Und so weiter, jede Stunde eine neue Nachricht, die als fertig klassifiziert wurde. Vermutlich passiert es, dass der Worker 32 Nachrichten vorab abruft, aber nicht auf sie reagieren kann, da ihre ETA noch in der Zukunft liegt. In der Zwischenzeit können neuere Aufgaben, die gerade ausgeführt werden sollten, nicht ausgeführt werden.
Was ist der richtige Weg, damit umzugehen? Ich nehme an, dass ich mehrere Arbeiter brauche und vielleicht mehrere Warteschlangen (aber ich bin mir nicht sicher, was der letzte Punkt ist). Gibt es einen einfacheren Weg? Ich habe versucht, mit CELERYD_PREFETCH_MULTIPLIER und -Ofail zu spielen (wie hier diskutiert: Ссылка ), aber kann ' Lass es nicht gehen. Ist meine Frage die gleiche wie diese: [[Django Celery]] Sellerie blockiert tun IO Aufgaben ?
Jedenfalls: Ich kann dieses Problem nur angehen, weil ich viel über die Art der Aufgaben und deren Timing weiß. Scheint es nicht ein Designfehler, dass genug Aufgaben mit zukünftigen ETA das ganze System sperren können? Wenn ich ein paar Stunden warte und dann den Worker abtöte und neu starte, schnappt er sich wieder die ersten 32 Tasks und friert ein, obwohl sich zu diesem Zeitpunkt Aufgaben in der Warteschlange befinden, die jetzt sofort ausgeführt werden können. Sollte eine Komponente nicht schlau genug sein, ETAs zu betrachten und Aufgaben zu ignorieren, die nicht ausführbar sind?
NACHTRAG: Ich denke jetzt, dass das Problem ein bekannter Fehler ist, wenn RabbitMQ 3.3 mit Sellerie 3.1.0 verwendet wird. Mehr Informationen hier: Ссылка
Nach dem Update auf Sellery 3.1.1 sieht es besser aus. Aufgabe A läuft stündlich (gut, hat sie für ein paar Stunden) und plant ihre Kopien von Aufgabe B. Diese scheinen den Arbeiter aufzufüllen: Die Anzahl der unbestätigten Nachrichten wächst weiter. Ich muss sehen, ob es ohne Grenzen wachsen kann.
Es scheint, dass dies ein Problem ist, das mit Routing gelöst werden kann: Ссылка
Wenn Sie Routing verwenden, können Sie mehrere Warteschlangen haben, die mit verschiedenen Arten von Aufgaben gefüllt sind. Wenn Sie möchten, dass Task B nicht mehr Task A blockiert, können Sie sie in separate Arbeitswarteschlangen mit unterschiedlicher Priorität aufteilen, sodass Ihre Worker in der großen Warteschlange mit Task B arbeiten, aber wenn eine Task A ankommt, wird sie vom nächsten verfügbaren Pull gezogen Arbeiter.
Der zusätzliche Vorteil besteht darin, dass Sie auch stark ausgelasteten Warteschlangen mehr Mitarbeiter zuweisen können und diese Mitarbeiter nur aus der angegebenen Warteschlange mit hohem Volumen ziehen.
Wie viele Mitarbeiter haben Sie zur Zeit mit welchem Nebenläufigkeit ausgeführt?
Die Erhöhung der Parallelität Ihrer Mitarbeiter könnte das Problem beheben. Wenn Thread X bei Task A hängen bleibt, der lange dauert oder in einem Wartezustand ist, könnte ein anderer Thread bei den anderen vorab geholten Tasks arbeiten.
Tags und Links python celery rabbitmq scheduling