Cara Menggunakan Airflow: Panduan Praktis dan Lengkap untuk Membangun Pipeline Data yang Andal
Jika Anda memindahkan data atau mengatur pekerjaan ML, Anda mungkin sering mendengar kalimat ini: “Masukkan saja ke Airflow.” Kenyataannya, Apache Airflow sangat berguna ketika Anda membutuhkan visibilitas, keandalan, dan kontrol atas alur kerja yang kompleks. Dalam panduan praktis ini, kami akan membahas langkah demi langkah cara menggunakan Airflow—mulai dari konsep inti hingga pola siap produksi—sehingga Anda dapat mengirimkan pipeline yang Anda percaya.
Kami akan membuatnya tetap praktis: Anda akan mendapatkan model mental untuk DAG dan task, contoh langsung dengan TaskFlow API, opsi penerapan, strategi pengujian, dan praktik terbaik. Pada akhirnya, Anda akan beralih dari “Saya bisa menjalankan tutorial” menjadi “Saya bisa menjalankan ini di produksi.”
Catatan: Untuk pembahasan dan referensi yang lebih mendalam, dokumentasi resmi sangat baik dan diperbarui secara berkala.
Apa Sebenarnya Apache Airflow Itu?
Airflow adalah orkestrator—bukan pemroses data. Ia menjadwalkan, mengatur urutan, dan memantau pekerjaan yang Anda jalankan di tempat lain (database, warehouse, pekerjaan Spark, API, container). Anda mendefinisikan alur kerja sebagai DAG (Directed Acyclic Graphs), yang hanyalah file Python yang menyandikan task dan dependensinya. Airflow kemudian menjalankan task tersebut sesuai dengan jadwal, parameter, dan lingkungan Anda.
- DAG: Definisi alur kerja (grafik task dengan dependensi).
- Task: Unit pekerjaan (fungsi Python, eksekusi SQL, perintah Bash, pemicu pekerjaan eksternal, dll.).
- Operator: Template untuk jenis task (misalnya,
PythonOperator, BashOperator, KubernetesPodOperator).
- Scheduler: Memutuskan apa yang harus dijalankan dan kapan.
- Executor: Menjalankan task (secara lokal, dengan Celery, Kubernetes, dll.).
- UI: Pusat kontrol Anda untuk menjalankan, log, percobaan ulang, dan silsilah.
Mulailah dengan tutorial resmi setelah Anda menginstal Airflow; tutorial tersebut memberi Anda gambaran besar dengan cepat.
Menginstal dan Menjalankan Airflow dengan Cara yang Benar
Airflow fleksibel. Pilih jalur yang sesuai dengan tahap Anda:
- Pengembangan lokal (mulai cepat):
- Gunakan Docker Compose mulai cepat yang disediakan oleh proyek. Ini menjalankan webserver, scheduler, database, dan lainnya dengan default yang wajar.
- Bagus untuk belajar dan melakukan iterasi pada DAG.
- Celery Executor atau Kubernetes Executor dengan Postgres yang dikelola.
- Simpan log di S3/GCS dan paket dependensi dengan image atau
requirements.txt Anda.
- Kubernetes Executor untuk elastisitas atau Celery Executor dengan pekerja autoscaling.
- Rahasia eksternal (Vault), observabilitas yang kuat (log + metrik), dan penerapan biru/hijau untuk peningkatan.
Tip: Jaga codebase Airflow Anda tetap terkontrol versinya, dikemas dalam container, dan diuji sebelum promosi. Halaman “Praktik Terbaik” menguraikan pola siap produksi.
Konsep Inti yang Akan Anda Gunakan Setiap Hari
DAG: Alur Kerja Anda sebagai Kode
DAG adalah file Python yang mendefinisikan:
- Metadata DAG: id, jadwal, tanggal mulai, tag.
- Argumen default: percobaan ulang, pemilik, SLA.
Anggap DAG sebagai “apa” dan “kapan,” dan task sebagai “bagaimana.”
Task dan Operator
Operator adalah prefab untuk task umum. Contoh:
- PythonOperator / TaskFlow
@task untuk kode Python
- BashOperator untuk perintah shell
- SimpleHttpOperator untuk API
- KubernetesPodOperator untuk pekerjaan containerized
- Penyedia SQL (misalnya, Snowflake, BigQuery, Postgres) untuk pekerjaan warehouse
TaskFlow API: Cara Modern dan Pythonic
TaskFlow API memungkinkan Anda menulis task sebagai fungsi Python dengan @task, mengembalikan nilai yang diteruskan melalui XCom, dan menyusunnya dengan bersih. Ini mengurangi boilerplate dan meningkatkan keterbacaan—sangat direkomendasikan.
DAG Airflow Pertama Anda (Edisi TaskFlow)
Di bawah ini adalah contoh gaya ETL minimal untuk mengilustrasikan ide-ide utama: penjadwalan, TaskFlow, dependensi, dan penerusan data XCom.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Penjadwalan, Catchup, dan Backfill
schedule: Cron atau preset (@daily, @hourly).
start_date + catchup: Jika catchup=True, Airflow akan melakukan backfill run dari tanggal mulai. Untuk pipeline gaya streaming, atur catchup=False.
- Backfill manual: Gunakan UI atau CLI untuk menjalankan ulang interval historis.
Aturan praktis: aktifkan catchup untuk pekerjaan batch deterministik; nonaktifkan untuk pipeline real-time atau API yang dibatasi laju.
Melewatkan Data Antara Task (XCom) dengan Aman
- Objek kecil: nilai yang dikembalikan dengan TaskFlow tidak masalah.
- Payload besar: simpan di penyimpanan objek (S3/GCS) dengan kunci di XCom.
- Hindari data sensitif di XCom; gunakan backend rahasia (misalnya, Vault) dan variabel lingkungan.
Pemetaan Task Dinamis dan Beban Kerja Fan-out
Airflow dapat menghasilkan task secara dinamis saat runtime berdasarkan input—ideal untuk dataset yang dipartisi atau pekerjaan multi-tenant.
- Jaga DAG tetap deterministik dan idempoten.
- Pisahkan orkestrasi (Airflow) dari komputasi (Spark, dbt, warehouse).
- Gunakan TaskFlow API untuk kejelasan dan kebersihan XCom.
- Parameterisasi DAG; gunakan variabel dengan bijak.
- Pantau, beri tahu, dan dokumentasikan pipeline Anda.
Cara Bekerja dengan Data Warehouse dan ML
- Data warehouse: Gunakan operator penyedia (misalnya, SnowflakeOperator, BigQueryInsertJobOperator) untuk pekerjaan SQL. Simpan SQL dalam file atau modul yang diberi versi.
- dbt: Picu dbt melalui Bash/KubernetesPodOperator atau operator dbt khusus di penyedia.
- ML: Atur pembuatan fitur, pelatihan, dan inferensi batch sebagai task terpisah; cache artefak dalam penyimpanan dan catat metrik.
Penjadwalan Tingkat Lanjut: Dataset dan Dependensi Lintas-DAG
- Dataset memungkinkan satu DAG menghasilkan dataset logis yang memicu DAG lain saat diperbarui—lebih bersih daripada pemicu ad-hoc.
- Untuk pola lama, ExternalTaskSensor berfungsi, tetapi dataset lebih deklaratif.
Keamanan dan Kepatuhan
- Gunakan kontrol akses berbasis peran (RBAC) di UI.
- Isolasi lingkungan per tim atau batas kepercayaan.
- Simpan jejak audit melalui log dan riwayat perubahan koneksi.
Peningkatan dan Pembuatan Versi
- Uji peningkatan di staging dengan beban kerja seperti produksi.
- Sematkan dan tingkatkan penyedia dengan sengaja.
- Baca catatan rilis untuk perubahan dan penghentian khusus executor.
Daftar Periksa Cepat untuk DAG Produksi Pertama Anda
- Kepemilikan yang jelas (tag
owner) dan peringatan yang dikonfigurasi.
retries diatur dengan backoff yang wajar.
- Task idempoten dan dependensi eksplisit.
- Payload XCom kecil; data besar dalam penyimpanan.
- Log dikirim ke penyimpanan yang tahan lama; metrik diekspor.
- Rencana peluncuran (kenari atau biru/hijau) dan langkah-langkah rollback.
Contoh: DAG Pemuatan Warehouse Realistis
Pola ini mengekstrak file harian, memvalidasinya, dan memuatnya ke dalam tabel warehouse, dengan pemetaan dinamis per partisi dan sensor yang dapat ditangguhkan.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Tinjau Praktik Terbaik sebelum dipromosikan ke produksi.
- Jelajahi dokumentasi penyedia untuk sistem Anda (warehouse, cloud, alat ML).
Ngomong-ngomong: Percepat penulisan dengan sidekick AI
Perlu dicatat: jika Anda membuat banyak DAG, asisten AI yang memahami kode dapat mempercepat boilerplate, menghasilkan stub TaskFlow, dan bahkan menyarankan perbaikan dependensi. Jika Anda menginginkan helper ringan di samping editor dan browser Anda, Sider.AI dapat berguna untuk penulisan ulang dan penjelasan kode cepat selama pengembangan. Poin-Poin Penting
- Gunakan Airflow untuk mengatur, bukan menghitung.
- Pilih TaskFlow API untuk DAG yang bersih dan dapat diuji.
- Jaga data keluar dari XCom; lewati referensi sebagai gantinya.
- Gunakan sensor/operator yang dapat ditangguhkan untuk menghemat slot.
- Containerize, uji, dan promosikan melalui lingkungan.
- Andalkan tutorial resmi dan praktik terbaik sebagai bintang utara Anda.
FAQ
Q1: Apa cara termudah untuk belajar cara menggunakan Airflow?
Mulailah dengan Tutorial resmi untuk memahami DAG, task, penjadwalan, dan UI. Kemudian bangun pipeline kecil berbasis TaskFlow dan lakukan iterasi dengan panduan praktik terbaik untuk kesiapan produksi.
Q2: Haruskah saya menggunakan TaskFlow API atau operator klasik di Airflow?
Gunakan TaskFlow API untuk sebagian besar pipeline Pythonic karena lebih bersih dan menangani pengembalian XCom secara alami. Operator klasik masih bagus untuk task non-Python seperti Bash, SQL, atau pekerjaan container.
Q3: Bagaimana cara melewatkan data besar antar task Airflow?
Hindari memasukkan payload besar ke dalam XCom. Simpan data di S3/GCS atau database dan lewati hanya referensi atau URI melalui XCom agar task tetap cepat dan andal.
Q4: Executor mana yang harus saya pilih untuk Airflow dalam produksi?
Untuk elastisitas dan isolasi, Kubernetes Executor adalah default yang kuat. Untuk pengaturan yang lebih sederhana, Celery Executor berfungsi dengan baik—pastikan autoscaling, logging yang kuat, dan rahasia eksternal.
Q5: Bagaimana cara menangani dependensi di beberapa DAG Airflow?
Gunakan Dataset untuk pemicu lintas-DAG deklaratif ketika satu pipeline menghasilkan data untuk pipeline lain. Atau, ExternalTaskSensor dapat mengoordinasikan run, tetapi Dataset lebih bersih untuk orkestrasi berbasis data.