Jak używać Airflow: Praktyczny, kompleksowy przewodnik po budowaniu niezawodnych potoków danych
Jeśli przesyłasz dane lub orkiestrujesz zadania ML, prawdopodobnie słyszałeś to samo: „Po prostu umieść to w Airflow”. Prawda jest taka, że Apache Airflow błyszczy, gdy potrzebujesz widoczności, niezawodności i kontroli nad złożonymi przepływami pracy. W tym praktycznym przewodniku krok po kroku pokażemy, jak używać Airflow — od podstawowych koncepcji po wzorce gotowe do produkcji — abyś mógł dostarczać potoki, którym ufasz.
Będziemy działać praktycznie: otrzymasz model mentalny dla DAG-ów i zadań, praktyczne przykłady z TaskFlow API, opcje wdrażania, strategie testowania i najlepsze praktyki. Na koniec przejdziesz od „Mogę uruchomić samouczek” do „Mogę to uruchomić w środowisku produkcyjnym”.
Uwaga: Aby uzyskać bardziej szczegółowe informacje i odniesienia, oficjalna dokumentacja jest doskonała i regularnie aktualizowana.
Czym tak naprawdę jest Apache Airflow?
Airflow to orkiestrator — a nie procesor danych. Planuje, porządkuje i monitoruje pracę, którą wykonujesz gdzie indziej (bazy danych, hurtownie danych, zadania Spark, API, kontenery). Definiujesz przepływy pracy jako DAG-i (Directed Acyclic Graphs), które są po prostu plikami Python, które kodują zadania i ich zależności. Airflow następnie wykonuje te zadania zgodnie z Twoim harmonogramem, parametrami i środowiskiem.
- DAG: Definicja przepływu pracy (graf zadań z zależnościami).
- Zadanie: Jednostka pracy (funkcja Python, wykonanie SQL, polecenie Bash, wyzwalacz zadania zewnętrznego itp.).
- Operator: Szablon dla rodzaju zadania (np.
PythonOperator, BashOperator, KubernetesPodOperator).
- Scheduler: Decyduje, co i kiedy uruchomić.
- Executor: Uruchamia zadania (lokalnie, z Celery, Kubernetes itp.).
- UI: Twoje centrum kontroli dla uruchomień, logów, ponowień i pochodzenia.
Po zainstalowaniu Airflow zacznij od oficjalnych samouczków; szybko dają one ogólny obraz.
Instalacja i uruchamianie Airflow we właściwy sposób
Airflow jest elastyczny. Wybierz ścieżkę, która pasuje do Twojego etapu:
- Lokalne środowisko programistyczne (szybki start):
- Użyj szybkiego startu Docker Compose dostarczonego przez projekt. Uruchamia on serwer WWW, harmonogram, bazę danych i inne elementy z rozsądnymi ustawieniami domyślnymi.
- Świetne do nauki i iteracji na DAG-ach.
- Mały zespół lub środowisko przejściowe:
- Celery Executor lub Kubernetes Executor z zarządzanym Postgres.
- Przechowuj logi w S3/GCS i pakuj zależności za pomocą obrazu lub
requirements.txt.
- Kubernetes Executor dla elastyczności lub Celery Executor z automatycznym skalowaniem workerów.
- Zewnętrzne sekrety (Vault), solidna obserwowalność (logi + metryki) i wdrożenia blue/green do aktualizacji.
Wskazówka: Utrzymuj bazę kodu Airflow pod kontrolą wersji, konteneryzowaną i przetestowaną przed promocją. Strona „Najlepsze praktyki” przedstawia wzorce gotowe do produkcji.
Podstawowe koncepcje, których będziesz używać codziennie
DAG-i: Twój przepływ pracy jako kod
DAG to plik Python, który definiuje:
- Metadane DAG: id, harmonogram, data rozpoczęcia, tagi.
- Domyślne argumenty: ponowienia, właściciele, SLA.
- Zadania i ich zależności.
Pomyśl o DAG jako o „co” i „kiedy”, a o zadaniach jako o „jak”.
Zadania i operatory
Operatory to prefabrykaty dla typowych zadań. Przykłady:
@task / TaskFlow @task dla kodu Python
- BashOperator dla poleceń shell
- SimpleHttpOperator dla API
- KubernetesPodOperator dla zadań w kontenerach
- Dostawcy SQL (np. Snowflake, BigQuery, Postgres) do pracy z hurtownią danych
TaskFlow API: Nowoczesny, Pythonowy sposób
TaskFlow API pozwala pisać zadania jako funkcje Python z @task, zwracać wartości, które są przekazywane przez XCom, i komponować je w czysty sposób. Zmniejsza ilość boilerplate i poprawia czytelność — gorąco polecane.
Twój pierwszy Airflow DAG (edycja TaskFlow)
Poniżej znajduje się minimalny przykład w stylu ETL, który ilustruje kluczowe idee: planowanie, TaskFlow, zależności i przekazywanie danych XCom.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
Planowanie, Catchup i Backfille
schedule: Cron lub presety (@daily, @hourly).
start_date + catchup: Jeśli catchup=True, Airflow wykona backfill od daty rozpoczęcia. Dla potoków w stylu przesyłania strumieniowego ustaw catchup=False.
- Ręczne backfille: Użyj UI lub CLI, aby ponownie uruchomić historyczne interwały.
Praktyczna zasada: włącz catchup dla deterministycznych zadań wsadowych; wyłącz dla potoków czasu rzeczywistego lub API z ograniczeniem szybkości.
Bezpieczne przekazywanie danych między zadaniami (XCom)
- Małe obiekty: zwracane wartości z TaskFlow są w porządku.
- Duże ładunki: przechowuj w pamięci masowej obiektów (S3/GCS) z kluczem w XCom.
- Unikaj wrażliwych danych w XCom; używaj backendów sekretów (np. Vault) i zmiennych środowiskowych.
Dynamiczne mapowanie zadań i workloady fan-out
Airflow może generować zadania dynamicznie w czasie wykonywania na podstawie danych wejściowych — idealne dla partycjonowanych zbiorów danych lub zadań multi-tenant.
- Utrzymuj DAG-i deterministyczne i idempotentne.
- Oddziel orkiestrację (Airflow) od obliczeń (Spark, dbt, hurtownie danych).
- Używaj TaskFlow API dla przejrzystości i higieny XCom.
- Parametryzuj DAG-i; używaj zmiennych rozważnie.
- Monitoruj, ostrzegaj i dokumentuj swoje potoki.
Jak pracować z hurtowniami danych i ML
- Hurtownie danych: Używaj operatorów dostawcy (np. SnowflakeOperator, BigQueryInsertJobOperator) dla zadań SQL. Przechowuj SQL w plikach lub modułach z kontrolą wersji.
- dbt: Wyzwalaj dbt za pomocą Bash/KubernetesPodOperator lub dedykowanych operatorów dbt u dostawców.
- ML: Orkiestruj generowanie cech, trenowanie i wnioskowanie wsadowe jako oddzielne zadania; przechowuj artefakty w pamięci masowej i rejestruj metryki.
Zaawansowane planowanie: Zbiory danych i zależności między DAG-ami
- Zbiory danych pozwalają jednemu DAG-owi produkować logiczny zbiór danych, który wyzwala inny DAG po aktualizacji — czystsze niż doraźne wyzwalacze.
- Dla starszych wzorców ExternalTaskSensor działa, ale zbiory danych są bardziej deklaratywne.
Bezpieczeństwo i zgodność
- Używaj kontroli dostępu opartej na rolach (RBAC) w UI.
- Izoluj środowiska dla każdego zespołu lub granicy zaufania.
- Utrzymuj ścieżki audytu za pomocą logów i historii zmian połączeń.
Aktualizacje i wersjonowanie
- Testuj aktualizacje w środowisku przejściowym z workloadami podobnymi do produkcyjnych.
- Przypinaj i aktualizuj dostawców rozważnie.
- Przeczytaj informacje o wersji, aby dowiedzieć się o zmianach i wycofaniach specyficznych dla executorów.
Szybka lista kontrolna dla Twojego pierwszego produkcyjnego DAG
- Wyraźne określenie właściciela (tag
owner) i skonfigurowane alerty.
- Ustawione
retries z rozsądnymi wycofaniami.
- Idempotentne zadania i jawne zależności.
- Małe ładunki XCom; duże dane w pamięci masowej.
- Logi wysyłane do trwałej pamięci masowej; metryki eksportowane.
- Plan wdrożenia (kanarkowy lub blue/green) i kroki wycofania.
Przykład: Realistyczny DAG ładowania hurtowni danych
Ten wzorzec wyodrębnia codzienne pliki, sprawdza je i ładuje do tabeli hurtowni danych, z dynamicznym mapowaniem na partycję i odraczalnymi sensorami.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- Przejrzyj najlepsze praktyki przed promocją do produkcji.
- Przejrzyj dokumentację dostawcy dla swoich systemów (hurtownie danych, chmury, narzędzia ML).
Przy okazji: Przyspiesz tworzenie dzięki pomocnikowi AI
Warto zauważyć: jeśli tworzysz wiele DAG-ów, asystent AI, który rozumie kod, może przyspieszyć boilerplate, generować stuby TaskFlow, a nawet sugerować poprawki zależności. Jeśli chcesz mieć lekkiego pomocnika obok edytora i przeglądarki, Sider.AI może być przydatny do szybkich przepisów kodu i wyjaśnień podczas programowania. Kluczowe wnioski
- Używaj Airflow do orkiestracji, a nie do obliczeń.
- Preferuj TaskFlow API dla czystych, testowalnych DAG-ów.
- Trzymaj dane z dala od XCom; zamiast tego przekazuj odniesienia.
- Używaj odraczalnych sensorów/operatorów, aby oszczędzać sloty.
- Konteneryzuj, testuj i promuj przez środowiska.
- Polegaj na oficjalnych samouczkach i najlepszych praktykach jako na swojej gwieździe polarnej.
FAQ
P1: Jaki jest najłatwiejszy sposób, aby nauczyć się, jak używać Airflow?
Zacznij od oficjalnego samouczka, aby zrozumieć DAG-i, zadania, planowanie i UI. Następnie zbuduj mały potok oparty na TaskFlow i iteruj z przewodnikiem po najlepszych praktykach, aby przygotować się do produkcji.
P2: Czy powinienem używać TaskFlow API, czy klasycznych operatorów w Airflow?
Używaj TaskFlow API dla większości potoków Pythonowych, ponieważ jest czystsze i naturalnie obsługuje zwroty XCom. Klasyczne operatory są nadal świetne dla zadań innych niż Python, takich jak Bash, SQL lub zadania kontenerowe.
P3: Jak przekazywać duże dane między zadaniami Airflow?
Unikaj umieszczania dużych ładunków w XCom. Przechowuj dane w S3/GCS lub bazie danych i przekazuj tylko odniesienia lub URI przez XCom, aby zadania były szybkie i niezawodne.
P4: Jakiego executora powinienem wybrać dla Airflow w produkcji?
Dla elastyczności i izolacji Kubernetes Executor jest silnym domyślnym wyborem. Dla prostszych konfiguracji Celery Executor działa dobrze — po prostu zapewnij automatyczne skalowanie, solidne logowanie i eksternalizowane sekrety.
P5: Jak obsługiwać zależności między wieloma DAG-ami Airflow?
Używaj zbiorów danych do deklaratywnych wyzwalaczy między DAG-ami, gdy jeden potok produkuje dane dla innego. Alternatywnie, ExternalTaskSensor może koordynować uruchomienia, ale zbiory danych są czystsze dla orkiestracji opartej na danych.