Airflow 사용법: 안정적인 데이터 파이프라인 구축을 위한 실용적인 엔드 투 엔드 가이드
데이터를 이동하거나 ML 작업을 오케스트레이션하는 경우, “그냥 Airflow에 넣어.”라는 말을 자주 들었을 것입니다. 사실, Apache Airflow는 복잡한 워크플로우에 대한 가시성, 안정성 및 제어가 필요할 때 빛을 발합니다. 이 실용적인 가이드에서는 핵심 개념부터 프로덕션 환경에 바로 적용 가능한 패턴까지 Airflow 사용법을 단계별로 안내하여 신뢰할 수 있는 파이프라인을 구축할 수 있도록 돕겠습니다.
실용적인 내용 위주로 진행됩니다. DAG 및 작업에 대한 멘탈 모델, TaskFlow API를 사용한 실습 예제, 배포 옵션, 테스팅 전략 및 모범 사례를 배우게 됩니다. 이 가이드가 끝나면 “튜토리얼은 실행할 수 있어.”에서 “프로덕션 환경에서 실행할 수 있어.”로 실력이 향상될 것입니다.
참고: 더 자세한 내용과 참조 자료는 공식 문서를 참조하십시오. 훌륭하고 정기적으로 업데이트됩니다.
Apache Airflow, 대체 무엇일까요?
Airflow는 데이터 프로세서가 아닌 오케스트레이터입니다. 데이터베이스, 웨어하우스, Spark 작업, API, 컨테이너 등 다른 곳에서 실행하는 작업을 예약, 정렬 및 모니터링합니다. 워크플로우를 DAG(Directed Acyclic Graphs, 방향성 비순환 그래프)로 정의합니다. DAG는 작업과 해당 종속성을 인코딩하는 Python 파일일 뿐입니다. Airflow는 스케줄, 매개변수 및 환경에 따라 이러한 작업을 실행합니다.
- DAG: 워크플로우 정의 (종속성이 있는 작업 그래프).
- 작업: 작업 단위 (Python 함수, SQL 실행, Bash 명령어, 외부 작업 트리거 등).
- Operator: 작업 종류에 대한 템플릿 (예:
PythonOperator, BashOperator, KubernetesPodOperator).
- Scheduler: 무엇을 언제 실행할지 결정합니다.
- Executor: 작업을 실행합니다 (로컬, Celery, Kubernetes 등 사용).
- UI: 실행, 로그, 재시도 및 계보를 위한 제어 센터입니다.
Airflow를 설치한 후 공식 튜토리얼부터 시작하십시오. 전체적인 그림을 빠르게 파악할 수 있습니다.
Airflow를 올바르게 설치하고 실행하는 방법
Airflow는 유연합니다. 단계에 맞는 경로를 선택하십시오.
- 프로젝트에서 제공하는 빠른 시작 Docker Compose를 사용하십시오. 웹 서버, 스케줄러, 데이터베이스 등을 합리적인 기본 설정으로 시작합니다.
- 관리형 Postgres가 있는 Celery Executor 또는 Kubernetes Executor.
- 로그를 S3/GCS에 저장하고 이미지 또는
requirements.txt로 종속성을 패키징합니다.
- 탄력성을 위한 Kubernetes Executor 또는 Autoscaling worker가 있는 Celery Executor.
- 외부 Secrets (Vault), 강력한 관측성 (로그 + 메트릭) 및 업그레이드를 위한 Blue/Green 배포.
팁: Airflow 코드베이스를 버전 관리하고, 컨테이너화하고, 프로모션 전에 테스트하십시오. “모범 사례” 페이지에는 프로덕션 환경에 바로 적용 가능한 패턴이 요약되어 있습니다.
매일 사용할 핵심 개념
DAG: 코드로 작성하는 워크플로우
DAG는 다음을 정의하는 Python 파일입니다.
- DAG 메타데이터: ID, 스케줄, 시작 날짜, 태그.
DAG를 “무엇” 및 “언제”로, 작업을 “어떻게”로 생각하십시오.
작업 및 Operator
Operator는 일반적인 작업을 위한 프리팹입니다. 예:
- Python 코드용
@task가 있는 PythonOperator / TaskFlow
- 컨테이너화된 작업용 KubernetesPodOperator
- 웨어하우스 작업용 SQL 제공자 (예: Snowflake, BigQuery, Postgres)
TaskFlow API: 현대적인 Python 방식
TaskFlow API를 사용하면 @task가 있는 Python 함수로 작업을 작성하고, XCom을 통해 전달되는 값을 반환하고, 깔끔하게 구성할 수 있습니다. 상용구 코드를 줄이고 가독성을 향상시킵니다. 적극 권장합니다.
첫 번째 Airflow DAG (TaskFlow 에디션)
다음은 주요 아이디어를 설명하기 위한 최소한의 ETL 스타일 예제입니다. 스케줄링, TaskFlow, 종속성 및 XCom 데이터 전달.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
스케줄링, Catchup 및 백필
schedule: Cron 또는 프리셋 (@daily, @hourly).
start_date + catchup: catchup=True인 경우, Airflow는 시작 날짜부터 백필을 실행합니다. 스트리밍 스타일 파이프라인의 경우, catchup=False로 설정하십시오.
- 수동 백필: UI 또는 CLI를 사용하여 과거 간격을 다시 실행합니다.
실용적인 경험 법칙: 결정적인 배치 작업에는 Catchup을 활성화하고, 실시간 또는 API 속도 제한 파이프라인에는 비활성화하십시오.
작업 간에 데이터 안전하게 전달 (XCom)
- 작은 객체: TaskFlow를 사용하여 값을 반환해도 괜찮습니다.
- 큰 페이로드: XCom의 키를 사용하여 객체 스토리지 (S3/GCS)에 저장합니다.
- XCom에서 중요한 데이터를 피하십시오. Secrets 백엔드 (예: Vault) 및 환경 변수를 사용하십시오.
동적 작업 매핑 및 팬아웃 워크로드
Airflow는 입력에 따라 런타임에 동적으로 작업을 생성할 수 있습니다. 파티션된 데이터 세트 또는 멀티 테넌트 작업에 이상적입니다.
- 오케스트레이션 (Airflow)을 계산 (Spark, dbt, 웨어하우스)과 분리하십시오.
- 명확성을 위해 TaskFlow API를 사용하고 XCom을 정리하십시오.
- DAG를 매개변수화하십시오. 변수를 신중하게 사용하십시오.
- 파이프라인을 모니터링, 경고 및 문서화하십시오.
데이터 웨어하우스 및 ML 작업 방법
- 데이터 웨어하우스: SQL 작업에 제공자 Operator (예: SnowflakeOperator, BigQueryInsertJobOperator)를 사용하십시오. SQL을 파일 또는 버전 관리된 모듈에 저장하십시오.
- dbt: Bash/KubernetesPodOperator 또는 제공자의 전용 dbt Operator를 통해 dbt를 트리거하십시오.
- ML: 기능 생성, 학습 및 배치 추론을 별도의 작업으로 오케스트레이션하십시오. 스토리지에 아티팩트를 캐싱하고 메트릭을 로깅하십시오.
고급 스케줄링: 데이터 세트 및 DAG 간 종속성
- 데이터 세트를 사용하면 하나의 DAG가 업데이트될 때 다른 DAG를 트리거하는 논리적 데이터 세트를 생성할 수 있습니다. 임시 트리거보다 깔끔합니다.
- 레거시 패턴의 경우, ExternalTaskSensor가 작동하지만 데이터 세트가 더 선언적입니다.
보안 및 규정 준수
- UI에서 역할 기반 액세스 제어 (RBAC)를 사용하십시오.
- 로그 및 연결 변경 기록을 통해 감사 추적을 유지하십시오.
업그레이드 및 버전 관리
- 프로덕션과 유사한 워크로드로 스테이징에서 업그레이드를 테스트하십시오.
- 제공자를 고정하고 신중하게 업그레이드하십시오.
- Executor 관련 변경 사항 및 더 이상 사용되지 않는 기능에 대한 릴리스 노트를 읽으십시오.
첫 번째 프로덕션 DAG를 위한 빠른 체크리스트
- 명확한 소유권 (
owner 태그) 및 경고가 구성되었습니다.
- 합리적인 백오프를 사용하여
retries가 설정되었습니다.
- 작은 XCom 페이로드, 스토리지의 큰 데이터.
- 내구성이 뛰어난 스토리지로 전송된 로그, 내보낸 메트릭.
- 롤아웃 계획 (카나리아 또는 Blue/Green) 및 롤백 단계.
예: 실제 웨어하우스 로드 DAG
이 패턴은 매일 파일을 추출하고, 유효성을 검사하고, 파티션별 동적 매핑 및 연기 가능한 센서를 사용하여 웨어하우스 테이블에 로드합니다.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- 프로덕션으로 승격하기 전에 모범 사례를 검토하십시오.
- 시스템 (웨어하우스, 클라우드, ML 도구)에 대한 제공자 문서를 살펴보십시오.
참고: AI 사이드킥으로 작성 속도 향상
언급할 가치가 있습니다. 많은 DAG를 작성하는 경우 코드를 이해하는 AI 도우미는 상용구 코드를 가속화하고, TaskFlow 스텁을 생성하고, 종속성 수정 사항을 제안할 수도 있습니다. 편집기 및 브라우저와 함께 가벼운 도우미가 필요한 경우, 개발 중에 빠른 코드 재작성 및 설명을 위해 Sider.AI가 유용할 수 있습니다. 주요 내용
- Airflow를 사용하여 오케스트레이션하고 계산하지 마십시오.
- 깔끔하고 테스트 가능한 DAG를 위해 TaskFlow API를 선호하십시오.
- XCom에서 데이터를 제외하고 대신 참조를 전달하십시오.
- 연기 가능한 센서/Operator를 사용하여 슬롯을 절약하십시오.
- 컨테이너화, 테스트 및 환경을 통해 승격하십시오.
- 공식 튜토리얼과 모범 사례를 북극성으로 삼으십시오.
FAQ
Q1:Airflow 사용법을 배우는 가장 쉬운 방법은 무엇입니까?
공식 튜토리얼부터 시작하여 DAG, 작업, 스케줄링 및 UI를 이해하십시오. 그런 다음 작은 TaskFlow 기반 파이프라인을 구축하고 프로덕션 준비를 위해 모범 사례 가이드로 반복하십시오.
Q2:Airflow에서 TaskFlow API 또는 기존 Operator를 사용해야 합니까?
더 깔끔하고 XCom 반환을 자연스럽게 처리하기 때문에 대부분의 Python 파이프라인에 TaskFlow API를 사용하십시오. 기존 Operator는 Bash, SQL 또는 컨테이너 작업과 같은 비 Python 작업에 여전히 좋습니다.
Q3:Airflow 작업 간에 큰 데이터를 어떻게 전달합니까?
XCom에 큰 페이로드를 넣지 마십시오. S3/GCS 또는 데이터베이스에 데이터를 저장하고 작업 속도를 높이고 안정적으로 유지하기 위해 XCom을 통해 참조 또는 URI만 전달하십시오.
Q4:프로덕션에서 Airflow에 어떤 Executor를 선택해야 합니까?
탄력성과 격리를 위해 Kubernetes Executor가 강력한 기본값입니다. 더 간단한 설정의 경우 Celery Executor가 잘 작동합니다. 자동 스케일링, 강력한 로깅 및 외부화된 Secrets를 확인하십시오.
Q5:여러 Airflow DAG에서 종속성을 어떻게 처리합니까?
한 파이프라인이 다른 파이프라인에 대한 데이터를 생성할 때 선언적 교차 DAG 트리거에 데이터 세트를 사용하십시오. 또는 ExternalTaskSensor가 실행을 조정할 수 있지만 데이터 세트는 데이터 기반 오케스트레이션에 더 깔끔합니다.