Kā lietot Airflow: praktisks, pilnīgs ceļvedis uzticamu datu apstrādes līniju izveidei
Ja jūs pārvietojat datus vai orķestrējat ML darbus, jūs, iespējams, esat dzirdējuši vienu un to pašu atbildi: “Vienkārši ielieciet to Airflow.” Patiesība ir tāda, ka Apache Airflow izceļas, kad jums ir nepieciešama redzamība, uzticamība un kontrole pār sarežģītām darbplūsmām. Šajā praktiskajā ceļvedī mēs soli pa solim iziesim cauri tam, kā lietot Airflow — no pamatjēdzieniem līdz ražošanai gataviem modeļiem —, lai jūs varētu piegādāt datu apstrādes līnijas, kurām uzticaties.
Mēs to padarīsim praktiski pielietojamu: jūs iegūsit mentālu modeli par DAG un uzdevumiem, praktiskus piemērus ar TaskFlow API, izvietošanas iespējas, testēšanas stratēģijas un labāko praksi. Līdz beigām jūs pāriesit no “Es varu palaist apmācību” uz “Es varu palaist šo ražošanā”.
Piezīme: padziļinātai izpētei un atsaucei oficiālie dokumenti ir lieliski un regulāri tiek atjaunināti.
Kas īsti ir Apache Airflow?
Airflow ir orķestrators, nevis datu apstrādātājs. Tas plāno, sakārto un uzrauga darbu, ko veicat citur (datubāzēs, datu noliktavās, Spark darbos, API, konteineros). Jūs definējat darbplūsmas kā DAG (Directed Acyclic Graphs), kas ir tikai Python faili, kas kodē uzdevumus un to atkarības. Pēc tam Airflow izpilda šos uzdevumus atbilstoši jūsu grafikam, parametriem un videi.
- DAG: Darbplūsmas definīcija (uzdevumu grafiks ar atkarībām).
- Uzdevums: Darba vienība (Python funkcija, SQL izpilde, Bash komanda, ārēja darba aktivizēšana utt.).
- Operators: Šablons uzdevuma veidam (piemēram,
PythonOperator, BashOperator, KubernetesPodOperator).
- Plānotājs: Nosaka, ko palaist un kad.
- Izpildītājs: Palaiž uzdevumus (lokāli, ar Celery, Kubernetes utt.).
- UI: Jūsu vadības centrs palaišanai, žurnāliem, atkārtotiem mēģinājumiem un izcelsmei.
Sāciet ar oficiālajām apmācībām, tiklīdz esat instalējis Airflow; tās ātri sniedz jums kopējo priekšstatu.
Airflow pareiza instalēšana un palaišana
Airflow ir elastīgs. Izvēlieties ceļu, kas atbilst jūsu posmam:
- Vietējā izstrāde (ātrais starts):
- Izmantojiet projekta nodrošināto ātrās palaišanas Docker Compose. Tas ar saprātīgiem noklusējuma iestatījumiem palaiž tīmekļa serveri, plānotāju, datubāzi un daudz ko citu.
- Lieliski piemērots DAG apguvei un atkārtošanai.
- Maza komanda vai iestatīšana:
- Celery Executor vai Kubernetes Executor ar pārvaldītu Postgres.
- Glabājiet žurnālus S3/GCS un iepakojiet atkarības ar savu attēlu vai
requirements.txt.
- Kubernetes Executor elastībai vai Celery Executor ar automātisku darbinieku mērogošanu.
- Ārēji noslēpumi (Vault), stabila novērojamība (žurnāli + metrika) un zilas/zaļas izvietošanas jauninājumiem.
Padoms: glabājiet savu Airflow koda bāzi versiju kontrolē, konteinerizētu un testētu pirms reklamēšanas. Lapā “Labākā prakse” ir izklāstīti ražošanai gatavi modeļi.
Pamatjēdzieni, ko izmantosit katru dienu
DAG: Jūsu darbplūsma kā kods
DAG ir Python fails, kas definē:
- DAG metadati: id, grafiks, sākuma datums, tagi.
- Noklusējuma argumenti: atkārtoti mēģinājumi, īpašnieki, SLA.
- Uzdevumi un to atkarības.
Domājiet par DAG kā par “ko” un “kad”, bet par uzdevumiem kā par “kā”.
Uzdevumi un operatori
Operatori ir gatavi elementi bieži sastopamiem uzdevumiem. Piemēri:
- PythonOperator / TaskFlow
@task Python kodam
- BashOperator shell komandām
- KubernetesPodOperator konteinerizētiem darbiem
- SQL nodrošinātāji (piemēram, Snowflake, BigQuery, Postgres) noliktavas darbam
TaskFlow API: Mūsdienīgs, Pythonisks veids
TaskFlow API ļauj rakstīt uzdevumus kā Python funkcijas ar @task, atgriezt vērtības, kas tiek nodotas caur XCom, un tīri tās komponēt. Tas samazina standartizāciju un uzlabo lasāmību — ļoti ieteicams.
Jūsu pirmais Airflow DAG (TaskFlow izdevums)
Zemāk ir minimāls ETL stila piemērs, lai ilustrētu galvenās idejas: plānošanu, TaskFlow, atkarības un XCom datu pārsūtīšanu.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Plānošana, Catchup un Backfills
schedule: Cron vai sākotnējie iestatījumi (@daily, @hourly).
start_date + catchup: Ja catchup=True, Airflow aizpildīs palaišanas no sākuma datuma. Straumēšanas stila datu apstrādes līnijām iestatiet catchup=False.
- Manuāla aizpildīšana: izmantojiet UI vai CLI, lai atkārtoti palaistu vēsturiskos intervālus.
Praktisks īkšķa likums: iespējojiet catchup determinētiem pakešuzdevumiem; atspējojiet reāllaika vai API ātruma ierobežotām datu apstrādes līnijām.
Datu droša pārsūtīšana starp uzdevumiem (XCom)
- Mazi objekti: atgriezt vērtības ar TaskFlow ir labi.
- Lielas kravas: glabājiet objektu krātuvē (S3/GCS) ar atslēgu XCom.
- Izvairieties no sensitīviem datiem XCom; izmantojiet slepeno informāciju (piemēram, Vault) un vides mainīgos.
Dinamiska uzdevumu kartēšana un Fan-out darba slodzes
Airflow var dinamiski ģenerēt uzdevumus izpildlaikā, pamatojoties uz ievadēm — ideāli piemērots sadalītiem datu kopumiem vai vairāku nomnieku darbiem.
- Glabājiet DAG deterministiskus un idempotentus.
- Atvienojiet orķestrēšanu (Airflow) no aprēķiniem (Spark, dbt, noliktavas).
- Izmantojiet TaskFlow API skaidrībai un XCom higiēnai.
- Parametrizējiet DAG; izmantojiet mainīgos apdomīgi.
- Uzraugiet, brīdiniet un dokumentējiet savas datu apstrādes līnijas.
Kā strādāt ar datu noliktavām un ML
- Datu noliktavas: izmantojiet nodrošinātāju operatorus (piemēram, SnowflakeOperator, BigQueryInsertJobOperator) SQL darbiem. Glabājiet SQL failos vai versiju moduļos.
- dbt: aktivizējiet dbt, izmantojot Bash/KubernetesPodOperator vai īpašus dbt operatorus nodrošinātājos.
- ML: orķestrējiet iezīmju ģenerēšanu, apmācību un pakešuzziņu kā atsevišķus uzdevumus; kešatmiņas artefaktus krātuvē un reģistrējiet metriku.
Papildu plānošana: datu kopumi un atkarības starp DAG
- Datu kopumi ļauj vienam DAG ģenerēt loģisku datu kopumu, kas aktivizē citu DAG, kad tas tiek atjaunināts — tīrāks par ad-hoc aktivizētājiem.
- Mantotajiem modeļiem darbojas ExternalTaskSensor, bet datu kopumi ir deklaratīvāki.
Drošība un atbilstība
- Izmantojiet uz lomām balstītu piekļuves kontroli (RBAC) UI.
- Izolējiet vides katrai komandai vai uzticības robežai.
- Glabājiet audita izsekojamību, izmantojot žurnālus un savienojuma izmaiņu vēsturi.
Jauninājumi un versiju kontrole
- Pārbaudiet jauninājumus iestatīšanas vidē ar ražošanai līdzīgām darba slodzēm.
- Fiksējiet un jauniniet nodrošinātājus apzināti.
- Izlasiet laidiena piezīmes par izpildītājam specifiskām izmaiņām un novecošanām.
Ātrs kontrolsaraksts jūsu pirmajam ražošanas DAG
- Skaidrs īpašumtiesības (
owner tags) un konfigurēti brīdinājumi.
retries iestatīts ar saprātīgiem atkāpumiem.
- Idempotenti uzdevumi un skaidras atkarības.
- Mazas XCom kravas; lieli dati krātuvē.
- Žurnāli tiek nosūtīti uz izturīgu krātuvi; metrika tiek eksportēta.
- Ieviešanas plāns (kanārijputniņš vai zils/zaļš) un atcelšanas darbības.
Piemērs: reālistisks noliktavas ielādes DAG
Šis modelis iegūst ikdienas failus, validē tos un ielādē tos noliktavas tabulā, ar dinamisku kartēšanu katram nodalījumam un atliekamiem sensoriem.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Pirms pārejas uz ražošanu pārskatiet labāko praksi.
- Izpētiet nodrošinātāju dokumentus savām sistēmām (noliktavām, mākoņiem, ML rīkiem).
Starp citu: paātriniet autorēšanu ar AI palīgu
Vērts atzīmēt: ja jūs izstrādājat daudz DAG, AI palīgs, kas saprot kodu, var paātrināt standartizāciju, ģenerēt TaskFlow sagataves un pat ieteikt atkarību labojumus. Ja vēlaties vieglu palīgu līdzās savam redaktoram un pārlūkprogrammai, Sider.AI var būt noderīgs ātrai koda pārrakstīšanai un paskaidrojumiem izstrādes laikā. Galvenie secinājumi
- Izmantojiet Airflow, lai orķestrētu, nevis aprēķinātu.
- Dodiet priekšroku TaskFlow API tīriem, testējamiem DAG.
- Glabājiet datus ārpus XCom; tā vietā nododiet atsauces.
- Izmantojiet atliekamos sensorus/operatorus, lai ietaupītu slotus.
- Konteinerizējiet, testējiet un reklamējiet, izmantojot vides.
- Paļaujieties uz oficiālajām apmācībām un labāko praksi kā savu ziemeļu zvaigzni.
BUJ
Q1: Kāds ir vienkāršākais veids, kā iemācīties lietot Airflow?
Sāciet ar oficiālo apmācību, lai saprastu DAG, uzdevumus, plānošanu un UI. Pēc tam izveidojiet nelielu TaskFlow balstītu datu apstrādes līniju un atkārtojiet to ar labākās prakses rokasgrāmatu, lai sagatavotos ražošanai.
Q2: Vai man Airflow jāizmanto TaskFlow API vai klasiskie operatori?
Izmantojiet TaskFlow API lielākajai daļai Python datu apstrādes līniju, jo tas ir tīrāks un dabiski apstrādā XCom atgriešanas vērtības. Klasiskie operatori joprojām ir lieliski piemēroti uzdevumiem, kas nav Python, piemēram, Bash, SQL vai konteineru darbiem.
Q3: Kā es varu pārsūtīt lielus datus starp Airflow uzdevumiem?
Izvairieties ievietot lielas kravas XCom. Glabājiet datus S3/GCS vai datubāzē un nododiet tikai atsauces vai URI, izmantojot XCom, lai uzdevumi būtu ātri un uzticami.
Q4: Kādu izpildītāju man vajadzētu izvēlēties Airflow ražošanā?
Elastībai un izolācijai Kubernetes Executor ir spēcīgs noklusējums. Vienkāršākai iestatīšanai Celery Executor darbojas labi — vienkārši nodrošiniet automātisku mērogošanu, stabilu žurnālēšanu un eksternalizētu slepeno informāciju.
Q5: Kā es varu apstrādāt atkarības starp vairākiem Airflow DAG?
Izmantojiet datu kopumus deklaratīviem starp-DAG aktivizētājiem, kad viena datu apstrādes līnija ģenerē datus citai. Alternatīvi, ExternalTaskSensor var koordinēt palaišanu, bet datu kopumi ir tīrāki datu vadītai orķestrēšanai.