如何使用 Airflow:构建可靠数据管道的实用端到端指南
如果你需要移动数据或编排 ML 任务,你可能已经听过同样的建议:“直接放到 Airflow 里就行了”。 事实上,当你需要对复杂工作流进行可视化、保证可靠性和控制时,Apache Airflow 就能大放异彩。 在这份实用指南中,我们将逐步讲解如何使用 Airflow——从核心概念到生产就绪的模式——以便你可以交付值得信赖的管道。
我们将保持其可操作性:你将获得 DAG 和任务的思维模型、TaskFlow API 的实践示例、部署选项、测试策略和最佳实践。 到最后,你将从“我可以运行教程”转变为“我可以在生产环境中运行它”。
注意:如需更深入的了解和参考,官方文档非常出色且会定期更新。
Apache Airflow 到底是什么?
Airflow 是一个编排器——而不是数据处理器。 它调度、排序和监控你在其他地方运行的工作(数据库、仓库、Spark 作业、API、容器)。 你将工作流定义为 DAG(有向无环图),它们只是对任务及其依赖项进行编码的 Python 文件。 然后,Airflow 根据你的计划、参数和环境执行这些任务。
- 任务:一个工作单元(Python 函数、SQL 执行、Bash 命令、外部作业触发器等)。
- Operator:一种任务的模板(例如,
PythonOperator、BashOperator、KubernetesPodOperator)。
- Executor:运行任务(本地、使用 Celery、Kubernetes 等)。
安装 Airflow 后,首先从官方教程开始;它们可以让你快速了解全局。
以正确的方式安装和运行 Airflow
Airflow 非常灵活。 选择与你的阶段相匹配的路径:
- 使用项目提供的快速入门 Docker Compose。 它使用合理的默认值启动 Web 服务器、调度程序、数据库等。
- 带有托管 Postgres 的 Celery Executor 或 Kubernetes Executor。
- 将日志存储在 S3/GCS 中,并使用你的镜像或
requirements.txt 打包依赖项。
- 用于弹性的 Kubernetes Executor 或带有自动缩放 worker 的 Celery Executor。
- 外部密钥 (Vault)、强大的可观察性(日志 + 指标)以及用于升级的蓝/绿部署。
提示:在升级之前,请对 Airflow 代码库进行版本控制、容器化和测试。“最佳实践”页面概述了生产就绪的模式。
你每天都会使用的核心概念
DAG:你的工作流即代码
DAG 是一个 Python 文件,它定义了:
将 DAG 视为“什么”和“何时”,将任务视为“如何”。
任务和 Operator
Operator 是常见任务的预制件。 例子:
- 用于 Python 代码的 PythonOperator / TaskFlow
@task
- 用于 shell 命令的 BashOperator
- 用于 API 的 SimpleHttpOperator
- 用于容器化作业的 KubernetesPodOperator
- 用于仓库工作的 SQL 提供程序(例如,Snowflake、BigQuery、Postgres)
TaskFlow API:现代的 Pythonic 方式
TaskFlow API 允许你将任务编写为带有 @task 的 Python 函数,返回值通过 XCom 传递,并干净地组合它们。 它减少了样板代码并提高了可读性——强烈推荐。
你的第一个 Airflow DAG(TaskFlow 版本)
下面是一个最小的 ETL 风格示例,用于说明关键思想:调度、TaskFlow、依赖关系和 XCom 数据传递。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
调度、Catchup 和回填
schedule:Cron 或预设(@daily、@hourly)。
start_date + catchup:如果 catchup=True,Airflow 将从开始日期回填运行。 对于流式管道,请设置 catchup=False。
- 手动回填:使用 UI 或 CLI 重新运行历史间隔。
实用的经验法则:为确定性批处理作业启用 catchup;为实时或 API 速率限制的管道禁用。
在任务之间安全地传递数据 (XCom)
- 大型有效负载:使用 XCom 中的密钥存储在对象存储 (S3/GCS) 中。
- 避免在 XCom 中使用敏感数据;使用密钥后端(例如,Vault)和环境变量。
动态任务映射和扇出工作负载
Airflow 可以在运行时根据输入动态生成任务——非常适合分区数据集或多租户作业。
- 将编排 (Airflow) 与计算 (Spark, dbt, 仓库) 分开。
- 使用 TaskFlow API 来提高清晰度和 XCom 卫生。
如何使用数据仓库和 ML
- 数据仓库:使用提供程序 Operator(例如,SnowflakeOperator、BigQueryInsertJobOperator)进行 SQL 作业。 将 SQL 存储在文件或版本控制的模块中。
- dbt:通过 Bash/KubernetesPodOperator 或提供程序中的专用 dbt Operator 触发 dbt。
- ML:将特征生成、训练和批量推理编排为单独的任务;在存储中缓存工件并记录指标。
高级调度:数据集和跨 DAG 依赖项
- 数据集允许一个 DAG 生成一个逻辑数据集,该数据集在更新时触发另一个 DAG——比临时触发器更干净。
- 对于旧模式,ExternalTaskSensor 可以工作,但数据集更具声明性。
安全性和合规性
- 在 UI 中使用基于角色的访问控制 (RBAC)。
升级和版本控制
你的第一个生产 DAG 的快速清单
示例:一个真实的仓库加载 DAG
此模式提取每日文件,验证它们,并将它们加载到仓库表中,每个分区都有动态映射和可延迟的传感器。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- 浏览你的系统(仓库、云、ML 工具)的提供程序文档。
顺便说一句:使用 AI 助手加速创作
值得注意的是:如果你起草大量 DAG,那么了解代码的 AI 助手可以加速样板代码,生成 TaskFlow 存根,甚至建议依赖关系修复。 如果你想要一个轻量级的助手与你的编辑器和浏览器一起使用,Sider.AI 可以方便地在开发过程中进行快速代码重写和解释。 主要收获
- 首选 TaskFlow API 来获得干净、可测试的 DAG。
- 使用可延迟的传感器/Operator 来节省插槽。
FAQ
Q1: 学习如何使用 Airflow 的最简单方法是什么?
首先从官方教程开始,了解 DAG、任务、调度和 UI。 然后构建一个基于 TaskFlow 的小型管道,并使用最佳实践指南进行迭代,以实现生产就绪。
Q2: 我应该在 Airflow 中使用 TaskFlow API 还是经典 Operator?
对于大多数 Pythonic 管道,请使用 TaskFlow API,因为它更简洁并且可以自然地处理 XCom 返回。 经典 Operator 对于非 Python 任务(如 Bash、SQL 或容器作业)仍然很棒。
Q3: 如何在 Airflow 任务之间传递大数据?
避免将大型有效负载放入 XCom。 将数据存储在 S3/GCS 或数据库中,并且仅通过 XCom 传递引用或 URI,以保持任务快速可靠。
Q4: 我应该为生产中的 Airflow 选择哪个执行程序?
为了弹性和隔离,Kubernetes Executor 是一个强大的默认选择。 对于更简单的设置,Celery Executor 也能很好地工作——只需确保自动缩放、强大的日志记录和外部化的密钥。
Q5: 如何处理多个 Airflow DAG 之间的依赖关系?
当一个管道为另一个管道生成数据时,使用数据集进行声明式跨 DAG 触发。 或者,ExternalTaskSensor 可以协调运行,但数据集对于数据驱动的编排更简洁。