Jak používat Airflow: Praktický a komplexní průvodce pro vytváření spolehlivých datových pipeline
Pokud přesouváte data nebo orchestrujete ML úlohy, pravděpodobně jste už slyšeli opakující se: „Prostě to dejte do Airflow.“ Pravdou je, že Apache Airflow vyniká, když potřebujete viditelnost, spolehlivost a kontrolu nad složitými workflow. V tomto praktickém průvodci krok za krokem projdeme, jak používat Airflow – od základních konceptů po vzory připravené pro produkční prostředí – abyste mohli dodávat pipeline, kterým věříte.
Budeme se držet akčních kroků: získáte mentální model pro DAGy a tasky, praktické příklady s TaskFlow API, možnosti nasazení, testovací strategie a osvědčené postupy. Na konci se posunete od „Umím spustit tutoriál“ k „Umím to spustit v produkci.“
Poznámka: Pro hlubší ponory a reference je oficiální dokumentace vynikající a pravidelně aktualizovaná.
Co je vlastně Apache Airflow?
Airflow je orchestrátor – nikoli zpracovatel dat. Plánuje, řadí a monitoruje práci, kterou spouštíte jinde (databáze, datové sklady, Spark úlohy, API, kontejnery). Workflow definujete jako DAGy (Directed Acyclic Graphs), což jsou jen Python soubory, které kódují tasky a jejich závislosti. Airflow pak tyto tasky spouští podle vašeho plánu, parametrů a prostředí.
- DAG: Definice workflow (graf tasků se závislostmi).
- Task: Jednotka práce (Python funkce, SQL příkaz, Bash příkaz, spouštěč externí úlohy atd.).
- Operator: Šablona pro určitý typ tasku (např.
PythonOperator, BashOperator, KubernetesPodOperator).
- Scheduler: Rozhoduje, co se má spustit a kdy.
- Executor: Spouští tasky (lokálně, s Celery, Kubernetes atd.).
- UI: Vaše řídicí centrum pro spuštění, logy, opakování a lineage.
Začněte s oficiálními tutoriály, jakmile si nainstalujete Airflow; rychle vám poskytnou celkový obraz.
Instalace a spuštění Airflow správným způsobem
Airflow je flexibilní. Vyberte si cestu, která odpovídá vaší fázi:
- Lokální vývoj (rychlý start):
- Použijte quick-start Docker Compose, který poskytuje projekt. Spustí webový server, scheduler, databázi a další s rozumnými výchozími hodnotami.
- Skvělé pro učení a iterování na DAGech.
- Celery Executor nebo Kubernetes Executor se spravovaným Postgres.
- Ukládejte logy do S3/GCS a zabalte závislosti do svého image nebo
requirements.txt.
- Kubernetes Executor pro elasticitu nebo Celery Executor s automatickým škálováním workerů.
- Externí secrets (Vault), robustní pozorovatelnost (logy + metriky) a blue/green nasazení pro upgrady.
Tip: Udržujte svou Airflow codebase pod kontrolou verzí, kontejnerizovanou a otestovanou před povýšením. Stránka „Osvědčené postupy“ nastiňuje vzory připravené pro produkční prostředí.
Základní koncepty, které budete denně používat
DAGy: Váš workflow jako kód
DAG je Python soubor, který definuje:
- Metadata DAGu: id, plán, datum zahájení, tagy.
- Výchozí argumenty: retries, owners, SLA.
- Tasky a jejich závislosti.
Představte si DAG jako „co“ a „kdy“ a tasky jako „jak“.
Tasky a Operátoři
Operátoři jsou prefabrikáty pro běžné tasky. Příklady:
@task / TaskFlow @task pro Python kód
- BashOperator pro shell příkazy
- SimpleHttpOperator pro API
- KubernetesPodOperator pro kontejnerizované úlohy
- SQL provideři (např. Snowflake, BigQuery, Postgres) pro práci s datovým skladem
TaskFlow API: Moderní, Pythonic způsob
TaskFlow API vám umožňuje psát tasky jako Python funkce s @task, vracet hodnoty, které se předávají přes XCom, a čistě je skládat. Snižuje boilerplate a zlepšuje čitelnost – vřele doporučujeme.
Váš první Airflow DAG (TaskFlow edice)
Níže je uveden minimální ETL příklad pro ilustraci klíčových myšlenek: plánování, TaskFlow, závislosti a předávání dat XCom.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Plánování, Catchup a Backfills
schedule: Cron nebo předvolby (@daily, @hourly).
start_date + catchup: Pokud je catchup=True, Airflow provede backfill spuštění od data zahájení. Pro pipeline ve stylu streamování nastavte catchup=False.
- Manuální backfills: Použijte UI nebo CLI pro opětovné spuštění historických intervalů.
Praktické pravidlo: povolte catchup pro deterministické batch úlohy; zakažte pro real-time pipeline nebo pipeline s omezením rychlosti API.
Bezpečné předávání dat mezi tasky (XCom)
- Malé objekty: vrácené hodnoty s TaskFlow jsou v pořádku.
- Velké payloady: uložte do object storage (S3/GCS) s klíčem v XCom.
- Vyhněte se citlivým datům v XCom; používejte secrets backends (např. Vault) a proměnné prostředí.
Dynamické mapování tasků a fan-out workloady
Airflow může generovat tasky dynamicky za běhu na základě vstupů – ideální pro partitionované datasety nebo multi-tenant úlohy.
- Udržujte DAGy deterministické a idempotentní.
- Oddělte orchestraci (Airflow) od výpočtů (Spark, dbt, datové sklady).
- Používejte TaskFlow API pro přehlednost a XCom hygienu.
- Parametrizujte DAGy; používejte proměnné uvážlivě.
- Monitorujte, upozorňujte a dokumentujte své pipeline.
Jak pracovat s datovými sklady a ML
- Datové sklady: Používejte provider operátory (např. SnowflakeOperator, BigQueryInsertJobOperator) pro SQL úlohy. Ukládejte SQL do souborů nebo verzovaných modulů.
- dbt: Spouštějte dbt přes Bash/KubernetesPodOperator nebo dedikované dbt operátory v providerch.
- ML: Orchestrujte generování features, trénování a batch inference jako samostatné tasky; kešujte artefakty v úložišti a logujte metriky.
Pokročilé plánování: Datasets a závislosti mezi DAGy
- Datasets umožňují jednomu DAGu produkovat logický dataset, který spustí jiný DAG při aktualizaci – čistší než ad-hoc spouštěče.
- Pro starší vzory funguje ExternalTaskSensor, ale datasets jsou deklarativnější.
Bezpečnost a shoda s předpisy
- Používejte role-based access control (RBAC) v UI.
- Izolujte prostředí pro každý tým nebo hranici důvěry.
- Udržujte audit trails prostřednictvím logů a historie změn připojení.
Upgrady a verzování
- Testujte upgrady ve stagingu s workloady podobnými produkčním.
- Připněte a upgradujte providery záměrně.
- Přečtěte si poznámky k vydání pro změny a zastarání specifické pro executor.
Rychlý checklist pro váš první produkční DAG
- Jasné vlastnictví (tag
owner) a nakonfigurované alerty.
retries nastaveny s rozumnými backoffy.
- Idempotentní tasky a explicitní závislosti.
- Malé XCom payloady; velká data v úložišti.
- Logy odeslány do trvalého úložiště; metriky exportovány.
- Plán rolloutu (kanárkový nebo blue/green) a kroky rollbacku.
Příklad: Realistický DAG pro načítání datového skladu
Tento vzor extrahuje denní soubory, validuje je a načítá je do tabulky datového skladu, s dynamickým mapováním pro každou partition a deferrable senzory.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Před povýšením do produkce si projděte osvědčené postupy.
- Prozkoumejte dokumentaci providerů pro vaše systémy (datové sklady, cloudy, ML nástroje).
Mimochodem: Zrychlete tvorbu s AI sidekickem
Stojí za zmínku: pokud navrhujete hodně DAGů, AI asistent, který rozumí kódu, může urychlit boilerplate, generovat TaskFlow stubs a dokonce navrhnout opravy závislostí. Pokud chcete lehkého pomocníka vedle svého editoru a prohlížeče, Sider.AI se může hodit pro rychlé přepisování a vysvětlování kódu během vývoje. Klíčové poznatky
- Používejte Airflow k orchestraci, nikoli k výpočtům.
- Preferujte TaskFlow API pro čisté, testovatelné DAGy.
- Udržujte data mimo XCom; místo toho předávejte reference.
- Používejte deferrable senzory/operátory k úspoře slotů.
- Kontejnerizujte, testujte a propagujte prostřednictvím prostředí.
- Spoléhejte se na oficiální tutoriály a osvědčené postupy jako na svou severku.
FAQ
Q1: Jaký je nejjednodušší způsob, jak se naučit používat Airflow?
Začněte s oficiálním tutoriálem, abyste porozuměli DAGům, taskům, plánování a UI. Poté si vytvořte malou pipeline založenou na TaskFlow a iterujte s průvodcem osvědčenými postupy pro připravenost na produkční prostředí.
Q2: Mám používat TaskFlow API nebo klasické operátory v Airflow?
Používejte TaskFlow API pro většinu Pythonic pipeline, protože je čistší a přirozeně zpracovává XCom returns. Klasické operátory jsou stále skvělé pro non-Python tasky, jako jsou Bash, SQL nebo kontejnerové úlohy.
Q3: Jak předávat velká data mezi Airflow tasky?
Vyhněte se umisťování velkých payloadů do XCom. Ukládejte data do S3/GCS nebo databáze a předávejte pouze reference nebo URI prostřednictvím XCom, abyste udrželi tasky rychlé a spolehlivé.
Q4: Jaký executor bych si měl vybrat pro Airflow v produkci?
Pro elasticitu a izolaci je Kubernetes Executor silnou výchozí volbou. Pro jednodušší nastavení funguje dobře Celery Executor – jen zajistěte automatické škálování, robustní logování a externalizované secrets.
Q5: Jak řešit závislosti mezi více Airflow DAGy?
Používejte Datasets pro deklarativní spouštění mezi DAGy, když jedna pipeline produkuje data pro jinou. Alternativně může ExternalTaskSensor koordinovat spuštění, ale Datasets jsou čistší pro orchestraci řízenou daty.