Sådan bruges Airflow: En praktisk, komplet guide til at bygge pålidelige datapipelines
Hvis du flytter data eller orkestrerer ML-jobs, har du sandsynligvis hørt det samme omkvæd: “Bare læg det i Airflow.” Sandheden er, at Apache Airflow virkelig brillerer, når du har brug for synlighed, pålidelighed og kontrol over komplekse workflows. I denne praktiske guide vil vi trin for trin gennemgå, hvordan du bruger Airflow – fra kernekoncepter til produktionsklare mønstre – så du kan levere pipelines, du har tillid til.
Vi holder det handlingsorienteret: du får en mental model for DAG'er og tasks, praktiske eksempler med TaskFlow API'en, implementeringsmuligheder, teststrategier og best practices. Til sidst vil du gå fra "Jeg kan køre tutorialen" til "Jeg kan køre dette i produktion."
Bemærk: For dybere dyk og reference er de officielle dokumenter fremragende og regelmæssigt opdaterede.
Hvad er Apache Airflow egentlig?
Airflow er en orkestrator – ikke en databehandler. Den planlægger, ordner og overvåger arbejde, du kører andre steder (databaser, datalagre, Spark-jobs, API'er, containere). Du definerer workflows som DAG'er (Directed Acyclic Graphs), som bare er Python-filer, der koder tasks og deres afhængigheder. Airflow udfører derefter disse tasks i henhold til din tidsplan, parametre og miljø.
- DAG: Workflow-definitionen (graf over tasks med afhængigheder).
- Task: En arbejdsenhed (Python-funktion, SQL-udførelse, Bash-kommando, ekstern jobtrigger osv.).
- Operator: En skabelon til en type task (f.eks.
PythonOperator, BashOperator, KubernetesPodOperator).
- Scheduler: Beslutter, hvad der skal køres, og hvornår.
- Executor: Kører tasks (lokalt, med Celery, Kubernetes osv.).
- UI: Dit kontrolcenter for kørsler, logs, genforsøg og lineage.
Start med de officielle tutorials, når du har installeret Airflow; de giver dig hurtigt det store billede.
Installation og kørsel af Airflow på den rigtige måde
Airflow er fleksibel. Vælg den sti, der matcher dit stadie:
- Lokal udvikling (hurtig start):
- Brug den hurtige Docker Compose, der leveres af projektet. Den spinner webserveren, scheduler, database og mere op med fornuftige standardindstillinger.
- Fantastisk til at lære og iterere på DAG'er.
- Lille team eller staging:
- Celery Executor eller Kubernetes Executor med en administreret Postgres.
- Gem logs i S3/GCS og pak afhængigheder med dit image eller
requirements.txt.
- Kubernetes Executor for elasticitet eller Celery Executor med autoskalering af workers.
- Eksterne secrets (Vault), robust observerbarhed (logs + metrics) og blue/green deploys til opgraderinger.
Tip: Hold din Airflow-kodebase versionskontrolleret, containeriseret og testet før promovering. Siden "Best Practices" skitserer produktionsklare mønstre.
Kernekoncepter, du vil bruge dagligt
DAG'er: Dit workflow som kode
En DAG er en Python-fil, der definerer:
- DAG-metadata: id, tidsplan, startdato, tags.
- Standardargumenter: genforsøg, ejere, SLA'er.
- Tasks og deres afhængigheder.
Tænk på en DAG som "hvad" og "hvornår", og tasks som "hvordan".
Tasks og Operators
Operators er præfabrikerede komponenter til almindelige tasks. Eksempler:
- PythonOperator / TaskFlow
@task til Python-kode
- BashOperator til shell-kommandoer
- SimpleHttpOperator til API'er
- KubernetesPodOperator til containeriserede jobs
- SQL-udbydere (f.eks. Snowflake, BigQuery, Postgres) til datalagerarbejde
TaskFlow API: Den moderne, Pythonic måde
TaskFlow API'en lader dig skrive tasks som Python-funktioner med @task, returnere værdier, der sendes via XCom, og sammensætte dem rent. Det reducerer boilerplate og forbedrer læsbarheden – stærkt anbefalet.
Din første Airflow DAG (TaskFlow Edition)
Nedenfor er et minimalt ETL-stil eksempel for at illustrere nøgleideer: planlægning, TaskFlow, afhængigheder og XCom-datagennemgang.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Planlægning, Catchup og Backfills
schedule: Cron eller forudindstillinger (@daily, @hourly).
start_date + catchup: Hvis catchup=True, vil Airflow backfille kørsler fra startdatoen. For streaming-stil pipelines skal du indstille catchup=False.
- Manuelle backfills: Brug UI'en eller CLI'en til at genkøre historiske intervaller.
Praktisk tommelfingerregel: aktiver catchup for deterministiske batchjobs; deaktiver for real-time eller API rate-begrænsede pipelines.
Sikker datagennemgang mellem Tasks (XCom)
- Små objekter: returnerede værdier med TaskFlow er fine.
- Store payloads: gem i objektlager (S3/GCS) med en nøgle i XCom.
- Undgå følsomme data i XCom; brug secrets backends (f.eks. Vault) og miljøvariabler.
Dynamisk Task Mapping og Fan-out Workloads
Airflow kan generere tasks dynamisk ved runtime baseret på input – ideelt til partitionerede datasæt eller multi-tenant jobs.
- Hold DAG'er deterministiske og idempotente.
- Adskil orkestrering (Airflow) fra beregning (Spark, dbt, datalagre).
- Brug TaskFlow API'en for klarhed og XCom-hygiejne.
- Parametriser DAG'er; brug variabler med omtanke.
- Overvåg, alarmer og dokumenter dine pipelines.
Sådan arbejder du med datalagre og ML
- Datalagre: Brug udbyder-operators (f.eks. SnowflakeOperator, BigQueryInsertJobOperator) til SQL-jobs. Gem SQL i filer eller versionsstyrede moduler.
- dbt: Trigger dbt via Bash/KubernetesPodOperator eller dedikerede dbt-operators i udbydere.
- ML: Orkestrer featuregenerering, træning og batch-inferens som separate tasks; cache artefakter i lager og log metrics.
Avanceret planlægning: Datasæt og Cross-DAG-afhængigheder
- Datasæt lader en DAG producere et logisk datasæt, der udløser en anden DAG, når det opdateres – renere end ad-hoc triggere.
- For legacy-mønstre fungerer ExternalTaskSensor, men datasæt er mere deklarative.
Sikkerhed og Compliance
- Brug rollebaseret adgangskontrol (RBAC) i UI'en.
- Isoler miljøer pr. team eller trust boundary.
- Hold audit trails via logs og connection change history.
Opgraderinger og Versionsstyring
- Test opgraderinger i staging med produktionslignende workloads.
- Pin og opgrader udbydere bevidst.
- Læs release notes for executorspecifikke ændringer og deprecations.
En hurtig tjekliste til din første produktions-DAG
- Klar ejerskab (
owner tag) og alarmer konfigureret.
retries sat med rimelige backoffs.
- Idempotente tasks og eksplicitte afhængigheder.
- Små XCom payloads; store data i lager.
- Logs sendt til holdbart lager; metrics eksporteret.
- Rollout-plan (canary eller blue/green) og rollback-trin.
Eksempel: En realistisk datalager Load DAG
Dette mønster udtrækker daglige filer, validerer dem og indlæser dem i en datalagertabel med dynamisk mapping pr. partition og deferrable sensors.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Gennemgå Best Practices, før du promoverer til produktion.
- Udforsk udbyderdokumenter til dine systemer (datalagre, clouds, ML-værktøjer).
Forresten: Fremskynd forfatterskabet med en AI sidekick
Værd at bemærke: Hvis du udarbejder mange DAG'er, kan en AI-assistent, der forstår kode, fremskynde boilerplate, generere TaskFlow-stubs og endda foreslå afhængighedsrettelser. Hvis du vil have en letvægts hjælper ved siden af din editor og browser, kan Sider.AI være praktisk til hurtige kodeomskrivninger og forklaringer under udviklingen. Vigtigste pointer
- Brug Airflow til at orkestrere, ikke beregne.
- Foretræk TaskFlow API'en for rene, testbare DAG'er.
- Hold data ude af XCom; send referencer i stedet.
- Brug deferrable sensors/operators til at spare slots.
- Containeriser, test og promover gennem miljøer.
- Stol på officielle tutorials og best practices som din nordstjerne.
FAQ
Q1: Hvad er den nemmeste måde at lære at bruge Airflow?
Start med den officielle Tutorial for at forstå DAG'er, tasks, planlægning og UI'en. Byg derefter en lille TaskFlow-baseret pipeline og iterer med best practices-guiden for produktionsparathed.
Q2: Skal jeg bruge TaskFlow API'en eller klassiske operators i Airflow?
Brug TaskFlow API'en til de fleste Pythonic pipelines, fordi den er renere og håndterer XCom-returneringer naturligt. Klassiske operators er stadig gode til ikke-Python tasks som Bash, SQL eller container jobs.
Q3: Hvordan sender jeg store data mellem Airflow tasks?
Undgå at lægge store payloads i XCom. Gem data i S3/GCS eller en database, og send kun referencer eller URI'er gennem XCom for at holde tasks hurtige og pålidelige.
Q4: Hvilken executor skal jeg vælge til Airflow i produktion?
For elasticitet og isolation er Kubernetes Executor en stærk standard. For enklere opsætninger fungerer Celery Executor godt – sørg bare for autoskalering, robust logging og eksternaliserede secrets.
Q5: Hvordan håndterer jeg afhængigheder på tværs af flere Airflow DAG'er?
Brug Datasæt til deklarative cross-DAG triggere, når en pipeline producerer data til en anden. Alternativt kan ExternalTaskSensor koordinere kørsler, men Datasæt er renere til datadrevet orkestrering.