কীভাবে Airflow ব্যবহার করবেন: নির্ভরযোগ্য ডেটা পাইপলাইন তৈরির জন্য একটি ব্যবহারিক, এন্ড-টু-এন্ড গাইড
আপনি যদি ডেটা সরান বা ML জবগুলি পরিচালনা করেন, তবে সম্ভবত একই কথা শুনেছেন: “এটা Airflow-এ দিন।” সত্যি কথা হল, জটিল ওয়ার্কফ্লো-এর উপর আপনার যখন দৃশ্যমানতা, নির্ভরযোগ্যতা এবং নিয়ন্ত্রণের প্রয়োজন হয়, তখন Apache Airflow খুবই উপযোগী। এই ব্যবহারিক গাইডে, আমরা Airflow কীভাবে ব্যবহার করতে হয় তা ধাপে ধাপে দেখাবো—কোর ধারণা থেকে শুরু করে প্রোডাকশন-রেডি প্যাটার্ন পর্যন্ত—যাতে আপনি আত্মবিশ্বাসের সাথে পাইপলাইন তৈরি করতে পারেন।
আমরা এটিকে কার্যকরী রাখব: আপনি DAGs এবং task-এর জন্য একটি মানসিক মডেল, TaskFlow API-এর সাথে হাতে-কলমে উদাহরণ, স্থাপনার বিকল্প, পরীক্ষার কৌশল এবং সেরা পদ্ধতি পাবেন। শেষ পর্যন্ত, আপনি "আমি টিউটোরিয়ালটি চালাতে পারি" থেকে "আমি এটি প্রোডাকশনে চালাতে পারি" এই অবস্থানে যেতে পারবেন।
নোট: আরও বিস্তারিত জানার জন্য, অফিসিয়াল ডকুমেন্টেশন চমৎকার এবং নিয়মিত আপডেট করা হয়।
Apache Airflow আসলে কী?
Airflow একটি অর্কেস্ট্রেটর—ডেটা প্রসেসর নয়। এটি আপনার অন্য কোথাও চালানো কাজ (ডাটাবেস, ওয়্যারহাউস, স্পার্ক জব, API, কন্টেইনার) এর সময়সূচী তৈরি করে, সাজায় এবং নিরীক্ষণ করে। আপনি DAGs (ডিরেক্টেড অ্যাসাইক্লিক গ্রাফ) হিসাবে ওয়ার্কফ্লো নির্ধারণ করেন, যা শুধুমাত্র পাইথন ফাইল যা টাস্ক এবং তাদের নির্ভরতা এনকোড করে। Airflow তারপর আপনার সময়সূচী, প্যারামিটার এবং পরিবেশ অনুযায়ী সেই টাস্কগুলি সম্পাদন করে।
- DAG: ওয়ার্কফ্লো সংজ্ঞা (নির্ভরতা সহ টাস্কের গ্রাফ)।
- Task: কাজের একক (পাইথন ফাংশন, SQL এক্সিকিউশন, ব্যাশ কমান্ড, বাহ্যিক জব ট্রিগার ইত্যাদি)।
- Operator: এক ধরনের টাস্কের টেমপ্লেট (যেমন,
PythonOperator, BashOperator, KubernetesPodOperator)।
- Scheduler: কী চালাতে হবে এবং কখন চালাতে হবে তা স্থির করে।
- Executor: টাস্ক চালায় (স্থানীয়ভাবে, Celery, Kubernetes ইত্যাদির সাথে)।
- UI: রান, লগ, রিট্রাই এবং বংশের জন্য আপনার কন্ট্রোল সেন্টার।
Airflow ইনস্টল করার পরে অফিসিয়াল টিউটোরিয়াল দিয়ে শুরু করুন; এটি আপনাকে দ্রুত একটি বড় ধারণা দেবে।
সঠিক উপায়ে Airflow ইনস্টল এবং চালানো
Airflow নমনীয়। আপনার পরিস্থিতির সাথে মেলে এমন পথ বেছে নিন:
- লোকাল ডেভেলপমেন্ট (কুইক স্টার্ট):
- প্রকল্প দ্বারা প্রদত্ত কুইক-স্টার্ট Docker Compose ব্যবহার করুন। এটি ওয়েবসার্ভার, শিডিউলার, ডাটাবেস এবং আরও অনেক কিছু উপযুক্ত ডিফল্ট সেটিংস দিয়ে চালু করে।
- DAGs শেখা এবং পুনরাবৃত্তির জন্য চমৎকার।
- Celery Executor অথবা Kubernetes Executor একটি ম্যানেজড Postgres-এর সাথে।
- S3/GCS-এ লগ স্টোর করুন এবং আপনার ইমেজ বা
requirements.txt দিয়ে নির্ভরতা প্যাকেজ করুন।
- ইলাস্টিসিটির জন্য Kubernetes Executor অথবা অটোস্কেলিং ওয়ার্কার সহ Celery Executor।
- আপগ্রেডের জন্য বাহ্যিক সিক্রেট (Vault), শক্তিশালী পর্যবেক্ষণযোগ্যতা (লগ + মেট্রিক) এবং ব্লু/গ্রিন ডিপ্লয় ব্যবহার করুন।
টিপ: আপনার Airflow কোডবেস সংস্করণ-নিয়ন্ত্রিত, কন্টেইনারাইজড এবং প্রোমোশনের আগে পরীক্ষিত রাখুন। "সেরা পদ্ধতি" পৃষ্ঠা প্রোডাকশন-রেডি প্যাটার্নগুলির রূপরেখা দেয়।
কোর ধারণা যা আপনি প্রতিদিন ব্যবহার করবেন
DAGs: কোড হিসাবে আপনার ওয়ার্কফ্লো
একটি DAG হল একটি পাইথন ফাইল যা সংজ্ঞায়িত করে:
- DAG মেটাডেটা: আইডি, সময়সূচী, শুরুর তারিখ, ট্যাগ।
- ডিফল্ট আর্গুমেন্ট: রিট্রাই, মালিক, SLA।
- টাস্ক এবং তাদের নির্ভরতা।
একটি DAG-কে "কী" এবং "কখন" হিসাবে এবং টাস্কগুলিকে "কীভাবে" হিসাবে চিন্তা করুন।
টাস্ক এবং অপারেটর
অপারেটর হল সাধারণ টাস্কের জন্য প্রিফ্যাব। উদাহরণ:
- পাইথন কোডের জন্য PythonOperator / TaskFlow
@task
- শেল কমান্ডের জন্য BashOperator
- API-এর জন্য SimpleHttpOperator
- কন্টেইনারাইজড জবের জন্য KubernetesPodOperator
- ওয়্যারহাউস কাজের জন্য SQL প্রদানকারী (যেমন, Snowflake, BigQuery, Postgres)
TaskFlow API: আধুনিক, পাইথনিক উপায়
TaskFlow API আপনাকে @task দিয়ে পাইথন ফাংশন হিসাবে টাস্ক লিখতে, XCom-এর মাধ্যমে পাস হওয়া রিটার্ন ভ্যালু এবং সেগুলিকে পরিষ্কারভাবে কম্পোজ করতে দেয়। এটি বয়লারপ্লেট কমায় এবং পঠনযোগ্যতা উন্নত করে—অত্যন্ত প্রস্তাবিত।
আপনার প্রথম Airflow DAG (TaskFlow সংস্করণ)
নিচে মূল ধারণাগুলি চিত্রিত করার জন্য একটি সংক্ষিপ্ত ETL-শৈলীর উদাহরণ দেওয়া হল: সময়সূচী, TaskFlow, নির্ভরতা এবং XCom ডেটা পাসিং।
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
dag_id="weather_etl_example",
start_date=datetime(2024, 1, 1),
schedule="0 * * * *", # hourly
catchup=False,
tags=.
সময়সূচী, ক্যাচআপ এবং ব্যাকফিল
schedule: ক্রন বা প্রিসেট (@daily, @hourly)।
start_date + catchup: যদি catchup=True হয়, তাহলে Airflow শুরুর তারিখ থেকে ব্যাকফিল চালাবে। স্ট্রিমিং-স্টাইল পাইপলাইনের জন্য, catchup=False সেট করুন।
- ম্যানুয়াল ব্যাকফিল: ঐতিহাসিক ব্যবধানগুলি পুনরায় চালানোর জন্য UI বা CLI ব্যবহার করুন।
ব্যবহারিক নিয়ম: ডিটারমিনিস্টিক ব্যাচ জবের জন্য ক্যাচআপ সক্ষম করুন; রিয়েল-টাইম বা API রেট-সীমিত পাইপলাইনের জন্য অক্ষম করুন।
টাস্কের মধ্যে নিরাপদে ডেটা পাস করা (XCom)
- ছোট অবজেক্ট: TaskFlow এর সাথে রিটার্ন ভ্যালু ঠিক আছে।
- বৃহৎ পেলোড: XCom-এ একটি কী সহ অবজেক্ট স্টোরেজে (S3/GCS) স্টোর করুন।
- XCom-এ সংবেদনশীল ডেটা এড়িয়ে চলুন; সিক্রেট ব্যাকেন্ড (যেমন, Vault) এবং এনভায়রনমেন্ট ভেরিয়েবল ব্যবহার করুন।
ডায়নামিক টাস্ক ম্যাপিং এবং ফ্যান-আউট ওয়ার্কলোড
Airflow ইনপুটের উপর ভিত্তি করে রানটাইমে গতিশীলভাবে টাস্ক তৈরি করতে পারে—বিভাজিত ডেটাসেট বা মাল্টি-টেন্যান্ট জবের জন্য আদর্শ।
- DAGs ডিটারমিনিস্টিক এবং আইডেম্পোটেন্ট রাখুন।
- গণনা (স্পার্ক, dbt, ওয়্যারহাউস) থেকে অর্কেস্ট্রেশন (Airflow) আলাদা করুন।
- স্পষ্টতা এবং XCom স্বাস্থ্যবিধির জন্য TaskFlow API ব্যবহার করুন।
- DAGs প্যারামিটারাইজ করুন; বিচার করে ভেরিয়েবল ব্যবহার করুন।
- আপনার পাইপলাইন নিরীক্ষণ, সতর্ক এবং ডকুমেন্ট করুন।
কীভাবে ডেটা ওয়্যারহাউস এবং ML এর সাথে কাজ করবেন
- ডেটা ওয়্যারহাউস: SQL জবের জন্য প্রদানকারী অপারেটর (যেমন, SnowflakeOperator, BigQueryInsertJobOperator) ব্যবহার করুন। SQL ফাইল বা সংস্করণযুক্ত মডিউলে স্টোর করুন।
- dbt: ব্যাশ/KubernetesPodOperator অথবা প্রদানকারীতে ডেডিকেটেড dbt অপারেটরের মাধ্যমে dbt ট্রিগার করুন।
- ML: ফিচার জেনারেশন, ট্রেনিং এবং ব্যাচ ইনফ্যারেন্সকে পৃথক টাস্ক হিসাবে অর্কেস্ট্রেট করুন; স্টোরেজে আর্টিফ্যাক্ট ক্যাশ করুন এবং মেট্রিক লগ করুন।
অ্যাডভান্সড শিডিউলিং: ডেটাসেট এবং ক্রস-DAG নির্ভরতা
- ডেটাসেট একটি লজিক্যাল ডেটাসেট তৈরি করতে দেয় যা আপডেট হলে অন্য DAG ট্রিগার করে—অ্যাড-হক ট্রিগারের চেয়ে পরিষ্কার।
- পুরানো প্যাটার্নের জন্য, ExternalTaskSensor কাজ করে, কিন্তু ডেটাসেট আরও ডিক্লারেটিভ।
সুরক্ষা এবং সম্মতি
- UI-তে রোল-ভিত্তিক অ্যাক্সেস কন্ট্রোল (RBAC) ব্যবহার করুন।
- দল বা বিশ্বাসের সীমানা অনুসারে পরিবেশকে আলাদা করুন।
- লগ এবং সংযোগ পরিবর্তনের ইতিহাসের মাধ্যমে নিরীক্ষণ ট্রেইল রাখুন।
আপগ্রেড এবং সংস্করণ
- উৎপাদন-এর মতো ওয়ার্কলোড সহ স্টেজিং-এ আপগ্রেড পরীক্ষা করুন।
- উদ্দেশ্যমূলকভাবে প্রদানকারী পিন এবং আপগ্রেড করুন।
- এক্সিকিউটর-নির্দিষ্ট পরিবর্তন এবং বাতিলের জন্য রিলিজ নোট পড়ুন।
আপনার প্রথম প্রোডাকশন DAG-এর জন্য একটি দ্রুত চেকলিস্ট
- পরিষ্কার মালিকানা (
owner ট্যাগ) এবং সতর্কতা কনফিগার করা হয়েছে।
- যুক্তিসঙ্গত ব্যাকঅফ সহ
retries সেট করা হয়েছে।
- আইডেম্পোটেন্ট টাস্ক এবং সুস্পষ্ট নির্ভরতা।
- ছোট XCom পেলোড; স্টোরেজে বড় ডেটা।
- টেকসই স্টোরেজে পাঠানো লগ; মেট্রিক রপ্তানি করা হয়েছে।
- রোলআউট পরিকল্পনা (ক্যানারি বা ব্লু/গ্রিন) এবং রোলব্যাক পদক্ষেপ।
উদাহরণ: একটি বাস্তবসম্মত ওয়্যারহাউস লোড DAG
এই প্যাটার্নটি দৈনিক ফাইলগুলি বের করে, সেগুলিকে যাচাই করে এবং একটি ওয়্যারহাউস টেবিলে লোড করে, যেখানে পার্টিশন প্রতি গতিশীল ম্যাপিং এবং ডিফারএবল সেন্সর রয়েছে।
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.sensors.filesystem import FileSensor
- উৎপাদনে প্রচার করার আগে সেরা পদ্ধতি পর্যালোচনা করুন।
- আপনার সিস্টেমের জন্য প্রদানকারীর ডকুমেন্টেশন অন্বেষণ করুন (ওয়্যারহাউস, ক্লাউড, ML সরঞ্জাম)।
যাইহোক: একটি AI সাইডকিকের সাথে লেখকের গতি বাড়ান
উল্লেখ করার মতো: আপনি যদি প্রচুর DAGs খসড়া করেন, তাহলে কোড বুঝতে পারে এমন একটি AI সহকারী বয়লারপ্লেটকে ত্বরান্বিত করতে, TaskFlow স্টাব তৈরি করতে এবং এমনকি নির্ভরতা ফিক্সের পরামর্শ দিতে পারে। আপনি যদি আপনার সম্পাদক এবং ব্রাউজারের পাশাপাশি একটি হালকা সহকারী চান, তাহলে Sider.AI ডেভেলপমেন্টের সময় দ্রুত কোড রিরাইট এবং ব্যাখ্যার জন্য কাজে আসতে পারে। মূল বিষয়
- গণনা করার জন্য নয়, অর্কেস্ট্রেট করার জন্য Airflow ব্যবহার করুন।
- পরিষ্কার, পরীক্ষাযোগ্য DAGs-এর জন্য TaskFlow API পছন্দ করুন।
- XCom থেকে ডেটা দূরে রাখুন; পরিবর্তে রেফারেন্স পাস করুন।
- স্লট বাঁচাতে ডিফারএবল সেন্সর/অপারেটর ব্যবহার করুন।
- কন্টেইনারাইজ করুন, পরীক্ষা করুন এবং পরিবেশের মাধ্যমে প্রচার করুন।
- আপনার নর্থ স্টার হিসাবে অফিসিয়াল টিউটোরিয়াল এবং সেরা পদ্ধতির উপর নির্ভর করুন।
FAQ
Q1: Airflow কীভাবে ব্যবহার করতে হয় তা শেখার সহজ উপায় কী?
DAGs, টাস্ক, শিডিউলিং এবং UI বুঝতে অফিসিয়াল টিউটোরিয়াল দিয়ে শুরু করুন। তারপর প্রোডাকশন-রেডিনেসের জন্য সেরা পদ্ধতির গাইড দিয়ে একটি ছোট TaskFlow-ভিত্তিক পাইপলাইন তৈরি করুন এবং পুনরাবৃত্তি করুন।
Q2: Airflow-এ আমার TaskFlow API অথবা ক্লাসিক অপারেটর ব্যবহার করা উচিত?
বেশিরভাগ পাইথনিক পাইপলাইনের জন্য TaskFlow API ব্যবহার করুন কারণ এটি পরিষ্কার এবং স্বাভাবিকভাবে XCom রিটার্নগুলি পরিচালনা করে। ক্লাসিক অপারেটরগুলি এখনও ব্যাশ, SQL অথবা কন্টেইনার জবের মতো নন-পাইথন টাস্কের জন্য দুর্দান্ত।
Q3: আমি কীভাবে Airflow টাস্কের মধ্যে বড় ডেটা পাস করব?
XCom-এ বড় পেলোড রাখা এড়িয়ে চলুন। S3/GCS অথবা একটি ডাটাবেসে ডেটা স্টোর করুন এবং টাস্কগুলিকে দ্রুত এবং নির্ভরযোগ্য রাখতে শুধুমাত্র রেফারেন্স অথবা URI XCom-এর মাধ্যমে পাস করুন।
Q4: প্রোডাকশনে Airflow-এর জন্য আমার কোন এক্সিকিউটর বেছে নেওয়া উচিত?
ইলাস্টিসিটি এবং আইসোলেশনের জন্য, Kubernetes Executor একটি শক্তিশালী ডিফল্ট। সরল সেটআপের জন্য, Celery Executor ভালোভাবে কাজ করে—শুধু অটোস্কেলিং, শক্তিশালী লগিং এবং বাহ্যিক সিক্রেট নিশ্চিত করুন।
Q5: আমি কীভাবে একাধিক Airflow DAGs-এর মধ্যে নির্ভরতা পরিচালনা করব?
যখন একটি পাইপলাইন অন্যটির জন্য ডেটা তৈরি করে, তখন ডিক্লারেটিভ ক্রস-DAG ট্রিগারের জন্য ডেটাসেট ব্যবহার করুন। বিকল্পভাবে, ExternalTaskSensor রানগুলি সমন্বিত করতে পারে, তবে ডেটা-চালিত অর্কেস্ট্রেশনের জন্য ডেটাসেটগুলি আরও পরিষ্কার।