Airflow の使い方:信頼性の高いデータパイプラインを構築するための実践的なエンドツーエンドガイド
データの移動や ML ジョブのオーケストレーションを行う場合、「Airflow に入れるだけ」という言葉をよく耳にするでしょう。実際、Apache Airflow は、複雑なワークフローの可視性、信頼性、制御が必要な場合に威力を発揮します。この実践的なガイドでは、Airflow の基本的な概念から本番環境に対応できるパターンまで、ステップバイステップで Airflow の使い方を説明し、信頼できるパイプラインを構築できるようにします。
実践的な内容に焦点を当てます。DAG とタスクのメンタルモデル、TaskFlow API を使用した実践的な例、デプロイオプション、テスト戦略、ベストプラクティスを紹介します。最終的には、「チュートリアルを実行できる」から「本番環境で実行できる」ようになります。
注:より詳細な情報やリファレンスについては、公式ドキュメントが優れており、定期的に更新されています。
Apache Airflow とは一体何か?
Airflow はオーケストレーターであり、データプロセッサーではありません。他の場所(データベース、ウェアハウス、Spark ジョブ、API、コンテナ)で実行する作業をスケジュール、順序付け、監視します。ワークフローは DAG(有向非巡回グラフ)として定義します。これは、タスクとその依存関係をエンコードした Python ファイルにすぎません。Airflow は、スケジュール、パラメータ、環境に従ってこれらのタスクを実行します。
- DAG:ワークフローの定義(依存関係を持つタスクのグラフ)。
- タスク:作業の単位(Python 関数、SQL 実行、Bash コマンド、外部ジョブトリガーなど)。
- Operator:タスクの種類ごとのテンプレート(例:
PythonOperator、BashOperator、KubernetesPodOperator)。
- Scheduler:何をいつ実行するかを決定します。
- Executor:タスクを実行します(ローカル、Celery、Kubernetes など)。
- UI:実行、ログ、再試行、リネージのコントロールセンター。
Airflow をインストールしたら、まず公式チュートリアルから始めてください。全体像をすばやく把握できます。
Airflow を正しくインストールして実行する
Airflow は柔軟です。段階に合わせてパスを選択してください。
- プロジェクトが提供するクイックスタート Docker Compose を使用します。Web サーバー、スケジューラー、データベースなどを適切なデフォルト設定で起動します。
- Celery Executor または Kubernetes Executor とマネージド Postgres。
- ログを S3/GCS に保存し、依存関係をイメージまたは
requirements.txt でパッケージ化します。
- 伸縮性が必要な場合は Kubernetes Executor、オートスケーリングワーカーが必要な場合は Celery Executor。
- 外部シークレット(Vault)、堅牢な可観測性(ログ + メトリクス)、アップグレードのためのブルー/グリーンデプロイ。
ヒント:Airflow コードベースをバージョン管理し、コンテナ化し、プロモーション前にテストしてください。「ベストプラクティス」ページでは、本番環境に対応できるパターンを紹介しています。
日常的に使用するコアコンセプト
DAG:コードとしてのワークフロー
DAG は、以下を定義する Python ファイルです。
- DAG メタデータ:id、スケジュール、開始日、タグ。
DAG を「何を」「いつ」行うか、タスクを「どのように」行うかと考えてください。
タスクと Operator
Operator は、一般的なタスクのプレハブです。例:
- Python コード用の PythonOperator / TaskFlow
@task
- API 用の SimpleHttpOperator
- コンテナ化されたジョブ用の KubernetesPodOperator
- ウェアハウス作業用の SQL プロバイダー(例:Snowflake、BigQuery、Postgres)
TaskFlow API:最新の Pythonic な方法
TaskFlow API を使用すると、@task を使用してタスクを Python 関数として記述し、XCom 経由で渡される値を返し、それらをきれいに構成できます。ボイラープレートが削減され、可読性が向上します。強くお勧めします。
最初の Airflow DAG(TaskFlow エディション)
以下は、スケジューリング、TaskFlow、依存関係、XCom データ渡しという重要な概念を示す最小限の ETL スタイルの例です。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
スケジューリング、Catchup、および Backfill
schedule:Cron またはプリセット(@daily、@hourly)。
start_date + catchup:catchup=True の場合、Airflow は開始日から遡って実行をバックフィルします。ストリーミングスタイルのパイプラインの場合は、catchup=False を設定します。
- 手動バックフィル:UI または CLI を使用して、過去の間隔を再実行します。
実践的な経験則:決定論的なバッチジョブの場合は catchup を有効にし、リアルタイムまたは API レート制限されたパイプラインの場合は無効にします。
タスク間でのデータ受け渡し(XCom)を安全に行う
- 小さなオブジェクト:TaskFlow での戻り値で問題ありません。
- 大きなペイロード:オブジェクトストレージ(S3/GCS)に XCom のキーとともに保存します。
- XCom での機密データを避けてください。シークレットバックエンド(例:Vault)と環境変数を使用します。
動的なタスクマッピングとファンアウトワークロード
Airflow は、入力に基づいて実行時にタスクを動的に生成できます。パーティション分割されたデータセットまたはマルチテナントジョブに最適です。
- オーケストレーション(Airflow)と計算(Spark、dbt、ウェアハウス)を分離します。
- TaskFlow API を使用して、明確さと XCom の衛生状態を保ちます。
- DAG をパラメータ化します。変数を慎重に使用します。
- パイプラインを監視、アラート、およびドキュメント化します。
データウェアハウスと ML の連携方法
- データウェアハウス:SQL ジョブには、プロバイダーオペレーター(例:SnowflakeOperator、BigQueryInsertJobOperator)を使用します。SQL をファイルまたはバージョン管理されたモジュールに保存します。
- dbt:Bash/KubernetesPodOperator またはプロバイダーの専用 dbt オペレーターを介して dbt をトリガーします。
- ML:特徴量生成、トレーニング、およびバッチ推論を個別のタスクとしてオーケストレーションします。ストレージにアーティファクトをキャッシュし、メトリクスをログに記録します。
高度なスケジューリング:データセットとクロス DAG 依存関係
- データセットを使用すると、ある DAG が論理データセットを生成し、更新時に別の DAG をトリガーできます。アドホックトリガーよりもクリーンです。
- レガシーパターンでは、ExternalTaskSensor が機能しますが、データセットの方がより宣言的です。
セキュリティとコンプライアンス
- UI でロールベースのアクセス制御(RBAC)を使用します。
アップグレードとバージョン管理
- 本番環境のようなワークロードでステージング環境でアップグレードをテストします。
- プロバイダーをピン留めして慎重にアップグレードします。
- エグゼキューター固有の変更と非推奨については、リリースノートをお読みください。
最初の本番環境 DAG のクイックチェックリスト
- 明確な所有権(
owner タグ)とアラートが構成されている。
- 適切なバックオフで
retries が設定されている。
- 小さな XCom ペイロード。ストレージ内の大きなデータ。
- 永続ストレージに送信されるログ。エクスポートされるメトリクス。
- ロールアウト計画(カナリアまたはブルー/グリーン)とロールバック手順。
例:現実的なウェアハウスロード DAG
このパターンは、毎日のファイルを抽出し、検証し、パーティションごとの動的マッピングと遅延可能なセンサーを使用して、ウェアハウステーブルにロードします。
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- 本番環境に移行する前に、ベストプラクティスを確認してください。
- システム(ウェアハウス、クラウド、ML ツール)のプロバイダーのドキュメントを参照してください。
ところで:AI サイドキックで作成をスピードアップ
注目すべき点:多くの DAG を作成する場合、コードを理解する AI アシスタントは、ボイラープレートを加速し、TaskFlow スタブを生成し、依存関係の修正を提案することもできます。エディターとブラウザーと一緒に軽量ヘルパーが必要な場合は、開発中に Sider.AI が簡単なコードの書き換えと説明に役立ちます。 主なポイント
- Airflow を使用してオーケストレーションを行い、計算は行わないでください。
- クリーンでテスト可能な DAG には、TaskFlow API を優先します。
- XCom からデータを削除し、代わりに参照を渡します。
- 遅延可能なセンサー/オペレーターを使用して、スロットを節約します。
- コンテナ化、テスト、および環境全体でプロモーションを行います。
- 公式チュートリアルとベストプラクティスを北極星として頼りにしてください。
FAQ
Q1:Airflow の使い方を学ぶ最も簡単な方法は何ですか?
公式チュートリアルから始めて、DAG、タスク、スケジューリング、および UI について理解してください。次に、TaskFlow ベースの小さなパイプラインを構築し、本番環境に対応するためのベストプラクティスガイドを使用して反復処理します。
Q2:Airflow で TaskFlow API と従来のオペレーターのどちらを使用する必要がありますか?
よりクリーンで XCom の戻り値を自然に処理するため、ほとんどの Pythonic パイプラインには TaskFlow API を使用してください。従来のオペレーターは、Bash、SQL、またはコンテナジョブなどの非 Python タスクにも最適です。
Q3:Airflow タスク間で大きなデータを渡すにはどうすればよいですか?
XCom に大きなペイロードを入れないでください。S3/GCS またはデータベースにデータを保存し、タスクを高速かつ信頼性の高い状態に保つために、XCom を介して参照または URI のみを渡します。
Q4:本番環境で Airflow にどのエグゼキューターを選択する必要がありますか?
伸縮性と分離のために、Kubernetes Executor が強力なデフォルトです。より単純なセットアップでは、Celery Executor がうまく機能します。オートスケーリング、堅牢なロギング、および外部化されたシークレットを必ず確認してください。
Q5:複数の Airflow DAG 間で依存関係を処理するにはどうすればよいですか?
あるパイプラインが別のパイプラインのデータを生成する場合、宣言型のクロス DAG トリガーにはデータセットを使用します。または、ExternalTaskSensor で実行を調整できますが、データ駆動型のオーケストレーションにはデータセットの方がクリーンです。