常用
学习地址
https://www.sparkcodehub.com/airflow/fundamentals/introduction
简单例子
simple_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime, timedelta
# with DAG(
# dag_id="simple_dag",
# start_date=datetime(2025, 1, 1),
# schedule_interval="@daily",
# catchup=False,
# ) as dag:
# start = BashOperator(
# task_id="start_task",
# bash_command="echo 'Starting the workflow!'",
# )
# end = BashOperator(
# task_id="end_task",
# bash_command="echo 'Workflow complete!'",
# )
# start >> end
# with DAG(
# dag_id="etl_dag",
# start_date=datetime(2025, 1, 1),
# schedule_interval="@daily",
# catchup=False,
# ) as dag:
# extract = BashOperator(task_id="extract", bash_command="echo 'Extractiong data'")
# transform = BashOperator(
# task_id="transform", bash_command="echo 'Transforming data'"
# )
# load = BashOperator(task_id="load", bash_command="echo 'Loading date'")
# extract >> transform >> load
# def process_data():
# print("Processing data with Python!")
# with DAG(
# dag_id="python_dag",
# start_date=datetime(2025, 1, 1),
# schedule_interval="@daily",
# catchup=False,
# ) as dag:
# start = PythonOperator(task_id="start", python_callable=process_data)
# with DAG(
# dag_id="complex_dag",
# start_date=datetime(2025, 1, 1),
# schedule_interval="@daily",
# catchup=False,
# ) as dag:
# extract = BashOperator(task_id="extract", bash_command="echo 'Extract'")
# transform1 = BashOperator(task_id="transform1", bash_command="echo 'Transform 1'")
# transform2 = BashOperator(task_id="transform2", bash_command="echo 'Transform 2'")
# load = BashOperator(task_id="load", bash_command="echo 'Load'")
# extract >> [transform1, transform2] >> load
# default_args = {
# "retries": 2,
# "retry_delay": timedelta(minutes=2),
# }
# with DAG(
# dag_id="param_dag",
# start_date=datetime(2025, 4, 13),
# schedule_interval="@daily",
# catchup=False,
# max_active_runs=2,
# default_args=default_args,
# description="A daily workflow with custom paramters",
# ) as dag:
# start = BashOperator(task_id="start_task", bash_command="echo 'Staring!'")
# end = BashOperator(task_id="end_stask", bash_command="echo 'Done!'")
# start >> end
# with DAG(
# dag_id="cwd_bash_dag",
# start_date=datetime(2025, 4, 1),
# schedule_interval="@daily",
# catchup=False,
# ) as dag:
# run_cwd = BashOperator(
# task_id="run_cwd",
# bash_command="ls -l",
# cwd="/tmp",
# )
# with DAG(
# dag_id="exit_code_bash_dag",
# start_date=datetime(2025, 4, 1),
# schedule_interval="@daily",
# catchup=False,
# ) as dag:
# run_exit = BashOperator(
# task_id="run_exit",
# bash_command="exit 100",
# skip_exit_code=100,
# )
# def afternoon_task():
# print("Running at 2:30 PM!")
# with DAG(
# dag_id="specific_time_dag",
# start_date=datetime(2025, 1, 1),
# schedule_interval="15 * * * *", # 2:30 PM daily
# catchup=False,
# ) as dag:
# task = PythonOperator(
# task_id="afternoon_task",
# python_callable=afternoon_task,
# )
# def catchup_task(ds):
# print(f"Processing data for {ds}")
# with DAG(
# dag_id="catchup_daily_dag",
# start_date=datetime(2025, 4, 9),
# schedule_interval="0 0 * * *", # Midnight UTC daily
# catchup=True,
# ) as dag:
# task = PythonOperator(
# task_id="catchup_task",
# python_callable=catchup_task,
# op_kwargs={"ds": "{ { ds } }"},
# )
def banch_func(**context):
excution_date = context["execution_date"]
print(excution_date)
if excution_date.weekday() < 5:
return "weekday_task"
else:
return "weekend_task"
with DAG(
dag_id="banching_dag",
start_date=datetime(2025, 4, 14),
schedule_interval="@daily",
catchup=False,
) as dag:
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=banch_func,
provide_context=True,
)
weekday_task = BashOperator(
task_id="weekday_task",
bash_command="echo 'Running on a weekday!'",
)
weekend_task = BashOperator(
task_id="weekend_task",
bash_command="echo 'Running on a weekend!'",
)
branch_task >> [weekday_task, weekend_task]