Comment utiliser Airflow : Un guide pratique de bout en bout pour construire des pipelines de données fiables
Si vous déplacez des données ou orchestrez des tâches de ML, vous avez probablement entendu la même rengaine : « Mets juste ça dans Airflow. » La vérité est qu'Apache Airflow excelle lorsque vous avez besoin de visibilité, de fiabilité et de contrôle sur des workflows complexes. Dans ce guide pratique, nous allons vous expliquer étape par étape comment utiliser Airflow – des concepts de base aux modèles prêts pour la production – afin que vous puissiez livrer des pipelines auxquels vous faites confiance.
Nous resterons pratiques : vous obtiendrez un modèle mental pour les DAG et les tâches, des exemples pratiques avec l'API TaskFlow, des options de déploiement, des stratégies de test et des meilleures pratiques. À la fin, vous passerez de « Je peux exécuter le tutoriel » à « Je peux exécuter ceci en production ».
Remarque : Pour des analyses plus approfondies et des références, la documentation officielle est excellente et régulièrement mise à jour.
Qu'est-ce qu'Apache Airflow, en réalité ?
Airflow est un orchestrateur – pas un processeur de données. Il planifie, ordonne et surveille le travail que vous exécutez ailleurs (bases de données, entrepôts de données, tâches Spark, API, conteneurs). Vous définissez les workflows comme des DAG (graphes orientés acycliques), qui sont simplement des fichiers Python qui encodent les tâches et leurs dépendances. Airflow exécute ensuite ces tâches selon votre planning, vos paramètres et votre environnement.
- DAG : La définition du workflow (graphe des tâches avec les dépendances).
- Tâche : Une unité de travail (fonction Python, exécution SQL, commande Bash, déclencheur de tâche externe, etc.).
- Opérateur : Un modèle pour un type de tâche (par exemple,
PythonOperator, BashOperator, KubernetesPodOperator).
- Planificateur : Décide quoi exécuter et quand.
- Exécuteur : Exécute les tâches (localement, avec Celery, Kubernetes, etc.).
- UI : Votre centre de contrôle pour les exécutions, les logs, les tentatives et la lignée.
Commencez avec les tutoriels officiels une fois que vous avez installé Airflow ; ils vous donnent rapidement une vue d'ensemble.
Installer et exécuter Airflow correctement
Airflow est flexible. Choisissez le chemin qui correspond à votre étape :
- Développement local (démarrage rapide) :
- Utilisez le Docker Compose de démarrage rapide fourni par le projet. Il lance le serveur web, le planificateur, la base de données et plus encore avec des paramètres par défaut corrects.
- Idéal pour apprendre et itérer sur les DAG.
- Petite équipe ou staging :
- Celery Executor ou Kubernetes Executor avec un Postgres géré.
- Stockez les logs dans S3/GCS et packagez les dépendances avec votre image ou
requirements.txt.
- Kubernetes Executor pour l'élasticité ou Celery Executor avec des workers à autoscaling.
- Secrets externes (Vault), une observabilité robuste (logs + métriques) et des déploiements blue/green pour les mises à niveau.
Conseil : Gardez votre codebase Airflow sous contrôle de version, conteneurisée et testée avant la promotion. La page « Meilleures pratiques » décrit les modèles prêts pour la production.
Concepts clés que vous utiliserez quotidiennement
DAG : Votre workflow sous forme de code
Un DAG est un fichier Python qui définit :
- Les métadonnées du DAG : id, planning, date de début, tags.
- Les arguments par défaut : retries, owners, SLAs.
- Les tâches et leurs dépendances.
Considérez un DAG comme le « quoi » et le « quand », et les tâches comme le « comment ».
Tâches et opérateurs
Les opérateurs sont des préfabriqués pour les tâches courantes. Exemples :
- PythonOperator / TaskFlow
@task pour le code Python
- BashOperator pour les commandes shell
- SimpleHttpOperator pour les API
- KubernetesPodOperator pour les tâches conteneurisées
- Fournisseurs SQL (par exemple, Snowflake, BigQuery, Postgres) pour le travail d'entrepôt de données
API TaskFlow : La manière moderne et Pythonique
L'API TaskFlow vous permet d'écrire des tâches en tant que fonctions Python avec @task, de renvoyer des valeurs qui passent via XCom, et de les composer proprement. Elle réduit le boilerplate et améliore la lisibilité – fortement recommandé.
Votre premier DAG Airflow (édition TaskFlow)
Ci-dessous, un exemple minimal de style ETL pour illustrer les idées clés : planning, TaskFlow, dépendances et passage de données XCom.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Planning, Catchup et Backfills
schedule : Cron ou préréglages (@daily, @hourly).
start_date + catchup : Si catchup=True, Airflow effectuera des backfills d'exécutions à partir de la date de début. Pour les pipelines de style streaming, définissez catchup=False.
- Backfills manuels : Utilisez l'UI ou la CLI pour relancer les intervalles historiques.
Règle pratique : activez le catchup pour les tâches batch déterministes ; désactivez-le pour les pipelines en temps réel ou à débit limité par l'API.
Passer des données entre les tâches (XCom) en toute sécurité
- Petits objets : les valeurs de retour avec TaskFlow sont parfaites.
- Gros payloads : stockez-les dans un stockage d'objets (S3/GCS) avec une clé dans XCom.
- Évitez les données sensibles dans XCom ; utilisez des backends de secrets (par exemple, Vault) et des variables d'environnement.
Mapping dynamique des tâches et Workloads Fan-out
Airflow peut générer des tâches dynamiquement au runtime en fonction des entrées – idéal pour les datasets partitionnés ou les tâches multi-tenant.
- Gardez les DAG déterministes et idempotents.
- Séparez l'orchestration (Airflow) du calcul (Spark, dbt, entrepôts de données).
- Utilisez l'API TaskFlow pour la clarté et l'hygiène XCom.
- Paramétrez les DAG ; utilisez les variables avec discernement.
- Surveillez, alertez et documentez vos pipelines.
Comment travailler avec les entrepôts de données et le ML
- Entrepôts de données : Utilisez les opérateurs de fournisseur (par exemple, SnowflakeOperator, BigQueryInsertJobOperator) pour les tâches SQL. Stockez le SQL dans des fichiers ou des modules versionnés.
- dbt : Déclenchez dbt via Bash/KubernetesPodOperator ou des opérateurs dbt dédiés dans les fournisseurs.
- ML : Orchestrez la génération de features, l'entraînement et l'inférence par lots en tant que tâches distinctes ; mettez en cache les artefacts dans le stockage et enregistrez les métriques.
Planning avancé : Datasets et dépendances inter-DAG
- Les Datasets permettent à un DAG de produire un dataset logique qui déclenche un autre DAG lors de sa mise à jour – plus propre que les déclencheurs ad hoc.
- Pour les modèles hérités, ExternalTaskSensor fonctionne, mais les Datasets sont plus déclaratifs.
Sécurité et conformité
- Utilisez le contrôle d'accès basé sur les rôles (RBAC) dans l'UI.
- Isolez les environnements par équipe ou limite de confiance.
- Conservez les pistes d'audit via les logs et l'historique des modifications de connexion.
Mises à niveau et versionnage
- Testez les mises à niveau en staging avec des workloads de type production.
- Épinglez et mettez à niveau les fournisseurs délibérément.
- Lisez les notes de version pour les changements et les dépréciations spécifiques à l'exécuteur.
Une checklist rapide pour votre premier DAG de production
- Propriété claire (tag
owner) et alertes configurées.
retries définis avec des backoffs raisonnables.
- Tâches idempotentes et dépendances explicites.
- Petits payloads XCom ; les données volumineuses dans le stockage.
- Logs envoyés vers un stockage durable ; métriques exportées.
- Plan de déploiement (canari ou blue/green) et étapes de rollback.
Exemple : Un DAG de chargement d'entrepôt de données réaliste
Ce modèle extrait les fichiers quotidiens, les valide et les charge dans une table d'entrepôt de données, avec un mapping dynamique par partition et des sensors différables.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Consultez les meilleures pratiques avant de passer en production.
- Explorez la documentation des fournisseurs pour vos systèmes (entrepôts de données, clouds, outils de ML).
Au fait : Accélérez la création avec un assistant IA
Il est bon de noter que si vous rédigez beaucoup de DAG, un assistant IA qui comprend le code peut accélérer le boilerplate, générer des stubs TaskFlow et même suggérer des corrections de dépendances. Si vous voulez une aide légère à côté de votre éditeur et de votre navigateur, Sider.AI peut être pratique pour des réécritures et des explications rapides de code pendant le développement. Principaux points à retenir
- Utilisez Airflow pour orchestrer, pas pour calculer.
- Préférez l'API TaskFlow pour des DAG propres et testables.
- Gardez les données hors de XCom ; passez plutôt des références.
- Utilisez des sensors/opérateurs différables pour économiser des slots.
- Conteneurisez, testez et promouvez à travers les environnements.
- Fiez-vous aux tutoriels officiels et aux meilleures pratiques comme votre étoile polaire.
FAQ
Q1 : Quelle est la façon la plus simple d'apprendre à utiliser Airflow ?
Commencez par le tutoriel officiel pour comprendre les DAG, les tâches, le planning et l'UI. Ensuite, construisez un petit pipeline basé sur TaskFlow et itérez avec le guide des meilleures pratiques pour la préparation à la production.
Q2 : Dois-je utiliser l'API TaskFlow ou les opérateurs classiques dans Airflow ?
Utilisez l'API TaskFlow pour la plupart des pipelines Pythoniques car elle est plus propre et gère naturellement les retours XCom. Les opérateurs classiques sont toujours excellents pour les tâches non-Python comme Bash, SQL ou les tâches de conteneur.
Q3 : Comment puis-je passer de grandes quantités de données entre les tâches Airflow ?
Évitez de mettre de gros payloads dans XCom. Stockez les données dans S3/GCS ou une base de données et ne passez que des références ou des URIs via XCom pour que les tâches restent rapides et fiables.
Q4 : Quel exécuteur dois-je choisir pour Airflow en production ?
Pour l'élasticité et l'isolation, Kubernetes Executor est un choix par défaut solide. Pour des configurations plus simples, Celery Executor fonctionne bien – assurez-vous juste de l'autoscaling, de logs robustes et de secrets externalisés.
Q5 : Comment puis-je gérer les dépendances entre plusieurs DAG Airflow ?
Utilisez les Datasets pour des déclencheurs inter-DAG déclaratifs lorsqu'un pipeline produit des données pour un autre. Alternativement, ExternalTaskSensor peut coordonner les exécutions, mais les Datasets sont plus propres pour l'orchestration basée sur les données.