كيفية استخدام Airflow: دليل عملي وشامل لبناء خطوط أنابيب بيانات موثوقة
إذا كنت تنقل البيانات أو تنظم مهام تعلم الآلة، فمن المحتمل أنك سمعت نفس العبارة: "فقط ضعها في Airflow". الحقيقة هي أن Apache Airflow يتألق عندما تحتاج إلى رؤية وموثوقية وتحكم في سير العمل المعقد. في هذا الدليل العملي، سنشرح خطوة بخطوة كيفية استخدام Airflow - من المفاهيم الأساسية إلى الأنماط الجاهزة للإنتاج - حتى تتمكن من شحن خطوط الأنابيب التي تثق بها.
سنحافظ على الطابع العملي: ستحصل على نموذج ذهني لـ DAGs والمهام، وأمثلة عملية مع TaskFlow API، وخيارات النشر، واستراتيجيات الاختبار، وأفضل الممارسات. بحلول النهاية، ستنتقل من "يمكنني تشغيل البرنامج التعليمي" إلى "يمكنني تشغيل هذا في الإنتاج".
ملاحظة: للحصول على المزيد من التفاصيل والمرجع، فإن الوثائق الرسمية ممتازة ويتم تحديثها بانتظام.
ما هو Apache Airflow، حقًا؟
Airflow هو مُنسق - وليس معالج بيانات. يقوم بجدولة وترتيب ومراقبة العمل الذي تقوم بتشغيله في أماكن أخرى (قواعد البيانات، والمستودعات، ووظائف Spark، وواجهات برمجة التطبيقات، والحاويات). يمكنك تحديد سير العمل على هيئة DAGs (الرسوم البيانية الدائرية الموجهة)، وهي مجرد ملفات Python تقوم بترميز المهام والتبعيات الخاصة بها. ثم يقوم Airflow بتنفيذ هذه المهام وفقًا للجدول الزمني والمعلمات والبيئة الخاصة بك.
- DAG: تعريف سير العمل (رسم بياني للمهام مع التبعيات).
- المهمة: وحدة عمل (دالة Python، وتنفيذ SQL، وأمر Bash، ومشغل وظيفة خارجي، وما إلى ذلك).
- المشغل: قالب لنوع من المهام (على سبيل المثال،
PythonOperator، BashOperator، KubernetesPodOperator).
- المجدول: يقرر ما الذي يجب تشغيله ومتى.
- المنفذ: يقوم بتشغيل المهام (محليًا، مع Celery، Kubernetes، إلخ).
- واجهة المستخدم: مركز التحكم الخاص بك لعمليات التشغيل والسجلات وإعادة المحاولات والسلالة.
ابدأ بالبرامج التعليمية الرسمية بمجرد تثبيت Airflow؛ فهي تعطيك الصورة الكبيرة بسرعة.
تثبيت وتشغيل Airflow بالطريقة الصحيحة
Airflow مرن. اختر المسار الذي يتطابق مع مرحلتك:
- التطوير المحلي (بداية سريعة):
- استخدم Docker Compose للبدء السريع الذي يوفره المشروع. يقوم بتشغيل خادم الويب والمجدول وقاعدة البيانات والمزيد مع الإعدادات الافتراضية المعقولة.
- رائع للتعلم والتكرار على DAGs.
- Celery Executor أو Kubernetes Executor مع Postgres مُدار.
- قم بتخزين السجلات في S3/GCS وحزم التبعيات مع صورتك أو
requirements.txt.
- Kubernetes Executor للمرونة أو Celery Executor مع العمال ذوي التحجيم التلقائي.
- الأسرار الخارجية (Vault)، والمراقبة القوية (السجلات + المقاييس)، وعمليات النشر الزرقاء/الخضراء للترقيات.
نصيحة: احتفظ بقاعدة كود Airflow الخاصة بك تحت التحكم في الإصدار، وفي حاويات، وتم اختبارها قبل الترويج. تحدد صفحة "أفضل الممارسات" الأنماط الجاهزة للإنتاج.
المفاهيم الأساسية التي ستستخدمها يوميًا
DAGs: سير العمل الخاص بك كرمز
DAG هو ملف Python يحدد:
- بيانات تعريف DAG: المعرف، والجدول الزمني، وتاريخ البدء، والعلامات.
- الحجج الافتراضية: عمليات إعادة المحاولة، والمالكين، واتفاقيات مستوى الخدمة.
- المهام والتبعيات الخاصة بهم.
فكر في DAG على أنه "ماذا" و "متى"، والمهام على أنها "كيف".
المهام والمشغلون
المشغلون عبارة عن وحدات مسبقة الصنع للمهام الشائعة. أمثلة:
@task PythonOperator / TaskFlow لكود Python
- BashOperator لأوامر shell
- SimpleHttpOperator لواجهات برمجة التطبيقات
- KubernetesPodOperator للوظائف المحتواة
- موفرات SQL (مثل Snowflake و BigQuery و Postgres) لعمل المستودعات
TaskFlow API: الطريقة الحديثة والـ Pythonic
تتيح لك TaskFlow API كتابة المهام كوظائف Python مع @task، وإرجاع القيم التي تمر عبر XCom، وتكوينها بشكل نظيف. يقلل من التعليمات البرمجية القياسية ويحسن إمكانية القراءة - موصى به للغاية.
أول Airflow DAG الخاص بك (إصدار TaskFlow)
فيما يلي مثال بسيط على نمط ETL لتوضيح الأفكار الرئيسية: الجدولة، و TaskFlow، والتبعيات، وتمرير بيانات XCom.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
الجدولة والالتقاط والملء الخلفي
schedule: Cron أو الإعدادات المسبقة (@daily، @hourly).
start_date + catchup: إذا كان catchup=True، فسيقوم Airflow بملء عمليات التشغيل من تاريخ البدء. لخطوط الأنابيب ذات نمط البث، قم بتعيين catchup=False.
- عمليات الملء الخلفي اليدوية: استخدم واجهة المستخدم أو CLI لإعادة تشغيل الفترات التاريخية.
قاعدة عملية: قم بتمكين الالتقاط لوظائف الدُفعات الحتمية؛ تعطيل خطوط الأنابيب في الوقت الفعلي أو ذات معدل API المحدود.
تمرير البيانات بين المهام (XCom) بأمان
- الكائنات الصغيرة: قيم الإرجاع مع TaskFlow جيدة.
- الحمولات الكبيرة: قم بتخزينها في تخزين الكائنات (S3/GCS) بمفتاح في XCom.
- تجنب البيانات الحساسة في XCom؛ استخدم الخلفيات السرية (مثل Vault) ومتغيرات البيئة.
تعيين المهام الديناميكي وأحمال العمل الموزعة
يمكن لـ Airflow إنشاء مهام ديناميكيًا في وقت التشغيل بناءً على المدخلات - مثالي لمجموعات البيانات المقسمة أو الوظائف متعددة المستأجرين.
- حافظ على DAGs حتمية وثابتة.
- افصل التنسيق (Airflow) عن الحساب (Spark، dbt، المستودعات).
- استخدم TaskFlow API للوضوح ونظافة XCom.
- قم بتحديد معلمات DAGs؛ استخدم المتغيرات بحكمة.
- راقب خطوط الأنابيب الخاصة بك ونبه إليها وقم بتوثيقها.
كيفية العمل مع مستودعات البيانات والتعلم الآلي
- مستودعات البيانات: استخدم مشغلي الموفر (مثل SnowflakeOperator و BigQueryInsertJobOperator) لوظائف SQL. قم بتخزين SQL في الملفات أو الوحدات النمطية ذات الإصدار.
- dbt: قم بتشغيل dbt عبر Bash/KubernetesPodOperator أو مشغلي dbt المخصصين في الموفرين.
- ML: قم بتنسيق إنشاء الميزات والتدريب والاستدلال الدفعي كمهام منفصلة؛ قم بتخزين القطع الأثرية مؤقتًا في التخزين وتسجيل المقاييس.
الجدولة المتقدمة: مجموعات البيانات والتبعيات عبر DAG
- تتيح مجموعات البيانات لـ DAG واحد إنتاج مجموعة بيانات منطقية تقوم بتشغيل DAG آخر عند تحديثها - أنظف من المشغلات المخصصة.
- بالنسبة للأنماط القديمة، يعمل ExternalTaskSensor، ولكن مجموعات البيانات أكثر تعريفية.
الأمن والامتثال
- استخدم التحكم في الوصول المستند إلى الأدوار (RBAC) في واجهة المستخدم.
- اعزل البيئات لكل فريق أو حدود ثقة.
- احتفظ بمسارات التدقيق عبر السجلات وسجل تغيير الاتصال.
الترقيات والتحكم في الإصدار
- اختبر الترقيات في التدريج بأحمال عمل شبيهة بالإنتاج.
- قم بتثبيت وترقية الموفرين عن قصد.
- اقرأ ملاحظات الإصدار للتغييرات والإهمال الخاصة بالمنفذ.
قائمة تحقق سريعة لأول DAG للإنتاج
- ملكية واضحة (علامة
owner) والتنبيهات التي تم تكوينها.
- تم تعيين
retries مع عمليات إرجاع معقولة.
- مهام ثابتة وتبعيات صريحة.
- حمولة XCom صغيرة؛ بيانات كبيرة في التخزين.
- السجلات التي يتم شحنها إلى التخزين الدائم؛ المقاييس المصدرة.
- خطة طرح (كناري أو أزرق/أخضر) وخطوات التراجع.
مثال: DAG واقعي لتحميل المستودع
يستخرج هذا النمط الملفات اليومية ويتحقق من صحتها ويحملها في جدول مستودع، مع تعيين ديناميكي لكل قسم وأجهزة استشعار قابلة للتأجيل.
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- راجع أفضل الممارسات قبل الترويج للإنتاج.
- استكشف وثائق الموفر لأنظمتك (المستودعات والسحب وأدوات ML).
بالمناسبة: تسريع التأليف مع مساعد الذكاء الاصطناعي
تجدر الإشارة إلى: إذا كنت تصمم الكثير من DAGs، فيمكن لمساعد الذكاء الاصطناعي الذي يفهم التعليمات البرمجية تسريع التعليمات البرمجية القياسية وإنشاء جذور TaskFlow وحتى اقتراح إصلاحات التبعية. إذا كنت تريد مساعدًا خفيف الوزن بجانب المحرر والمتصفح، فيمكن أن يكون Sider.AI مفيدًا لإعادة كتابة التعليمات البرمجية السريعة والشروحات أثناء التطوير. الوجبات الرئيسية
- استخدم Airflow للتنسيق، وليس للحساب.
- فضل TaskFlow API لـ DAGs نظيفة وقابلة للاختبار.
- احتفظ بالبيانات خارج XCom؛ مرر المراجع بدلاً من ذلك.
- استخدم أجهزة الاستشعار/المشغلين القابلين للتأجيل لحفظ الفتحات.
- ضع في حاويات واختبر وقم بالترقية من خلال البيئات.
- اعتمد على البرامج التعليمية الرسمية وأفضل الممارسات كنجمك الشمالي.
الأسئلة الشائعة
س1: ما هي أسهل طريقة لتعلم كيفية استخدام Airflow؟
ابدأ بالبرنامج التعليمي الرسمي لفهم DAGs والمهام والجدولة وواجهة المستخدم. ثم قم ببناء خط أنابيب صغير يعتمد على TaskFlow وكرر مع دليل أفضل الممارسات للاستعداد للإنتاج.
س2: هل يجب علي استخدام TaskFlow API أو المشغلين الكلاسيكيين في Airflow؟
استخدم TaskFlow API لمعظم خطوط أنابيب Pythonic لأنها أنظف وتتعامل مع إرجاع XCom بشكل طبيعي. لا تزال المشغلات الكلاسيكية رائعة للمهام غير Python مثل Bash أو SQL أو وظائف الحاويات.
س3: كيف يمكنني تمرير بيانات كبيرة بين مهام Airflow؟
تجنب وضع حمولات كبيرة في XCom. قم بتخزين البيانات في S3/GCS أو قاعدة بيانات وقم بتمرير المراجع أو URI فقط من خلال XCom للحفاظ على المهام سريعة وموثوقة.
س4: ما هو المنفذ الذي يجب أن أختاره لـ Airflow في الإنتاج؟
للمرونة والعزل، يعتبر Kubernetes Executor هو الافتراضي القوي. بالنسبة للإعدادات الأبسط، يعمل Celery Executor بشكل جيد - فقط تأكد من التحجيم التلقائي والتسجيل القوي والأسرار الخارجية.
س5: كيف يمكنني التعامل مع التبعيات عبر DAGs متعددة في Airflow؟
استخدم مجموعات البيانات لمشغلات DAG المتقاطعة التعريفية عندما ينتج أحد خطوط الأنابيب بيانات لآخر. بدلاً من ذلك، يمكن لـ ExternalTaskSensor تنسيق عمليات التشغيل، ولكن مجموعات البيانات أنظف للتنسيق المستند إلى البيانات.