Airflow dynamische DAG und Task-IDs

8

Ich sehe Airflow hauptsächlich für ETL- / Bid-Daten-bezogene Jobs. Ich versuche, es für Business-Workflows zu verwenden, in denen eine Benutzeraktion in Zukunft eine Reihe abhängiger Aufgaben auslöst. Einige dieser Aufgaben müssen möglicherweise aufgrund bestimmter anderer Benutzeraktionen gelöscht (gelöscht) werden. Ich dachte, der beste Weg, dies zu bewältigen, wäre über dynamische Aufgaben-IDs. Ich habe gelesen, dass Airflow dynamische Dag-IDs unterstützt. Also habe ich ein einfaches Python-Skript erstellt, das die DAG-ID und die Task-ID als Befehlszeilenparameter verwendet. Ich stoße jedoch auf Probleme, die es funktionieren lassen. Es gibt dag_id nicht gefundenen Fehler. Hat jemand das versucht? Hier ist der Code für das Skript (nennen Sie es tmp.py), die ich in der Befehlszeile als Python ausführen (python tmp.py 820 2016-08-24T22: 50: 00):

%Vor%     
Dean Sha 24.08.2016, 21:34
quelle

2 Antworten

8

Nach zahlreichen Versuchen und Fehlern konnte ich das herausfinden. Hoffentlich wird es jemandem helfen. Und so funktioniert es: Sie benötigen einen Iterator oder eine externe Quelle (Datei / Datenbanktabelle), um dags / task dynamisch über eine Vorlage zu generieren. Sie können die dag- und task-Namen statisch behalten, ordnen Sie ihnen einfach ids dynamisch zu, um einen dag von dem anderen zu unterscheiden. Sie legen dieses Python-Skript in den dags-Ordner. Wenn Sie den Luftstrom-Scheduler starten, durchläuft er bei jedem Herzschlag dieses Skript und schreibt die DAGs in die dag-Tabelle in der Datenbank. Wenn eine dag (eindeutige dag id) bereits geschrieben wurde, wird sie einfach übersprungen. Der Scheduler betrachtet auch den Zeitplan einzelner DAGs, um zu bestimmen, welcher für die Ausführung bereit ist. Wenn eine DAG zur Ausführung bereit ist, führt sie sie aus und aktualisiert ihren Status. Hier ist ein Beispielcode:

%Vor%     
Dean Sha 23.09.2016 18:04
quelle
7

Aus Wie kann ich DAGs dynamisch erstellen? :

  

Airflow sucht in Ihrem DAGS_FOLDER nach Modulen, die DAG-Objekte in ihrem globalen Namespace enthalten, und fügt die Objekte hinzu, die im DagBag gefunden werden. Zu wissen, dass alles, was wir brauchen, eine Möglichkeit ist, dynamisch Variablen im globalen Namespace zuzuordnen, was in Python mit der Funktion globals () für die Standard-Bibliothek, die sich wie ein einfaches Wörterbuch verhält, leicht gemacht werden kann.

%Vor%
    
Scott Ding 23.05.2017 06:48
quelle

Tags und Links