Wie man Airflow benutzt: Eine praktische, durchgehende Anleitung zum Aufbau zuverlässiger Datenpipelines
Wenn Sie Daten verschieben oder ML-Jobs orchestrieren, haben Sie wahrscheinlich immer wieder denselben Refrain gehört: „Pack es einfach in Airflow.“ Die Wahrheit ist, Apache Airflow glänzt, wenn Sie Transparenz, Zuverlässigkeit und Kontrolle über komplexe Workflows benötigen. In dieser praktischen Anleitung führen wir Sie Schritt für Schritt durch die Verwendung von Airflow – von den Kernkonzepten bis hin zu produktionsreifen Mustern –, damit Sie Pipelines ausliefern können, denen Sie vertrauen.
Wir werden es handlungsorientiert halten: Sie erhalten ein mentales Modell für DAGs und Tasks, praktische Beispiele mit der TaskFlow-API, Bereitstellungsoptionen, Teststrategien und Best Practices. Am Ende werden Sie von „Ich kann das Tutorial ausführen“ zu „Ich kann das in der Produktion ausführen“ gelangen.
Hinweis: Für tiefere Einblicke und Referenzen ist die offizielle Dokumentation ausgezeichnet und wird regelmäßig aktualisiert.
Was ist Apache Airflow wirklich?
Airflow ist ein Orchestrator – kein Datenverarbeiter. Es plant, ordnet und überwacht die Arbeit, die Sie anderswo ausführen (Datenbanken, Data Warehouses, Spark-Jobs, APIs, Container). Sie definieren Workflows als DAGs (Directed Acyclic Graphs), die lediglich Python-Dateien sind, die Tasks und ihre Abhängigkeiten kodieren. Airflow führt diese Tasks dann gemäß Ihrem Zeitplan, Ihren Parametern und Ihrer Umgebung aus.
- DAG: Die Workflow-Definition (Graph von Tasks mit Abhängigkeiten).
- Task: Eine Arbeitseinheit (Python-Funktion, SQL-Ausführung, Bash-Befehl, externer Job-Trigger usw.).
- Operator: Eine Vorlage für eine Art von Task (z. B.
PythonOperator, BashOperator, KubernetesPodOperator).
- Scheduler: Entscheidet, was wann ausgeführt werden soll.
- Executor: Führt Tasks aus (lokal, mit Celery, Kubernetes usw.).
Beginnen Sie mit den offiziellen Tutorials, sobald Sie Airflow installiert haben; sie vermitteln Ihnen schnell das Gesamtbild.
Airflow richtig installieren und ausführen
Airflow ist flexibel. Wählen Sie den Pfad, der zu Ihrer Phase passt:
- Lokale Entwicklung (Schnellstart):
- Verwenden Sie das vom Projekt bereitgestellte Docker Compose für den Schnellstart. Es startet den Webserver, den Scheduler, die Datenbank und mehr mit sinnvollen Standardeinstellungen.
- Ideal zum Lernen und Iterieren von DAGs.
- Kleines Team oder Staging:
- Celery Executor oder Kubernetes Executor mit einem verwalteten Postgres.
- Speichern Sie Protokolle in S3/GCS und packen Sie Abhängigkeiten mit Ihrem Image oder
requirements.txt.
- Kubernetes Executor für Elastizität oder Celery Executor mit Autoscaling-Workern.
- Externe Secrets (Vault), robuste Observability (Protokolle + Metriken) und Blue/Green-Deploys für Upgrades.
Tipp: Verwalten Sie Ihre Airflow-Codebasis versionskontrolliert, containerisiert und getestet, bevor Sie sie hochstufen. Die Seite „Best Practices“ beschreibt produktionsreife Muster.
Kernkonzepte, die Sie täglich verwenden werden
DAGs: Ihr Workflow als Code
Ein DAG ist eine Python-Datei, die Folgendes definiert:
- DAG-Metadaten: ID, Zeitplan, Startdatum, Tags.
- Standardargumente: Wiederholungsversuche, Eigentümer, SLAs.
- Tasks und ihre Abhängigkeiten.
Betrachten Sie einen DAG als das „Was“ und „Wann“ und Tasks als das „Wie“.
Tasks und Operatoren
Operatoren sind Fertigbauteile für gängige Tasks. Beispiele:
- PythonOperator / TaskFlow
@task für Python-Code
- BashOperator für Shell-Befehle
- SimpleHttpOperator für APIs
- KubernetesPodOperator für containerisierte Jobs
- SQL-Provider (z. B. Snowflake, BigQuery, Postgres) für Data-Warehouse-Arbeiten
TaskFlow API: Der moderne, Python-ähnliche Weg
Mit der TaskFlow-API können Sie Tasks als Python-Funktionen mit @task schreiben, Werte zurückgeben, die über XCom übergeben werden, und diese sauber zusammensetzen. Sie reduziert Boilerplate-Code und verbessert die Lesbarkeit – sehr empfehlenswert.
Ihr erster Airflow DAG (TaskFlow Edition)
Nachfolgend finden Sie ein minimales Beispiel im ETL-Stil, um Schlüsselideen zu veranschaulichen: Scheduling, TaskFlow, Abhängigkeiten und XCom-Datenübergabe.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
.
Scheduling, Catchup und Backfills
schedule: Cron oder Voreinstellungen (@daily, @hourly).
start_date + catchup: Wenn catchup=True, führt Airflow Backfill-Ausführungen ab dem Startdatum durch. Setzen Sie für Streaming-ähnliche Pipelines catchup=False.
- Manuelle Backfills: Verwenden Sie die UI oder CLI, um historische Intervalle erneut auszuführen.
Praktische Faustregel: Aktivieren Sie Catchup für deterministische Batch-Jobs; deaktivieren Sie es für Echtzeit- oder API-ratenbegrenzte Pipelines.
Sichere Datenübergabe zwischen Tasks (XCom)
- Kleine Objekte: Rückgabewerte mit TaskFlow sind in Ordnung.
- Große Nutzdaten: Speichern Sie sie im Objektspeicher (S3/GCS) mit einem Schlüssel in XCom.
- Vermeiden Sie sensible Daten in XCom; verwenden Sie Secrets-Backends (z. B. Vault) und Umgebungsvariablen.
Dynamisches Task-Mapping und Fan-out-Workloads
Airflow kann Tasks zur Laufzeit dynamisch basierend auf Eingaben generieren – ideal für partitionierte Datensätze oder Multi-Tenant-Jobs.
- Halten Sie DAGs deterministisch und idempotent.
- Trennen Sie Orchestrierung (Airflow) von Berechnung (Spark, dbt, Data Warehouses).
- Verwenden Sie die TaskFlow-API für Klarheit und XCom-Hygiene.
- Parametrisieren Sie DAGs; verwenden Sie Variablen mit Bedacht.
- Überwachen, alarmieren und dokumentieren Sie Ihre Pipelines.
So arbeiten Sie mit Data Warehouses und ML
- Data Warehouses: Verwenden Sie Provider-Operatoren (z. B. SnowflakeOperator, BigQueryInsertJobOperator) für SQL-Jobs. Speichern Sie SQL in Dateien oder versionierten Modulen.
- dbt: Triggern Sie dbt über Bash/KubernetesPodOperator oder dedizierte dbt-Operatoren in Providern.
- ML: Orchestrieren Sie Feature-Generierung, Training und Batch-Inference als separate Tasks; cachen Sie Artefakte im Speicher und protokollieren Sie Metriken.
Erweiterte Planung: Datasets und DAG-übergreifende Abhängigkeiten
- Datasets ermöglichen es einem DAG, ein logisches Dataset zu erstellen, das einen anderen DAG auslöst, wenn es aktualisiert wird – sauberer als Ad-hoc-Trigger.
- Für Legacy-Muster funktioniert ExternalTaskSensor, aber Datasets sind deklarativer.
Sicherheit und Compliance
- Verwenden Sie rollenbasierte Zugriffskontrolle (RBAC) in der UI.
- Isolieren Sie Umgebungen pro Team oder Vertrauensgrenze.
- Führen Sie Audit-Trails über Protokolle und Verbindungsänderungshistorie.
Upgrades und Versionierung
- Testen Sie Upgrades im Staging mit produktionsähnlichen Workloads.
- Pinnen und aktualisieren Sie Provider bewusst.
- Lesen Sie die Versionshinweise für Executor-spezifische Änderungen und Deprecations.
Eine kurze Checkliste für Ihren ersten Produktions-DAG
- Klare Eigentümerschaft (
owner-Tag) und konfigurierte Warnmeldungen.
retries mit angemessenen Backoffs eingestellt.
- Idempotente Tasks und explizite Abhängigkeiten.
- Kleine XCom-Nutzdaten; große Daten im Speicher.
- Protokolle in dauerhaften Speicher verschoben; Metriken exportiert.
- Rollout-Plan (Kanarienvogel oder Blue/Green) und Rollback-Schritte.
Beispiel: Ein realistischer Warehouse-Load-DAG
Dieses Muster extrahiert tägliche Dateien, validiert sie und lädt sie in eine Data-Warehouse-Tabelle, mit dynamischem Mapping pro Partition und deferrable Sensoren.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
.
- Lesen Sie die Best Practices, bevor Sie in die Produktion gehen.
- Erkunden Sie die Provider-Dokumente für Ihre Systeme (Data Warehouses, Clouds, ML-Tools).
Übrigens: Beschleunigen Sie die Erstellung mit einem KI-Sidekick
Erwähnenswert: Wenn Sie viele DAGs entwerfen, kann ein KI-Assistent, der Code versteht, Boilerplate-Code beschleunigen, TaskFlow-Stubs generieren und sogar Abhängigkeitskorrekturen vorschlagen. Wenn Sie einen leichten Helfer neben Ihrem Editor und Browser wünschen, kann Sider.AI für schnelle Code-Umschreibungen und Erklärungen während der Entwicklung nützlich sein. Wichtige Erkenntnisse
- Verwenden Sie Airflow zum Orchestrieren, nicht zum Berechnen.
- Bevorzugen Sie die TaskFlow-API für saubere, testbare DAGs.
- Halten Sie Daten aus XCom heraus; übergeben Sie stattdessen Referenzen.
- Verwenden Sie deferrable Sensoren/Operatoren, um Slots zu sparen.
- Containerisieren, testen und fördern Sie durch Umgebungen.
- Verlassen Sie sich auf offizielle Tutorials und Best Practices als Ihren Nordstern.
FAQ
F1: Was ist der einfachste Weg, um zu lernen, wie man Airflow benutzt?
Beginnen Sie mit dem offiziellen Tutorial, um DAGs, Tasks, Scheduling und die UI zu verstehen. Erstellen Sie dann eine kleine TaskFlow-basierte Pipeline und iterieren Sie mit dem Best-Practices-Leitfaden für Produktionsbereitschaft.
F2: Soll ich die TaskFlow-API oder klassische Operatoren in Airflow verwenden?
Verwenden Sie die TaskFlow-API für die meisten Python-ähnlichen Pipelines, da sie sauberer ist und XCom-Rückgaben auf natürliche Weise verarbeitet. Klassische Operatoren sind immer noch großartig für Nicht-Python-Tasks wie Bash, SQL oder Container-Jobs.
F3: Wie übergebe ich große Daten zwischen Airflow-Tasks?
Vermeiden Sie es, große Nutzdaten in XCom zu legen. Speichern Sie Daten in S3/GCS oder einer Datenbank und übergeben Sie nur Referenzen oder URIs über XCom, um Tasks schnell und zuverlässig zu halten.
F4: Welchen Executor soll ich für Airflow in der Produktion wählen?
Für Elastizität und Isolation ist Kubernetes Executor eine starke Standardeinstellung. Für einfachere Setups funktioniert Celery Executor gut – stellen Sie einfach Autoscaling, robuste Protokollierung und externalisierte Secrets sicher.
F5: Wie handhabe ich Abhängigkeiten über mehrere Airflow-DAGs hinweg?
Verwenden Sie Datasets für deklarative DAG-übergreifende Trigger, wenn eine Pipeline Daten für eine andere erzeugt. Alternativ kann ExternalTaskSensor Ausführungen koordinieren, aber Datasets sind für die datengesteuerte Orchestrierung sauberer.