نحوه استفاده از Airflow: یک راهنمای عملی و جامع برای ساخت پایپلاینهای داده قابل اعتماد
اگر دادهها را جابهجا میکنید یا کارهای ML را هماهنگ میکنید، احتمالاً این عبارت را شنیدهاید: «فقط آن را در Airflow قرار دهید.» حقیقت این است که Apache Airflow زمانی میدرخشد که به دید، قابلیت اطمینان و کنترل بر گردشکارهای پیچیده نیاز دارید. در این راهنمای عملی، گام به گام نحوه استفاده از Airflow را بررسی خواهیم کرد—از مفاهیم اصلی گرفته تا الگوهای آماده برای تولید—تا بتوانید پایپلاینهایی را که به آنها اعتماد دارید، ارائه دهید.
ما این راهنما را عملی نگه خواهیم داشت: یک مدل ذهنی برای DAGها و تسکها، مثالهای عملی با TaskFlow API، گزینههای استقرار، استراتژیهای تست و بهترین شیوهها را دریافت خواهید کرد. در پایان، از «من میتوانم آموزش را اجرا کنم» به «من میتوانم این را در محیط عملیاتی اجرا کنم» خواهید رسید.
توجه: برای بررسیهای عمیقتر و مرجع، مستندات رسمی عالی و به طور منظم بهروزرسانی میشوند.
Apache Airflow واقعاً چیست؟
Airflow یک هماهنگکننده است—نه یک پردازشگر داده. این ابزار کار شما را که در جای دیگری اجرا میکنید (پایگاههای داده، انبارها، کارهای Spark، APIها، کانتینرها) زمانبندی، مرتب و نظارت میکند. شما گردشکارها را به عنوان DAG (نمودارهای جهتدار بدون دور) تعریف میکنید، که فقط فایلهای پایتون هستند که تسکها و وابستگیهای آنها را رمزگذاری میکنند. سپس Airflow این تسکها را مطابق با برنامه، پارامترها و محیط شما اجرا میکند.
- DAG: تعریف گردشکار (نمودار تسکها با وابستگیها).
- تسک: یک واحد کار (تابع پایتون، اجرای SQL، دستور Bash، تریگر کار خارجی و غیره).
- اپراتور: یک الگو برای نوعی از تسک (به عنوان مثال،
PythonOperator، BashOperator، KubernetesPodOperator).
- زمانبند: تصمیم میگیرد چه چیزی و چه زمانی اجرا شود.
- اجراکننده: تسکها را اجرا میکند (به صورت محلی، با Celery، Kubernetes و غیره).
- UI: مرکز کنترل شما برای اجراها، لاگها، تلاشهای مجدد و تبار.
پس از نصب Airflow، با آموزشهای رسمی شروع کنید. آنها به سرعت تصویر بزرگ را به شما نشان میدهند.
نصب و اجرای صحیح Airflow
Airflow انعطافپذیر است. مسیری را انتخاب کنید که با مرحله شما مطابقت دارد:
- از Docker Compose شروع سریع ارائه شده توسط پروژه استفاده کنید. این ابزار وبسرور، زمانبند، پایگاه داده و موارد دیگر را با تنظیمات پیشفرض مناسب راهاندازی میکند.
- عالی برای یادگیری و تکرار DAGها.
- Celery Executor یا Kubernetes Executor با Postgres مدیریتشده.
- لاگها را در S3/GCS ذخیره کنید و وابستگیها را با تصویر یا
requirements.txt خود بستهبندی کنید.
- Kubernetes Executor برای الاستیسیته یا Celery Executor با کارگران مقیاسپذیر خودکار.
- اسرار خارجی (Vault)، قابلیت مشاهده قوی (لاگها + متریکها) و استقرارهای آبی/سبز برای ارتقاء.
نکته: قبل از ارتقاء، کدبیس Airflow خود را کنترل نسخه، کانتینریزه و تست کنید. صفحه «بهترین شیوهها» الگوهای آماده برای تولید را تشریح میکند.
مفاهیم اصلی که روزانه از آنها استفاده خواهید کرد
DAGها: گردشکار شما به عنوان کد
یک DAG یک فایل پایتون است که موارد زیر را تعریف میکند:
- فراداده DAG: شناسه، برنامه، تاریخ شروع، تگها.
- آرگومانهای پیشفرض: تلاشهای مجدد، صاحبان، SLAها.
- تسکها و وابستگیهای آنها.
به یک DAG به عنوان «چه چیزی» و «چه زمانی» و به تسکها به عنوان «چگونه» فکر کنید.
تسکها و اپراتورها
اپراتورها پیشساختههایی برای تسکهای رایج هستند. مثالها:
- PythonOperator / TaskFlow
@task برای کد پایتون
- BashOperator برای دستورات شل
- SimpleHttpOperator برای APIها
- KubernetesPodOperator برای کارهای کانتینریزه شده
- ارائهدهندگان SQL (به عنوان مثال، Snowflake، BigQuery، Postgres) برای کار انبار
TaskFlow API: روش مدرن و پایتونیک
TaskFlow API به شما امکان میدهد تسکها را به عنوان توابع پایتون با @task بنویسید، مقادیری را برگردانید که از طریق XCom منتقل میشوند و آنها را به طور تمیز ترکیب کنید. این API کد تکراری را کاهش میدهد و خوانایی را بهبود میبخشد—به شدت توصیه میشود.
اولین DAG Airflow شما (ویرایش TaskFlow)
در زیر یک مثال حداقلی به سبک ETL برای نشان دادن ایدههای کلیدی آورده شده است: زمانبندی، TaskFlow، وابستگیها و انتقال داده XCom.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
زمانبندی، Catchup و Backfillها
schedule: Cron یا تنظیمات از پیش تعیین شده (@daily، @hourly).
start_date + catchup: اگر catchup=True، Airflow اجراها را از تاریخ شروع backfill میکند. برای پایپلاینهای سبک استریمینگ، catchup=False را تنظیم کنید.
- Backfillهای دستی: از UI یا CLI برای اجرای مجدد بازههای تاریخی استفاده کنید.
قانون سرانگشتی عملی: catchup را برای کارهای دستهای قطعی فعال کنید. برای پایپلاینهای محدود شده با نرخ API یا بیدرنگ، آن را غیرفعال کنید.
انتقال ایمن داده بین تسکها (XCom)
- اشیاء کوچک: مقادیر بازگشتی با TaskFlow خوب هستند.
- بارهای بزرگ: در فضای ذخیرهسازی اشیاء (S3/GCS) با یک کلید در XCom ذخیره کنید.
- از دادههای حساس در XCom اجتناب کنید. از بکاندهای secrets (به عنوان مثال، Vault) و متغیرهای محیطی استفاده کنید.
نگاشت تسک پویا و بارهای کاری Fan-out
Airflow میتواند تسکها را به صورت پویا در زمان اجرا بر اساس ورودیها تولید کند—ایدهآل برای مجموعهدادههای پارتیشنبندی شده یا کارهای چند مستأجری.
- DAGها را قطعی و idempotent نگه دارید.
- هماهنگسازی (Airflow) را از محاسبات (Spark، dbt، انبارها) جدا کنید.
- از TaskFlow API برای وضوح و بهداشت XCom استفاده کنید.
- DAGها را پارامتری کنید. از متغیرها با احتیاط استفاده کنید.
- پایپلاینهای خود را نظارت، هشدار و مستند کنید.
نحوه کار با انبارهای داده و ML
- انبارهای داده: از اپراتورهای ارائهدهنده (به عنوان مثال، SnowflakeOperator، BigQueryInsertJobOperator) برای کارهای SQL استفاده کنید. SQL را در فایلها یا ماژولهای دارای نسخه ذخیره کنید.
- dbt: dbt را از طریق Bash/KubernetesPodOperator یا اپراتورهای اختصاصی dbt در ارائهدهندگان تریگر کنید.
- ML: تولید ویژگی، آموزش و استنتاج دستهای را به عنوان تسکهای جداگانه هماهنگ کنید. مصنوعات را در فضای ذخیرهسازی کش کنید و متریکها را ثبت کنید.
زمانبندی پیشرفته: مجموعهدادهها و وابستگیهای بین DAG
- مجموعهدادهها به یک DAG اجازه میدهند تا یک مجموعهداده منطقی تولید کند که هنگام بهروزرسانی، DAG دیگری را تریگر میکند—تمیزتر از تریگرهای موردی.
- برای الگوهای قدیمی، ExternalTaskSensor کار میکند، اما مجموعهدادهها اعلانیتر هستند.
امنیت و انطباق
- از کنترل دسترسی مبتنی بر نقش (RBAC) در UI استفاده کنید.
- محیطها را بر اساس تیم یا مرز اعتماد جدا کنید.
- از طریق لاگها و تاریخچه تغییر اتصال، مسیرهای ممیزی را نگه دارید.
ارتقاءها و نسخهسازی
- ارتقاءها را در مرحلهبندی با بارهای کاری شبیه به تولید تست کنید.
- ارائهدهندگان را به طور عمدی پین و ارتقا دهید.
- یادداشتهای انتشار را برای تغییرات و منسوخ شدنهای خاص اجراکننده بخوانید.
یک چکلیست سریع برای اولین DAG تولید شما
- مالکیت واضح (تگ
owner) و هشدارهای پیکربندی شده.
retries با backoffهای منطقی تنظیم شده است.
- تسکهای Idempotent و وابستگیهای صریح.
- بارهای XCom کوچک؛ دادههای بزرگ در فضای ذخیرهسازی.
- لاگها به فضای ذخیرهسازی بادوام ارسال میشوند. متریکها صادر میشوند.
- برنامه استقرار (canary یا آبی/سبز) و مراحل بازگشت.
مثال: یک DAG بارگیری انبار واقعی
این الگو فایلهای روزانه را استخراج میکند، آنها را اعتبارسنجی میکند و آنها را در یک جدول انبار بارگیری میکند، با نگاشت پویا در هر پارتیشن و سنسورهای قابل تعویق.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- قبل از ارتقاء به تولید، بهترین شیوهها را بررسی کنید.
- مستندات ارائهدهنده را برای سیستمهای خود (انبارها، ابرها، ابزارهای ML) بررسی کنید.
به هر حال: سرعت نوشتن را با یک دستیار هوش مصنوعی افزایش دهید
شایان ذکر است: اگر DAGهای زیادی را پیشنویس میکنید، یک دستیار هوش مصنوعی که کد را درک میکند میتواند کد تکراری را تسریع کند، stubهای TaskFlow را تولید کند و حتی اصلاحات وابستگی را پیشنهاد دهد. اگر یک کمککننده سبک وزن در کنار ویرایشگر و مرورگر خود میخواهید، Sider.AI میتواند برای بازنویسی و توضیحات سریع کد در طول توسعه مفید باشد. نکات کلیدی
- از Airflow برای هماهنگسازی استفاده کنید، نه محاسبه.
- TaskFlow API را برای DAGهای تمیز و قابل آزمایش ترجیح دهید.
- دادهها را از XCom خارج نگه دارید. به جای آن، مراجع را ارسال کنید.
- از سنسورها/اپراتورهای قابل تعویق برای صرفهجویی در اسلاتها استفاده کنید.
- کانتینریزه کنید، تست کنید و از طریق محیطها ارتقا دهید.
- به آموزشهای رسمی و بهترین شیوهها به عنوان ستاره شمالی خود تکیه کنید.
سوالات متداول
Q1: سادهترین راه برای یادگیری نحوه استفاده از Airflow چیست؟
با آموزش رسمی شروع کنید تا DAGها، تسکها، زمانبندی و UI را درک کنید. سپس یک پایپلاین کوچک مبتنی بر TaskFlow بسازید و با راهنمای بهترین شیوهها برای آمادگی تولید تکرار کنید.
Q2: آیا باید از TaskFlow API یا اپراتورهای کلاسیک در Airflow استفاده کنم؟
از TaskFlow API برای بیشتر پایپلاینهای پایتونیک استفاده کنید زیرا تمیزتر است و بازگشتهای XCom را به طور طبیعی مدیریت میکند. اپراتورهای کلاسیک هنوز برای تسکهای غیر پایتون مانند Bash، SQL یا کارهای کانتینری عالی هستند.
Q3: چگونه دادههای بزرگ را بین تسکهای Airflow منتقل کنم؟
از قرار دادن بارهای بزرگ در XCom خودداری کنید. دادهها را در S3/GCS یا یک پایگاه داده ذخیره کنید و فقط مراجع یا URIها را از طریق XCom ارسال کنید تا تسکها سریع و قابل اعتماد بمانند.
Q4: چه اجراکنندهای را باید برای Airflow در تولید انتخاب کنم؟
برای الاستیسیته و انزوا، Kubernetes Executor یک پیشفرض قوی است. برای تنظیمات سادهتر، Celery Executor به خوبی کار میکند—فقط از مقیاسبندی خودکار، ثبت قوی و اسرار برونسپاری شده اطمینان حاصل کنید.
Q5: چگونه وابستگیها را در چندین DAG Airflow مدیریت کنم؟
هنگامی که یک پایپلاین دادهها را برای دیگری تولید میکند، از مجموعهدادهها برای تریگرهای بین DAG اعلانی استفاده کنید. از طرف دیگر، ExternalTaskSensor میتواند اجراها را هماهنگ کند، اما مجموعهدادهها برای هماهنگسازی مبتنی بر داده تمیزتر هستند.