Kaip naudoti Airflow: praktiškas, kompleksinis vadovas patikimų duomenų apdorojimo srautų kūrimui
Jei perkeliate duomenis arba orkestruojate ML užduotis, tikriausiai girdėjote tą patį: „Tiesiog įdėkite į Airflow.“ Tiesa ta, kad Apache Airflow puikiai tinka, kai jums reikia sudėtingų darbo srautų matomumo, patikimumo ir kontrolės. Šiame praktiniame vadove žingsnis po žingsnio apžvelgsime, kaip naudoti Airflow – nuo pagrindinių sąvokų iki gamybai paruoštų šablonų – kad galėtumėte pateikti patikimus srautus.
Laikysimės praktiškumo: gausite DAG ir užduočių mentalinį modelį, praktinių pavyzdžių su TaskFlow API, diegimo parinkčių, testavimo strategijų ir geriausios praktikos. Iki galo pereisite nuo „Galiu paleisti mokomąją medžiagą“ iki „Galiu paleisti tai gamyboje“.
Pastaba: išsamesnės informacijos ir nuorodų ieškokite oficialioje dokumentacijoje, kuri yra puiki ir reguliariai atnaujinama.
Kas iš tikrųjų yra Apache Airflow?
Airflow yra orkestratorius, o ne duomenų apdorojimo įrankis. Jis planuoja, tvarko ir stebi darbus, kuriuos vykdote kitur (duomenų bazėse, duomenų saugyklose, Spark užduotyse, API, konteineriuose). Darbo srautus apibrėžiate kaip DAG (Directed Acyclic Graphs – orientuoti acikliniai grafai), kurie yra tiesiog Python failai, koduojantys užduotis ir jų priklausomybes. Tada Airflow vykdo tas užduotis pagal jūsų tvarkaraštį, parametrus ir aplinką.
- DAG: darbo srauto apibrėžimas (užduočių grafas su priklausomybėmis).
- Užduotis: darbo vienetas (Python funkcija, SQL vykdymas, Bash komanda, išorinis užduoties paleidiklis ir t. t.).
- Operatorius: užduoties tipo šablonas (pvz.,
PythonOperator, BashOperator, KubernetesPodOperator).
- Planuoklis: nusprendžia, ką ir kada vykdyti.
- Vykdytojas: vykdo užduotis (vietiškai, su Celery, Kubernetes ir t. t.).
- UI: jūsų valdymo centras vykdymams, žurnalams, pakartotiniams bandymams ir kilmės linijai.
Pradėkite nuo oficialių mokomųjų programų, kai įdiegsite Airflow; jos greitai suteiks jums bendrą vaizdą.
Teisingas Airflow įdiegimas ir paleidimas
Airflow yra lankstus. Pasirinkite kelią, kuris atitinka jūsų etapą:
- Vietinis kūrimas (greita pradžia):
- Naudokite projekto pateiktą greitosios pradžios Docker Compose. Jis paleidžia žiniatinklio serverį, planuoklį, duomenų bazę ir kt. su patikimomis numatytomis reikšmėmis.
- Puikiai tinka mokytis ir kartoti DAG.
- Maža komanda arba parengiamoji aplinka:
- Celery Executor arba Kubernetes Executor su valdomu Postgres.
- Saugokite žurnalus S3/GCS ir supakuokite priklausomybes su savo atvaizdu arba
requirements.txt.
- Kubernetes Executor elastingumui arba Celery Executor su automatiniu darbuotojų mastelio keitimu.
- Išoriniai slaptai duomenys (Vault), patikimas stebėjimas (žurnalai + metrika) ir mėlynos/žalios spalvos diegimai atnaujinimams.
Patarimas: laikykite savo Airflow kodo bazę versijų valdymo sistemoje, konteinerizuotą ir išbandytą prieš perkeliant. „Geriausios praktikos“ puslapyje aprašomi gamybai paruošti šablonai.
Pagrindinės sąvokos, kurias naudosite kasdien
DAG: jūsų darbo srautas kaip kodas
DAG yra Python failas, kuris apibrėžia:
- DAG metaduomenys: ID, tvarkaraštis, pradžios data, žymos.
- Numatytieji argumentai: pakartotiniai bandymai, savininkai, SLA.
- Užduotys ir jų priklausomybės.
Pagalvokite apie DAG kaip apie „ką“ ir „kada“, o apie užduotis – kaip apie „kaip“.
Užduotys ir operatoriai
Operatoriai yra paruoštukai dažnoms užduotims. Pavyzdžiai:
- PythonOperator / TaskFlow
@task Python kodui
- BashOperator shell komandoms
- KubernetesPodOperator konteinerizuotoms užduotims
- SQL teikėjai (pvz., Snowflake, BigQuery, Postgres) duomenų saugyklos darbui
TaskFlow API: modernus, Python stiliaus būdas
TaskFlow API leidžia jums rašyti užduotis kaip Python funkcijas su @task, grąžinti reikšmes, kurios perduodamos per XCom, ir jas švariai komponuoti. Tai sumažina standartinį kodą ir pagerina skaitomumą – labai rekomenduojama.
Jūsų pirmasis Airflow DAG (TaskFlow leidimas)
Žemiau pateiktas minimalus ETL stiliaus pavyzdys, iliustruojantis pagrindines idėjas: planavimą, TaskFlow, priklausomybes ir XCom duomenų perdavimą.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Planavimas, Catchup ir Backfills
schedule: Cron arba iš anksto nustatyti (@daily, @hourly).
start_date + catchup: jei catchup=True, Airflow užpildys vykdymus nuo pradžios datos. Srautinio perdavimo stiliaus srautams nustatykite catchup=False.
- Rankinis užpildymas: naudokite UI arba CLI, kad iš naujo paleistumėte istorinius intervalus.
Praktinė taisyklė: įgalinkite catchup deterministinėms paketinių užduotims; išjunkite realaus laiko arba API greičio apribojimų srautams.
Duomenų perdavimas tarp užduočių (XCom) saugiai
- Maži objektai: grąžinamos reikšmės su TaskFlow yra gerai.
- Dideli duomenų paketai: saugokite objektų saugykloje (S3/GCS) su raktu XCom.
- Venkite slaptų duomenų XCom; naudokite slaptų duomenų galinius serverius (pvz., Vault) ir aplinkos kintamuosius.
Dinaminis užduočių susiejimas ir Fan-out darbo krūviai
Airflow gali generuoti užduotis dinamiškai vykdymo metu, atsižvelgiant į įvestis – idealiai tinka suskaidytiems duomenų rinkiniams arba kelių nuomininkų užduotims.
- Laikykite DAG deterministinius ir idempotentinius.
- Atskirkite orkestravimą (Airflow) nuo skaičiavimo (Spark, dbt, saugyklos).
- Naudokite TaskFlow API aiškumui ir XCom higienai.
- Parametrizuokite DAG; naudokite kintamuosius apdairiai.
- Stebėkite, įspėkite ir dokumentuokite savo srautus.
Kaip dirbti su duomenų saugyklomis ir ML
- Duomenų saugyklos: naudokite teikėjo operatorius (pvz., SnowflakeOperator, BigQueryInsertJobOperator) SQL užduotims. Saugokite SQL failuose arba versijuotuose moduliuose.
- dbt: paleiskite dbt per Bash/KubernetesPodOperator arba specialius dbt operatorius teikėjų programose.
- ML: orkestruokite funkcijų generavimą, mokymą ir paketų išvadą kaip atskiras užduotis; talpinkite artefaktus saugykloje ir registruokite metriką.
Išplėstinis planavimas: duomenų rinkiniai ir priklausomybės tarp DAG
- Duomenų rinkiniai leidžia vienam DAG sukurti loginį duomenų rinkinį, kuris paleidžia kitą DAG, kai jis atnaujinamas – švariau nei ad-hoc paleidikliai.
- Senesniems šablonams ExternalTaskSensor veikia, bet duomenų rinkiniai yra deklaratyvesni.
Saugumas ir atitiktis
- Naudokite vaidmenimis pagrįstą prieigos kontrolę (RBAC) UI.
- Izoliuokite aplinkas pagal komandą arba pasitikėjimo ribą.
- Laikykite audito sekas per žurnalus ir ryšio keitimo istoriją.
Atnaujinimai ir versijų valdymas
- Išbandykite atnaujinimus parengiamojoje aplinkoje su gamybai panašiais darbo krūviais.
- Prisegkite ir atnaujinkite teikėjus apgalvotai.
- Perskaitykite išleidimo pastabas apie vykdytojui būdingus pakeitimus ir pasenimus.
Greitas kontrolinis sąrašas jūsų pirmajam gamybos DAG
- Aiški nuosavybė (
owner žyma) ir sukonfigūruoti įspėjimai.
retries nustatytas su pagrįstais atidėjimais.
- Idempotentinės užduotys ir aiškios priklausomybės.
- Maži XCom duomenų paketai; dideli duomenys saugykloje.
- Žurnalai išsiųsti į patvarią saugyklą; metrika eksportuota.
- Diegimo planas (kanarėlės arba mėlynos/žalios spalvos) ir atšaukimo veiksmai.
Pavyzdys: realistiškas saugyklos įkėlimo DAG
Šis šablonas ištraukia kasdienius failus, patvirtina juos ir įkelia į saugyklos lentelę, su dinaminiu susiejimu pagal skaidinį ir atidedamais jutikliais.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Prieš perkeliant į gamybą, peržiūrėkite geriausią praktiką.
- Išnagrinėkite teikėjo dokumentus savo sistemoms (saugykloms, debesims, ML įrankiams).
Beje: pagreitinkite kūrimą naudodami AI pagalbininką
Verta paminėti: jei kuriate daug DAG, AI asistentas, suprantantis kodą, gali pagreitinti standartinį kodą, generuoti TaskFlow stubus ir netgi pasiūlyti priklausomybės pataisymus. Jei norite lengvo pagalbininko šalia savo redaktoriaus ir naršyklės, Sider.AI gali būti naudinga greitiems kodo perrašymams ir paaiškinimams kūrimo metu. Pagrindiniai dalykai
- Naudokite Airflow orkestravimui, o ne skaičiavimui.
- Pirmenybę teikite TaskFlow API švariems, išbandomiems DAG.
- Laikykite duomenis atokiau nuo XCom; perduokite nuorodas vietoj to.
- Naudokite atidedamus jutiklius/operatorius, kad sutaupytumėte lizdus.
- Konteinerizuokite, išbandykite ir perkelkite per aplinkas.
- Pasikliaukite oficialiomis mokymo programomis ir geriausia praktika kaip savo šiaurine žvaigžde.
DUK
Q1: Koks yra paprasčiausias būdas išmokti naudotis Airflow?
Pradėkite nuo oficialios mokymo programos, kad suprastumėte DAG, užduotis, planavimą ir UI. Tada sukurkite nedidelį TaskFlow pagrįstą srautą ir kartokite su geriausios praktikos vadovu, kad būtumėte pasirengę gamybai.
Q2: Ar turėčiau naudoti TaskFlow API ar klasikinius operatorius Airflow?
Naudokite TaskFlow API daugumai Python stiliaus srautų, nes jis yra švaresnis ir natūraliai tvarko XCom grąžinimus. Klasikiniai operatoriai vis dar puikiai tinka ne Python užduotims, tokioms kaip Bash, SQL arba konteinerių užduotys.
Q3: Kaip perduoti didelius duomenis tarp Airflow užduočių?
Venkite dėti didelius duomenų paketus į XCom. Saugokite duomenis S3/GCS arba duomenų bazėje ir perduokite tik nuorodas arba URI per XCom, kad užduotys būtų greitos ir patikimos.
Q4: Kokį vykdytoją turėčiau pasirinkti Airflow gamyboje?
Dėl elastingumo ir izoliacijos Kubernetes Executor yra stiprus numatytasis. Paprastesnėms sąrankoms Celery Executor veikia gerai – tiesiog užtikrinkite automatinį mastelio keitimą, patikimą registravimą ir išorinius slaptus duomenis.
Q5: Kaip tvarkyti priklausomybes tarp kelių Airflow DAG?
Naudokite duomenų rinkinius deklaratyviems kryžminiams DAG paleidikliams, kai vienas srautas sukuria duomenis kitam. Arba ExternalTaskSensor gali koordinuoti vykdymus, bet duomenų rinkiniai yra švaresni duomenimis pagrįstam orkestravimui.