Cara Menggunakan Airflow: Panduan Praktikal dan Lengkap untuk Membina Saluran Data yang Boleh Dipercayai
Jika anda memindahkan data atau mengatur tugas ML, anda mungkin sering mendengar: “Masukkan sahaja ke dalam Airflow.” Hakikatnya, Apache Airflow sangat berguna apabila anda memerlukan keterlihatan, kebolehpercayaan dan kawalan ke atas aliran kerja yang kompleks. Dalam panduan praktikal ini, kami akan membimbing anda langkah demi langkah tentang cara menggunakan Airflow—daripada konsep teras hingga corak sedia pengeluaran—supaya anda boleh menghantar saluran yang anda percayai.
Kami akan memastikan ia boleh diambil tindakan: anda akan mendapat model mental untuk DAG dan tugas, contoh praktikal dengan TaskFlow API, pilihan penggunaan, strategi pengujian dan amalan terbaik. Pada akhirnya, anda akan beralih daripada “Saya boleh menjalankan tutorial” kepada “Saya boleh menjalankan ini dalam prod.”
Nota: Untuk penerokaan dan rujukan yang lebih mendalam, dokumen rasmi adalah sangat baik dan dikemas kini secara berkala.
Apakah Sebenarnya Apache Airflow?
Airflow ialah orkestrator—bukan pemproses data. Ia menjadualkan, menyusun dan memantau kerja yang anda jalankan di tempat lain (pangkalan data, gudang data, tugas Spark, API, kontena). Anda mentakrifkan aliran kerja sebagai DAG (Graf Asiklik Terarah), yang hanyalah fail Python yang mengekodkan tugas dan kebergantungannya. Airflow kemudian melaksanakan tugas tersebut mengikut jadual, parameter dan persekitaran anda.
- DAG: Definisi aliran kerja (graf tugas dengan kebergantungan).
- Tugas: Unit kerja (fungsi Python, pelaksanaan SQL, arahan Bash, pencetus tugas luaran, dll.).
- Operator: Templat untuk jenis tugas (cth.,
PythonOperator, BashOperator, KubernetesPodOperator).
- Penjadual: Menentukan perkara yang hendak dijalankan dan bila.
- Pelaksana: Menjalankan tugas (secara tempatan, dengan Celery, Kubernetes, dll.).
- UI: Pusat kawalan anda untuk larian, log, percubaan semula dan salasilah.
Mulakan dengan tutorial rasmi sebaik sahaja anda memasang Airflow; ia memberi anda gambaran besar dengan cepat.
Memasang dan Menjalankan Airflow dengan Cara yang Betul
Airflow adalah fleksibel. Pilih laluan yang sesuai dengan peringkat anda:
- Pembangunan tempatan (permulaan pantas):
- Gunakan Docker Compose permulaan pantas yang disediakan oleh projek. Ia memulakan pelayan web, penjadual, pangkalan data dan banyak lagi dengan lalai yang waras.
- Sangat sesuai untuk mempelajari dan mengulangi DAG.
- Pasukan kecil atau pementasan:
- Pelaksana Celery atau Pelaksana Kubernetes dengan Postgres terurus.
- Simpan log dalam S3/GCS dan pakej kebergantungan dengan imej atau
requirements.txt anda.
- Pelaksana Kubernetes untuk keanjalan atau Pelaksana Celery dengan pekerja autoskala.
- Rahsia luaran (Vault), kebolehcerapan yang teguh (log + metrik) dan penggunaan biru/hijau untuk peningkatan.
Tip: Pastikan pangkalan kod Airflow anda dikawal versi, dikontainerkan dan diuji sebelum promosi. Halaman “Amalan Terbaik” menggariskan corak sedia pengeluaran.
Konsep Teras yang Akan Anda Gunakan Setiap Hari
DAG: Aliran Kerja Anda sebagai Kod
DAG ialah fail Python yang mentakrifkan:
- Metadata DAG: id, jadual, tarikh mula, tag.
- Argumen lalai: percubaan semula, pemilik, SLA.
- Tugas dan kebergantungannya.
Anggap DAG sebagai “apa” dan “bila,” dan tugas sebagai “bagaimana.”
Tugas dan Operator
Operator ialah prefab untuk tugas biasa. Contoh:
- PythonOperator / TaskFlow
@task untuk kod Python
- BashOperator untuk arahan shell
- SimpleHttpOperator untuk API
- KubernetesPodOperator untuk tugas kontainer
- Pembekal SQL (cth., Snowflake, BigQuery, Postgres) untuk kerja gudang
TaskFlow API: Cara Moden dan Pythonik
TaskFlow API membolehkan anda menulis tugas sebagai fungsi Python dengan @task, mengembalikan nilai yang dilalui melalui XCom, dan menyusunnya dengan bersih. Ia mengurangkan boilerplate dan meningkatkan kebolehbacaan—sangat disyorkan.
DAG Airflow Pertama Anda (Edisi TaskFlow)
Di bawah ialah contoh gaya ETL minimal untuk menggambarkan idea utama: penjadualan, TaskFlow, kebergantungan dan penghantaran data XCom.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Penjadualan, Catchup dan Backfill
schedule: Cron atau pratetap (@daily, @hourly).
start_date + catchup: Jika catchup=True, Airflow akan membuat backfill larian dari tarikh mula. Untuk saluran gaya penstriman, tetapkan catchup=False.
- Backfill manual: Gunakan UI atau CLI untuk menjalankan semula selang masa lalu.
Peraturan praktikal: dayakan catchup untuk tugas kelompok deterministik; lumpuhkan untuk saluran masa nyata atau API yang dihadkan kadar.
Menghantar Data Antara Tugas (XCom) dengan Selamat
- Objek kecil: nilai pulangan dengan TaskFlow adalah baik.
- Muatan besar: simpan dalam storan objek (S3/GCS) dengan kunci dalam XCom.
- Elakkan data sensitif dalam XCom; gunakan bahagian belakang rahsia (cth., Vault) dan pemboleh ubah persekitaran.
Pemetaan Tugas Dinamik dan Beban Kerja Fan-out
Airflow boleh menjana tugas secara dinamik pada masa jalanan berdasarkan input—sesuai untuk set data berpartisi atau tugas berbilang penyewa.
- Pastikan DAG deterministik dan idempotent.
- Asingkan orkestrasi (Airflow) daripada pengiraan (Spark, dbt, gudang).
- Gunakan TaskFlow API untuk kejelasan dan kebersihan XCom.
- Parameterkan DAG; gunakan pemboleh ubah dengan bijak.
- Pantau, berikan amaran dan dokumentasikan saluran anda.
Cara Bekerja dengan Gudang Data dan ML
- Gudang data: Gunakan operator pembekal (cth., SnowflakeOperator, BigQueryInsertJobOperator) untuk tugas SQL. Simpan SQL dalam fail atau modul versi.
- dbt: Cetuskan dbt melalui Bash/KubernetesPodOperator atau operator dbt khusus dalam pembekal.
- ML: Aturkan penjanaan ciri, latihan dan inferens kelompok sebagai tugas berasingan; cache artifak dalam storan dan log metrik.
Penjadualan Lanjutan: Set Data dan Kebergantungan Merentas DAG
- Set data membolehkan satu DAG menghasilkan set data logik yang mencetuskan DAG lain apabila dikemas kini—lebih bersih daripada pencetus ad-hoc.
- Untuk corak warisan, ExternalTaskSensor berfungsi, tetapi set data lebih deklaratif.
Keselamatan dan Pematuhan
- Gunakan kawalan akses berasaskan peranan (RBAC) dalam UI.
- Asingkan persekitaran setiap pasukan atau sempadan kepercayaan.
- Simpan jejak audit melalui log dan sejarah perubahan sambungan.
Peningkatan dan Pembahagian Versi
- Uji peningkatan dalam pementasan dengan beban kerja seperti pengeluaran.
- Pin dan tingkatkan pembekal dengan sengaja.
- Baca nota keluaran untuk perubahan dan pengehadan khusus pelaksana.
Senarai Semak Pantas untuk DAG Pengeluaran Pertama Anda
- Pemilikan yang jelas (tag
owner) dan amaran dikonfigurasikan.
retries ditetapkan dengan backoff yang munasabah.
- Tugas Idempotent dan kebergantungan eksplisit.
- Muatan XCom kecil; data besar dalam storan.
- Log dihantar ke storan tahan lama; metrik dieksport.
- Pelan pelancaran (kenari atau biru/hijau) dan langkah pengembalian.
Contoh: DAG Muatan Gudang yang Realistik
Corak ini mengekstrak fail harian, mengesahkannya dan memuatkannya ke dalam jadual gudang, dengan pemetaan dinamik setiap partisi dan penderia boleh tangguh.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Semak Amalan Terbaik sebelum mempromosikan ke pengeluaran.
- Terokai dokumen pembekal untuk sistem anda (gudang, awan, alat ML).
Dengan cara ini: Percepatkan penulisan dengan pembantu AI
Perlu diingatkan: jika anda merangka banyak DAG, pembantu AI yang memahami kod boleh mempercepatkan boilerplate, menjana stub TaskFlow dan juga mencadangkan pembetulan kebergantungan. Jika anda mahukan pembantu ringan di samping editor dan penyemak imbas anda, Sider.AI boleh berguna untuk penulisan semula dan penjelasan kod pantas semasa pembangunan. Perkara Utama
- Gunakan Airflow untuk mengatur, bukan mengira.
- Pilih TaskFlow API untuk DAG yang bersih dan boleh diuji.
- Keluarkan data daripada XCom; hantar rujukan sahaja.
- Gunakan penderia/operator boleh tangguh untuk menjimatkan slot.
- Kontainerkan, uji dan promosikan melalui persekitaran.
- Bergantung pada tutorial rasmi dan amalan terbaik sebagai panduan anda.
Soalan Lazim
S1:Apakah cara paling mudah untuk belajar cara menggunakan Airflow?
Mulakan dengan Tutorial rasmi untuk memahami DAG, tugas, penjadualan dan UI. Kemudian bina saluran kecil berasaskan TaskFlow dan ulangi dengan panduan amalan terbaik untuk kesediaan pengeluaran.
S2:Patutkah saya menggunakan TaskFlow API atau operator klasik dalam Airflow?
Gunakan TaskFlow API untuk kebanyakan saluran Pythonik kerana ia lebih bersih dan mengendalikan pulangan XCom secara semula jadi. Operator klasik masih bagus untuk tugas bukan Python seperti Bash, SQL atau tugas kontena.
S3:Bagaimanakah cara saya menghantar data besar antara tugas Airflow?
Elakkan meletakkan muatan besar dalam XCom. Simpan data dalam S3/GCS atau pangkalan data dan hantar hanya rujukan atau URI melalui XCom untuk memastikan tugas pantas dan boleh dipercayai.
S4:Apakah pelaksana yang patut saya pilih untuk Airflow dalam pengeluaran?
Untuk keanjalan dan pengasingan, Pelaksana Kubernetes ialah lalai yang kukuh. Untuk persediaan yang lebih ringkas, Pelaksana Celery berfungsi dengan baik—hanya pastikan autoskala, pengelogan yang teguh dan rahsia luaran.
S5:Bagaimanakah cara saya mengendalikan kebergantungan merentas berbilang DAG Airflow?
Gunakan Set Data untuk pencetus merentas DAG deklaratif apabila satu saluran menghasilkan data untuk yang lain. Sebagai alternatif, ExternalTaskSensor boleh menyelaraskan larian, tetapi Set Data lebih bersih untuk orkestrasi dipacu data.