Airflow plant nicht richtig Python

8

Code:

Python-Version 2.7.x und Luftstromversion 1.5.1

mein dag-Skript ist das

%Vor%

Davon konnte ich sehen, dass ich eine DAG mit 6 Aufgaben erstelle, wobei die erste Aufgabe (Start1) zuerst startet, nach der alle anderen fünf Aufgaben beginnen

Momentan habe ich 5 Minuten Zeitverzögerung zwischen dem Start der DAG gegeben

Es ist perfekt für alle sechs Aufgaben des ersten Typs gelaufen, aber nach fünf Minuten wird die DAG nicht neu initialisiert

Es ist mehr als eine Stunde her, dass die DAG nicht neu initiiert wird. Ich weiß wirklich nicht, ob ich falsch liege.

Es wäre wirklich nett, wenn jemand mich darauf hinweisen könnte, was falsch ist. Ich habe versucht, mit airflow testing clear zu löschen, dann geschieht dasselbe. Es lief die erste Instanz und stand dann einfach da.

Das einzige, was die Befehlszeile anzeigt, ist Getting all instance for DAG testing

Wenn ich die Position von schedule_interval ändere, läuft es einfach ohne irgendein Schedule-Intervall parallel. Das ist mit in 5 Minuten 300 oder mehr Task-Instanzen wurde abgeschlossen. Es gibt kein 5-Minuten-Planungsintervall

Code 2:

%Vor%

Danke Vignesch,

    
The6thSense 14.10.2015, 13:06
quelle

2 Antworten

4

Für Code 2 ist der Grund warum er jede Minute läuft:

  1. Die Startzeit ist 2015-10-13 00:00

  2. Das Zeitplanintervall beträgt 5 Minuten

  3. Jeder Heartbeat des Schedulers (standardmäßig 5 Sekunden), Ihre DAG wird überprüft

    • Erste Überprüfung: Startdatum (kein letztes Ausführungsdatum gefunden) + Scheduler Intervall & lt; aktuelle Uhrzeit? Wenn ja, wird die DAG ausgeführt und zuletzt Ausführungszeit wird aufgezeichnet. (zB 2015-10-13 00:00 + 5min & lt; aktuell?)
    • Zweite Überprüfung des nächsten Heartbeats: letzte Ausführungszeit + Scheduler Intervall & lt; aktuelle Uhrzeit? Wenn dies der Fall ist, wird die DAG erneut ausgeführt.
    • ....

Als Lösung wird das DAG-Startdatum als datetime.now() - schedule_interval festgelegt.

Und auch wenn Sie debuggen wollen:

  1. Setzen Sie LOGGINGLEVEL auf debug in settings.py

  2. Ändern Sie die Klassenmethode is_queueable() von airflow.models.TaskInstance in

:

%Vor%     
Yongyiw 15.10.2015, 13:59
quelle
3

Da die Startzeit (2015-10-13 00:00) weniger Zeit hat als jetzt, löst sie den Luftstrom aus Verfüllung . Es wird vom 2015-10-13 00:00 ausgeführt, wenn der Luftstromplaner alle Sekunden erkannt hat (das Startdatum), aber das Ausführungsdatum zwischen 5 Minuten (Aufgabenintervallzeit) liegt.

Siehe den Protokollnamen:

%Vor%

Siehe die Erstellungszeit von Protokollen:

%Vor%

Sie können auch die Aufgabeninstanzen auf der Webbenutzeroberfläche sehen:

    
Kaibo Zhou 09.11.2015 07:08
quelle

Tags und Links