Code:
Python-Version 2.7.x und Luftstromversion 1.5.1
mein dag-Skript ist das
%Vor%Davon konnte ich sehen, dass ich eine DAG mit 6 Aufgaben erstelle, wobei die erste Aufgabe (Start1) zuerst startet, nach der alle anderen fünf Aufgaben beginnen
Momentan habe ich 5 Minuten Zeitverzögerung zwischen dem Start der DAG gegeben
Es ist perfekt für alle sechs Aufgaben des ersten Typs gelaufen, aber nach fünf Minuten wird die DAG nicht neu initialisiert
Es ist mehr als eine Stunde her, dass die DAG nicht neu initiiert wird. Ich weiß wirklich nicht, ob ich falsch liege.
Es wäre wirklich nett, wenn jemand mich darauf hinweisen könnte, was falsch ist. Ich habe versucht, mit airflow testing clear
zu löschen, dann geschieht dasselbe. Es lief die erste Instanz und stand dann einfach da.
Das einzige, was die Befehlszeile anzeigt, ist Getting all instance for DAG testing
Wenn ich die Position von schedule_interval ändere, läuft es einfach ohne irgendein Schedule-Intervall parallel. Das ist mit in 5 Minuten 300 oder mehr Task-Instanzen wurde abgeschlossen. Es gibt kein 5-Minuten-Planungsintervall
Code 2:
%Vor%Danke Vignesch,
Für Code 2 ist der Grund warum er jede Minute läuft:
Die Startzeit ist 2015-10-13 00:00
Das Zeitplanintervall beträgt 5 Minuten
Jeder Heartbeat des Schedulers (standardmäßig 5 Sekunden), Ihre DAG wird überprüft
Als Lösung wird das DAG-Startdatum als datetime.now() - schedule_interval
festgelegt.
Und auch wenn Sie debuggen wollen:
Setzen Sie LOGGINGLEVEL auf debug
in settings.py
Ändern Sie die Klassenmethode is_queueable()
von airflow.models.TaskInstance
in
:
%Vor%Da die Startzeit (2015-10-13 00:00) weniger Zeit hat als jetzt, löst sie den Luftstrom aus Verfüllung . Es wird vom 2015-10-13 00:00 ausgeführt, wenn der Luftstromplaner alle Sekunden erkannt hat (das Startdatum), aber das Ausführungsdatum zwischen 5 Minuten (Aufgabenintervallzeit) liegt.
Siehe den Protokollnamen:
%Vor%Siehe die Erstellungszeit von Protokollen:
%Vor%Sie können auch die Aufgabeninstanzen auf der Webbenutzeroberfläche sehen: