Kuidas kasutada Airflow'd: praktiline ja terviklik juhend usaldusväärsete andmevoogude loomiseks
Kui te liigutate andmeid või korraldate ML-töid, olete tõenäoliselt kuulnud ühte ja sama refrääni: "Lihtsalt pane see Airflow'sse." Tõsi on see, et Apache Airflow paistab silma, kui vajate nähtavust, usaldusväärsust ja kontrolli keerukate töövoogude üle. Selles praktilises juhendis käsitleme samm-sammult, kuidas Airflow'd kasutada – alates põhimõistetest kuni tootmiseks valmis mustriteni –, et saaksite tarnida voogusid, mida usaldate.
Hoiame selle praktilisena: saate DAG-ide ja ülesannete vaimse mudeli, praktilisi näiteid TaskFlow API-ga, juurutusvalikuid, testimisstrateegiaid ja parimaid praktikaid. Lõpuks jõuate punktist "Ma saan õpetuse käivitada" punkti "Ma saan seda tootmises käivitada".
Märkus: põhjalikumate süvenemiste ja viidete jaoks on ametlik dokumentatsioon suurepärane ja seda uuendatakse regulaarselt.
Mis on Apache Airflow tegelikult?
Airflow on orkestraator – mitte andmetöötlusprotsessor. See ajastab, järjestab ja jälgib tööd, mida te mujal käitate (andmebaasid, andmelaod, Sparki tööd, API-d, konteinerid). Te määratlete töövoogusid DAG-idena (Directed Acyclic Graphs ehk suunatud atsüklilised graafid), mis on lihtsalt Pythoni failid, mis kodeerivad ülesandeid ja nende sõltuvusi. Seejärel täidab Airflow need ülesanded vastavalt teie ajakavale, parameetritele ja keskkonnale.
- DAG: Töövoo määratlus (ülesannete graafik sõltuvustega).
- Ülesanne: Tööühik (Pythoni funktsioon, SQL-i käivitamine, Bashi käsk, välise töö käivitamine jne).
- Operaator: Ülesande liigi mall (nt
PythonOperator, BashOperator, KubernetesPodOperator).
- Ajakava koostaja (Scheduler): Otsustab, mida ja millal käivitada.
- Täideviija (Executor): Käivitab ülesandeid (lokaalselt, Celeryga, Kubernetesega jne).
- UI: Teie juhtimiskeskus käivituste, logide, uuesti proovimiste ja päritolu jaoks.
Alustage ametlike õpetustega kohe, kui olete Airflow installinud; need annavad teile kiiresti suure pildi.
Airflow installimine ja käivitamine õigel viisil
Airflow on paindlik. Valige tee, mis sobib teie etapiga:
- Kohalik arendus (kiirkäivitus):
- Kasutage projekti pakutavat kiirkäivitus-Docker Compose'i. See käivitab veebiserveri, ajakava koostaja, andmebaasi ja palju muud mõistlike vaikesätetega.
- Suurepärane DAG-ide õppimiseks ja itereerimiseks.
- Väike meeskond või lavastus:
- Celery Executor või Kubernetes Executor hallatava Postgresiga.
- Salvestage logid S3/GCS-i ja paketeerige sõltuvused oma pildi või
requirements.txt-ga.
- Kubernetes Executor elastsuse tagamiseks või Celery Executor automaatse skaleerimisega töötajatega.
- Välised saladused (Vault), tugev jälgitavus (logid + mõõdikud) ja sinine/roheline juurutamine uuenduste jaoks.
Nipp: hoidke oma Airflow koodibaasi versioonikontrolli all, konteineriseeritud ja testitud enne reklaamimist. Lehel "Parimad praktikad" kirjeldatakse tootmiseks valmis mustreid.
Põhimõisted, mida igapäevaselt kasutate
DAG-id: teie töövoog koodina
DAG on Pythoni fail, mis määratleb:
- DAG-i metaandmed: id, ajakava, alguskuupäev, sildid.
- Vaikeargumendid: uuesti proovimised, omanikud, SLA-d.
- Ülesanded ja nende sõltuvused.
Mõelge DAG-ile kui "millele" ja "millal", ning ülesannetele kui "kuidas".
Ülesanded ja operaatorid
Operaatorid on monteeritavad elemendid tavaliste ülesannete jaoks. Näited:
- PythonOperator / TaskFlow
@task Pythoni koodi jaoks
- BashOperator shelli käskude jaoks
- SimpleHttpOperator API-de jaoks
- KubernetesPodOperator konteineriseeritud tööde jaoks
- SQL-i pakkujad (nt Snowflake, BigQuery, Postgres) andmelaotööde jaoks
TaskFlow API: kaasaegne, Pythonic viis
TaskFlow API võimaldab teil kirjutada ülesandeid Pythoni funktsioonidena @task-ga, tagastada väärtusi, mis edastatakse XComi kaudu, ja neid puhtalt komponeerida. See vähendab boilerplati ja parandab loetavust – väga soovitatav.
Teie esimene Airflow DAG (TaskFlow Edition)
Allpool on minimaalne ETL-stiilis näide, mis illustreerib peamisi ideid: ajakava koostamine, TaskFlow, sõltuvused ja XComi andmete edastamine.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Ajakava koostamine, Catchup ja Backfillid
schedule: Cron või eelseaded (@daily, @hourly).
start_date + catchup: Kui catchup=True, täidab Airflow käivitused alguskuupäevast alates. Voogedastusstiilis voogude jaoks määrake catchup=False.
- Käsitsi tagasitäitmised: kasutage UI-d või CLI-d ajalooliste intervallide uuesti käivitamiseks.
Praktiline rusikareegel: lubage catchup deterministlike pakettööde jaoks; keelake reaalajas või API kiirusepiiranguga voogude jaoks.
Andmete turvaline edastamine ülesannete vahel (XCom)
- Väikesed objektid: TaskFlow'ga tagastusväärtused on head.
- Suured koormused: salvestage objektide salvestusruumis (S3/GCS) XComi võtmega.
- Vältige tundlikke andmeid XComis; kasutage saladuste taustaprogramme (nt Vault) ja keskkonnamuutujaid.
Dünaamiline ülesannete kaardistamine ja väljundtöökoormused
Airflow saab ülesandeid dünaamiliselt genereerida käitusajal sisendite põhjal – ideaalne partitsioneeritud andmekogumite või mitme rentniku tööde jaoks.
- Hoidke DAG-id deterministlikud ja idempotentsed.
- Eraldage orkestreerimine (Airflow) arvutusest (Spark, dbt, andmelaod).
- Kasutage TaskFlow API-t selguse ja XComi hügieeni tagamiseks.
- Parameetritega DAG-id; kasutage muutujaid mõistlikult.
- Jälgige, hoiatage ja dokumenteerige oma voogusid.
Kuidas töötada andmeladude ja ML-iga
- Andmelaod: kasutage pakkuja operaatoreid (nt SnowflakeOperator, BigQueryInsertJobOperator) SQL-i tööde jaoks. Salvestage SQL-i failidesse või versiooniga moodulitesse.
- dbt: käivitage dbt Bashi/KubernetesPodOperatori või spetsiaalsete dbt operaatorite kaudu pakkujates.
- ML: korraldage funktsioonide genereerimine, koolitus ja pakettide järeldamine eraldi ülesannetena; vahemällu artefaktid salvestusruumis ja logige mõõdikud.
Täpsem ajakava koostamine: andmekogumid ja DAG-idevahelised sõltuvused
- Andmekogumid võimaldavad ühel DAG-il genereerida loogilise andmekogumi, mis käivitab teise DAG-i värskendamisel – puhtam kui ad-hoc käivitajad.
- Pärandmustrite jaoks töötab ExternalTaskSensor, kuid andmekogumid on deklaratiivsemad.
Turvalisus ja vastavus
- Kasutage kasutajaliideses rollipõhist juurdepääsukontrolli (RBAC).
- Eraldage keskkonnad meeskonna või usalduspiiri järgi.
- Hoidke auditeerimisjälgi logide ja ühenduse muutmise ajaloo kaudu.
Uuendused ja versioonimine
- Testige uuendusi lavastuses tootmiskeskkonnaga sarnaste töökoormustega.
- Kinnitage ja uuendage pakkujaid tahtlikult.
- Lugege väljalaskemärkmeid täideviijapõhiste muudatuste ja aegumiste kohta.
Kiire kontrollnimekiri teie esimese tootmis-DAG-i jaoks
- Selge omandiõigus (
owner silt) ja konfigureeritud hoiatused.
retries on seatud mõistlike tagasilöökidega.
- Idempotentsed ülesanded ja selgesõnalised sõltuvused.
- Väikesed XComi koormused; suured andmed salvestusruumis.
- Logid on saadetud vastupidavasse salvestusruumi; mõõdikud on eksporditud.
- Väljalaskekava (kanaari või sinine/roheline) ja tagasipööramise sammud.
Näide: realistlik andmelaadimise DAG
See muster eraldab igapäevaseid faile, valideerib neid ja laadib need andmelaotabelisse, dünaamilise kaardistamisega partitsiooni kohta ja edasilükatavate anduritega.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Enne tootmisse viimist vaadake üle parimad praktikad.
- Uurige oma süsteemide (andmelaod, pilved, ML-i tööriistad) pakkuja dokumente.
Muide: kiirendage autorlust AI abil
Väärib märkimist: kui koostate palju DAG-e, võib koodi mõistev AI-assistent kiirendada boilerplati, genereerida TaskFlow'i stub-e ja isegi soovitada sõltuvusparandusi. Kui soovite kerget abimeest oma redaktori ja brauseri kõrvale, võib Sider.AI olla käepärane kiirete koodi ümberkirjutamiste ja selgituste jaoks arenduse ajal. Peamised järeldused
- Kasutage Airflow'd orkestreerimiseks, mitte arvutamiseks.
- Eelistage TaskFlow API-t puhaste, testitavate DAG-ide jaoks.
- Hoidke andmed XComist väljas; edastage selle asemel viiteid.
- Kasutage edasilükatavaid andureid/operaatoreid, et säästa pesasid.
- Konteineriseerige, testige ja reklaamige keskkondade kaudu.
- Toetuge ametlikele õpetustele ja parimatele tavadele kui oma põhjatähele.
KKK
Q1:Mis on lihtsaim viis Airflow kasutamise õppimiseks?
Alustage ametliku õpetusega, et mõista DAG-e, ülesandeid, ajakava koostamist ja kasutajaliidest. Seejärel looge väike TaskFlow-põhine voog ja itereerige tootmiseks valmisoleku parimate tavade juhendiga.
Q2:Kas ma peaksin Airflow's kasutama TaskFlow API-t või klassikalisi operaatoreid?
Kasutage TaskFlow API-t enamiku Pythonic voogude jaoks, kuna see on puhtam ja käsitleb XComi tagastusi loomulikult. Klassikalised operaatorid on endiselt suurepärased mitte-Pythoni ülesannete jaoks, nagu Bash, SQL või konteineritööd.
Q3:Kuidas ma saan Airflow ülesannete vahel suuri andmeid edastada?
Vältige suurte koormuste paigutamist XComi. Salvestage andmed S3/GCS-i või andmebaasi ja edastage XComi kaudu ainult viiteid või URI-sid, et hoida ülesanded kiired ja usaldusväärsed.
Q4:Millise täideviija ma peaksin Airflow jaoks tootmises valima?
Elastsuse ja isolatsiooni jaoks on Kubernetes Executor tugev vaikeväärtus. Lihtsamate seadistuste jaoks töötab Celery Executor hästi – lihtsalt veenduge, et on olemas automaatne skaleerimine, tugev logimine ja välised saladused.
Q5:Kuidas ma saan hallata sõltuvusi mitme Airflow DAG-i vahel?
Kasutage andmekogumeid deklaratiivsete DAG-idevaheliste käivitajate jaoks, kui üks voog genereerib andmeid teise jaoks. Teise võimalusena võib ExternalTaskSensor käivitusi koordineerida, kuid andmekogumid on andmepõhise orkestreerimise jaoks puhtamad.