วิธีใช้งาน Airflow: คู่มือเชิงปฏิบัติแบบครบวงจรสำหรับการสร้าง Data Pipeline ที่เชื่อถือได้
หากคุณย้ายข้อมูลหรือจัดการงาน ML คุณอาจเคยได้ยินคำกล่าวที่ว่า “แค่ใส่ไว้ใน Airflow” ความจริงก็คือ Apache Airflow นั้นโดดเด่นเมื่อคุณต้องการความสามารถในการมองเห็น ความน่าเชื่อถือ และการควบคุม Workflow ที่ซับซ้อน ในคู่มือเชิงปฏิบัตินี้ เราจะแนะนำคุณทีละขั้นตอนเกี่ยวกับวิธีใช้งาน Airflow ตั้งแต่แนวคิดหลักไปจนถึงรูปแบบที่พร้อมใช้งานจริง เพื่อให้คุณสามารถส่ง Pipeline ที่คุณไว้วางใจได้
เราจะเน้นที่การนำไปปฏิบัติได้จริง: คุณจะได้รับแบบจำลองทางความคิดสำหรับ DAG และ Task ตัวอย่างเชิงปฏิบัติกับการใช้ TaskFlow API ตัวเลือกในการปรับใช้ กลยุทธ์การทดสอบ และแนวทางปฏิบัติที่ดีที่สุด เมื่อถึงตอนท้าย คุณจะเปลี่ยนจาก “ฉันสามารถรัน Tutorial ได้” เป็น “ฉันสามารถรันสิ่งนี้ใน Prod ได้”
หมายเหตุ: สำหรับการเจาะลึกและอ้างอิงเพิ่มเติม เอกสารอย่างเป็นทางการนั้นยอดเยี่ยมและมีการปรับปรุงอยู่เสมอ
Apache Airflow คืออะไรกันแน่
Airflow เป็น Orchestrator ไม่ใช่ตัวประมวลผลข้อมูล โดยจะกำหนดตารางเวลา จัดลำดับ และตรวจสอบงานที่คุณรันที่อื่น (ฐานข้อมูล คลังข้อมูล งาน Spark, API, Container) คุณกำหนด Workflow เป็น DAG (Directed Acyclic Graphs) ซึ่งเป็นเพียงไฟล์ Python ที่เข้ารหัส Task และ Dependencies ของ Task จากนั้น Airflow จะ Execute Task เหล่านั้นตามตารางเวลา พารามิเตอร์ และสภาพแวดล้อมของคุณ
- DAG: คำจำกัดความของ Workflow (กราฟของ Task ที่มี Dependencies)
- Task: หน่วยของงาน (ฟังก์ชัน Python, การ Execute SQL, คำสั่ง Bash, ตัว Trigger งานภายนอก ฯลฯ)
- Operator: Template สำหรับ Task ประเภทหนึ่ง (เช่น
PythonOperator, BashOperator, KubernetesPodOperator)
- Scheduler: ตัดสินใจว่าจะรันอะไรและเมื่อใด
- Executor: รัน Task (ในเครื่อง ด้วย Celery, Kubernetes ฯลฯ)
- UI: ศูนย์ควบคุมของคุณสำหรับการรัน Log การ Retry และ Lineage
เริ่มต้นด้วย Tutorial อย่างเป็นทางการเมื่อคุณติดตั้ง Airflow แล้ว Tutorial เหล่านั้นจะให้ภาพรวมที่รวดเร็วแก่คุณ
การติดตั้งและรัน Airflow อย่างถูกวิธี
Airflow มีความยืดหยุ่น เลือก Path ที่ตรงกับ Stage ของคุณ:
- การพัฒนาในเครื่อง (Quick Start):
- ใช้ Quick-Start Docker Compose ที่จัดทำโดย Project ซึ่งจะ Spin Up Webserver, Scheduler, ฐานข้อมูล และอื่นๆ ด้วย Default ที่เหมาะสม
- เหมาะสำหรับการเรียนรู้และ Iteration บน DAG
- Celery Executor หรือ Kubernetes Executor ที่มีการจัดการ Postgres
- จัดเก็บ Log ใน S3/GCS และ Package Dependencies ด้วย Image หรือ
requirements.txt
- Kubernetes Executor สำหรับ Elasticity หรือ Celery Executor ที่มี Autoscaling Worker
- External Secrets (Vault), Observability ที่แข็งแกร่ง (Log + Metrics) และ Blue/Green Deploys สำหรับการ Upgrade
คำแนะนำ: ควบคุม Version ของ Airflow Codebase, Containerize และทดสอบก่อนที่จะ Promote หน้า “Best Practices” จะสรุปรูปแบบที่พร้อมใช้งานจริง
แนวคิดหลักที่คุณจะได้ใช้ทุกวัน
DAG: Workflow ของคุณในรูปแบบ Code
DAG คือไฟล์ Python ที่กำหนด:
- DAG Metadata: ID, Schedule, Start Date, Tag
- Default Args: Retries, Owners, SLA
- Task และ Dependencies ของ Task
คิดว่า DAG เป็น “อะไร” และ “เมื่อไหร่” และ Task เป็น “อย่างไร”
Task และ Operator
Operator คือ Prefab สำหรับ Task ทั่วไป ตัวอย่าง:
- PythonOperator / TaskFlow
@task สำหรับโค้ด Python
- BashOperator สำหรับคำสั่ง Shell
- SimpleHttpOperator สำหรับ API
- KubernetesPodOperator สำหรับ Containerized Job
- SQL Provider (เช่น Snowflake, BigQuery, Postgres) สำหรับงาน Warehouse
TaskFlow API: วิธีการที่ทันสมัยแบบ Pythonic
TaskFlow API ช่วยให้คุณเขียน Task เป็นฟังก์ชัน Python ด้วย @task ส่งคืนค่าที่ส่งผ่าน XCom และ Compose อย่าง Clean ลด Boilerplate และปรับปรุง Readability ขอแนะนำอย่างยิ่ง
Airflow DAG แรกของคุณ (TaskFlow Edition)
ด้านล่างนี้คือตัวอย่างสไตล์ ETL ที่เรียบง่ายเพื่อแสดงแนวคิดหลัก: การ Scheduling, TaskFlow, Dependencies และการส่งผ่านข้อมูล XCom
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
การ Scheduling, Catchup และ Backfill
schedule: Cron หรือ Presets (@daily, @hourly)
start_date + catchup: หาก catchup=True Airflow จะ Backfill การรันจาก Start Date สำหรับ Streaming-Style Pipeline ให้ตั้งค่า catchup=False
- Manual Backfill: ใช้ UI หรือ CLI เพื่อ Rerun ช่วงเวลาในอดีต
หลักการง่ายๆ ที่ควรจำ: เปิดใช้งาน Catchup สำหรับ Deterministic Batch Job ปิดใช้งานสำหรับ Real-Time หรือ API Rate-Limited Pipeline
การส่งผ่านข้อมูลระหว่าง Task (XCom) อย่างปลอดภัย
- Small Object: ค่าที่ส่งคืนด้วย TaskFlow ใช้งานได้ดี
- Large Payload: จัดเก็บใน Object Storage (S3/GCS) ด้วย Key ใน XCom
- หลีกเลี่ยงข้อมูล Sensitive ใน XCom ใช้ Secrets Backend (เช่น Vault) และ Environment Variable
Dynamic Task Mapping และ Fan-Out Workload
Airflow สามารถ Generate Task แบบ Dynamic ได้ใน Runtime ตาม Input ซึ่งเหมาะสำหรับ Partitioned Dataset หรืองาน Multi-Tenant
- ทำให้ DAG เป็น Deterministic และ Idempotent
- แยก Orchestration (Airflow) ออกจากการคำนวณ (Spark, dbt, Warehouse)
- ใช้ TaskFlow API เพื่อความชัดเจนและ XCom Hygiene
- Parameterize DAG ใช้อย่างระมัดระวัง
- Monitor, Alert และ Document Pipeline ของคุณ
วิธีการทำงานกับ Data Warehouse และ ML
- Data Warehouse: ใช้ Provider Operator (เช่น SnowflakeOperator, BigQueryInsertJobOperator) สำหรับ SQL Job จัดเก็บ SQL ในไฟล์หรือ Versioned Module
- dbt: Trigger dbt ผ่าน Bash/KubernetesPodOperator หรือ Dedicated dbt Operator ใน Provider
- ML: Orchestrate Feature Generation, Training และ Batch Inference เป็น Task ที่แยกจากกัน Cache Artifact ใน Storage และ Log Metrics
Advanced Scheduling: Dataset และ Cross-DAG Dependencies
- Dataset ช่วยให้ DAG หนึ่งสร้าง Logical Dataset ที่ Trigger อีก DAG หนึ่งเมื่อมีการอัปเดต ซึ่ง Clean กว่า Ad-Hoc Trigger
- สำหรับ Legacy Pattern ExternalTaskSensor สามารถใช้งานได้ แต่ Dataset นั้น Declarative มากกว่า
Security และ Compliance
- ใช้ Role-Based Access Control (RBAC) ใน UI
- แยก Environment ต่อ Team หรือ Trust Boundary
- เก็บ Audit Trail ผ่าน Log และ Connection Change History
Upgrade และ Versioning
- ทดสอบการ Upgrade ใน Staging ด้วย Production-Like Workload
- Pin และ Upgrade Provider อย่างรอบคอบ
- อ่าน Release Note สำหรับ Executor-Specific Change และ Deprecation
Quick Checklist สำหรับ Production DAG แรกของคุณ
- Clear Ownership (Tag
owner) และกำหนดค่า Alert
- ตั้งค่า
retries ด้วย Backoff ที่เหมาะสม
- Idempotent Task และ Explicit Dependencies
- Small XCom Payload ข้อมูลขนาดใหญ่อยู่ใน Storage
- Log ถูกส่งไปยัง Durable Storage ส่งออก Metrics
- Rollout Plan (Canary หรือ Blue/Green) และขั้นตอน Rollback
ตัวอย่าง: Realistic Warehouse Load DAG
Pattern นี้ Extract ไฟล์รายวัน Validate และ Load ลงใน Warehouse Table พร้อม Dynamic Mapping ต่อ Partition และ Deferrable Sensor
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- ตรวจสอบ Best Practices ก่อนที่จะ Promote ไปยัง Production
- สำรวจเอกสาร Provider สำหรับระบบของคุณ (Warehouse, Cloud, ML Tool)
อีกอย่าง: เร่งความเร็วในการเขียนด้วย AI Sidekick
สิ่งที่ควรทราบ: หากคุณร่าง DAG จำนวนมาก AI Assistant ที่เข้าใจ Code สามารถเร่งความเร็ว Boilerplate สร้าง TaskFlow Stub และแม้แต่แนะนำ Dependency Fix ได้ หากคุณต้องการ Helper ขนาดเล็กควบคู่ไปกับ Editor และ Browser ของคุณ Sider.AI สามารถช่วยในการ Rewrite Code และ Explanation อย่างรวดเร็วระหว่างการพัฒนาได้ ประเด็นสำคัญ
- ใช้ Airflow เพื่อ Orchestrate ไม่ใช่ Compute
- เลือก TaskFlow API สำหรับ DAG ที่ Clean และ Testable
- เก็บข้อมูลไว้นอก XCom ส่งผ่าน Reference แทน
- ใช้ Deferrable Sensor/Operator เพื่อประหยัด Slot
- Containerize ทดสอบ และ Promote ผ่าน Environment
- อ้างอิงจาก Official Tutorial และ Best Practices เป็น North Star ของคุณ
FAQ
Q1: วิธีที่ง่ายที่สุดในการเรียนรู้วิธีใช้งาน Airflow คืออะไร?
เริ่มต้นด้วย Official Tutorial เพื่อทำความเข้าใจ DAG, Task, Scheduling และ UI จากนั้นสร้าง Pipeline ขนาดเล็กที่ใช้ TaskFlow และ Iteration ด้วยคู่มือ Best Practices เพื่อความพร้อมในการใช้งานจริง
Q2: ฉันควรใช้ TaskFlow API หรือ Classic Operator ใน Airflow?
ใช้ TaskFlow API สำหรับ Pythonic Pipeline ส่วนใหญ่ เพราะ Clean กว่าและจัดการ XCom Return ได้อย่างเป็นธรรมชาติ Classic Operator ยังคงยอดเยี่ยมสำหรับ Non-Python Task เช่น Bash, SQL หรืองาน Container
Q3: ฉันจะส่งผ่านข้อมูลขนาดใหญ่ระหว่าง Airflow Task ได้อย่างไร?
หลีกเลี่ยงการใส่ Large Payload ใน XCom จัดเก็บข้อมูลใน S3/GCS หรือฐานข้อมูล และส่งผ่านเฉพาะ Reference หรือ URI ผ่าน XCom เพื่อให้ Task รวดเร็วและเชื่อถือได้
Q4: ฉันควรเลือก Executor ใดสำหรับ Airflow ใน Production?
สำหรับ Elasticity และ Isolation Kubernetes Executor เป็น Default ที่แข็งแกร่ง สำหรับการตั้งค่าที่เรียบง่ายกว่า Celery Executor ทำงานได้ดี เพียงตรวจสอบให้แน่ใจว่ามี Autoscaling, Robust Logging และ Externalized Secret
Q5: ฉันจะจัดการ Dependencies ข้าม Airflow DAG หลายรายการได้อย่างไร?
ใช้ Dataset สำหรับ Declarative Cross-DAG Trigger เมื่อ Pipeline หนึ่งสร้างข้อมูลสำหรับอีก Pipeline หนึ่ง อีกทางเลือกหนึ่ง ExternalTaskSensor สามารถประสานงานการรันได้ แต่ Dataset นั้น Clean กว่าสำหรับการ Orchestration ที่ขับเคลื่อนด้วยข้อมูล