如何使用 Airflow:構建可靠數據管道的實用端到端指南
如果您需要移動數據或協調 ML 任務,您可能經常聽到這樣一句話:「直接放到 Airflow 裡就好。」事實是,當您需要對複雜工作流程的可見性、可靠性和控制力時,Apache Airflow 就能大放異彩。在本實用指南中,我們將逐步引導您如何使用 Airflow——從核心概念到可投入生產的模式——以便您可以交付值得信賴的管道。
我們將保持其可操作性:您將獲得 DAG 和任務的思維模型、TaskFlow API 的實踐範例、部署選項、測試策略和最佳實踐。到最後,您將從「我可以運行教學」變成「我可以在生產環境中運行它」。
注意:如需更深入的探討和參考,官方文檔非常出色並且會定期更新。
Apache Airflow 到底是什麼?
Airflow 是一個協調器——而不是數據處理器。它安排、排序和監控您在其他地方運行的工作(數據庫、數據倉庫、Spark 任務、API、容器)。您將工作流程定義為 DAG(有向無環圖),它們只是編碼任務及其依賴關係的 Python 文件。然後,Airflow 會根據您的排程、參數和環境執行這些任務。
- Task(任務):一個工作單元(Python 函數、SQL 執行、Bash 命令、外部任務觸發器等)。
- Operator(運算符):一種任務的模板(例如,
PythonOperator、BashOperator、KubernetesPodOperator)。
- Scheduler(排程器):決定運行什麼以及何時運行。
- Executor(執行器):運行任務(本地、使用 Celery、Kubernetes 等)。
安裝 Airflow 後,先從官方教程開始;它們能讓您快速了解全貌。
以正確的方式安裝和運行 Airflow
Airflow 非常靈活。選擇與您的階段相符的路徑:
- 使用項目提供的快速入門 Docker Compose。它使用合理的預設值啟動 Web 伺服器、排程器、數據庫等。
- Celery Executor 或 Kubernetes Executor 與託管的 Postgres。
- 將日誌儲存在 S3/GCS 中,並使用您的映像或
requirements.txt 打包依賴項。
- Kubernetes Executor 用於彈性,或 Celery Executor 搭配自動擴展工作節點。
- 外部密鑰(Vault)、強大的可觀察性(日誌 + 指標)以及用於升級的藍/綠部署。
提示:在升級之前,請對 Airflow 代碼庫進行版本控制、容器化和測試。「最佳實踐」頁面概述了可投入生產的模式。
您每天都會使用的核心概念
DAG:將您的工作流程視為程式碼
DAG 是一個 Python 文件,它定義了:
將 DAG 視為「做什麼」和「何時做」,而將任務視為「如何做」。
任務和運算符
運算符是常見任務的預製組件。範例:
- PythonOperator / TaskFlow
@task 用於 Python 程式碼
- SimpleHttpOperator 用於 API
- KubernetesPodOperator 用於容器化任務
- SQL 提供者(例如,Snowflake、BigQuery、Postgres)用於數據倉庫工作
TaskFlow API:現代 Pythonic 方法
TaskFlow API 允許您使用 @task 將任務編寫為 Python 函數,傳回透過 XCom 傳遞的值,並乾淨地組合它們。它減少了樣板程式碼並提高了可讀性——強烈推薦。
您的第一個 Airflow DAG(TaskFlow 版本)
下面是一個最小的 ETL 樣式範例,用於說明關鍵概念:排程、TaskFlow、依賴關係和 XCom 數據傳遞。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
排程、Catchup 和 Backfill
schedule:Cron 或預設值(@daily、@hourly)。
start_date + catchup:如果 catchup=True,Airflow 將從開始日期回填運行。對於串流樣式的管道,請設定 catchup=False。
- 手動回填:使用 UI 或 CLI 重新運行歷史間隔。
實用經驗法則:為確定性的批次任務啟用 catchup;為即時或 API 速率限制的管道停用。
安全地在任務之間傳遞數據 (XCom)
- 小型物件:使用 TaskFlow 傳回值是可以的。
- 大型有效負載:使用 XCom 中的金鑰儲存在物件儲存體 (S3/GCS) 中。
- 避免在 XCom 中使用敏感數據;使用密鑰後端(例如,Vault)和環境變數。
動態任務映射和扇出工作負載
Airflow 可以在運行時根據輸入動態生成任務——非常適合分區數據集或多租戶任務。
- 將協調(Airflow)與計算(Spark、dbt、數據倉庫)分開。
- 使用 TaskFlow API 提高清晰度和 XCom 衛生。
如何使用數據倉庫和 ML
- 數據倉庫:使用提供者運算符(例如,SnowflakeOperator、BigQueryInsertJobOperator)處理 SQL 任務。將 SQL 儲存在檔案或版本控制的模組中。
- dbt:透過 Bash/KubernetesPodOperator 或提供者中專用的 dbt 運算符觸發 dbt。
- ML:協調特徵生成、訓練和批次推論作為單獨的任務;將工件緩存在儲存體中並記錄指標。
進階排程:數據集和跨 DAG 依賴關係
- 數據集讓一個 DAG 產生一個邏輯數據集,該數據集在更新時觸發另一個 DAG——比臨時觸發器更乾淨。
- 對於舊模式,ExternalTaskSensor 可以工作,但數據集更具聲明性。
安全性和合規性
- 在 UI 中使用基於角色的存取控制 (RBAC)。
升級和版本控制
您的第一個生產 DAG 的快速檢查清單
範例:一個實際的數據倉庫加載 DAG
此模式提取每日檔案、驗證它們並將它們載入到數據倉庫表中,每個分區都有動態映射和可延遲的感測器。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- 瀏覽系統(數據倉庫、雲、ML 工具)的提供者文檔。
順便說一句:使用 AI 助手加速編寫
值得注意的是:如果您起草大量 DAG,那麼了解程式碼的 AI 助手可以加速樣板程式碼、生成 TaskFlow 存根,甚至可以建議依賴關係修復。如果您想要一個輕量級的助手在您的編輯器和瀏覽器旁邊,Sider.AI 可以方便地在開發期間進行快速程式碼重寫和解釋。 主要要點
- 首選 TaskFlow API,以獲得乾淨、可測試的 DAG。
常見問題解答
Q1:學習如何使用 Airflow 最簡單的方法是什麼?
從官方教程開始,了解 DAG、任務、排程和 UI。然後,構建一個基於 TaskFlow 的小型管道,並使用最佳實踐指南進行迭代,以實現生產就緒。
Q2:我應該在 Airflow 中使用 TaskFlow API 還是經典運算符?
對於大多數 Pythonic 管道,請使用 TaskFlow API,因為它更乾淨並且可以自然地處理 XCom 傳回值。經典運算符仍然非常適合非 Python 任務,例如 Bash、SQL 或容器任務。
Q3:如何在 Airflow 任務之間傳遞大型數據?
避免將大型有效負載放入 XCom。將數據儲存在 S3/GCS 或數據庫中,並且僅透過 XCom 傳遞參考或 URI,以保持任務快速且可靠。
Q4:我應該為生產中的 Airflow 選擇哪個執行器?
為了獲得彈性和隔離性,Kubernetes Executor 是一個強大的預設選項。對於更簡單的設定,Celery Executor 也能很好地工作——只需確保自動擴展、強大的日誌記錄和外部化的密鑰。
Q5:如何處理多個 Airflow DAG 之間的依賴關係?
當一個管道為另一個管道產生數據時,使用數據集進行聲明式跨 DAG 觸發。或者,ExternalTaskSensor 可以協調運行,但對於數據驅動的協調,數據集更乾淨。