Skip to content

Commit

Permalink
feat: Airflow 주석 포함한 파일 (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzsza authored Feb 5, 2024
1 parent 8d00701 commit ee016dd
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 191 deletions.
36 changes: 16 additions & 20 deletions 01-batch-serving(airflow)/dags/01-bash-operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,34 @@
from datetime import datetime, timedelta

default_args = {
'owner': 'kyle',
'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다
'start_date': datetime(2024, 1, 1),
'retires': 1, # 실패시 재시도 횟수
'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행
# 'priority_weight': 10 # DAG의 우선 순위를 설정할 수 있음
# 'execution_timeout': timedelta(seconds=300), # 실행 타임아웃 : 300초 넘게 실행되면 종료
"owner": "kyle",
"depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행
"start_date": datetime(2024, 1, 1)
}

# with 구문으로 DAG 정의
with DAG(
dag_id='bash_dag',
default_args=default_args,
schedule_interval='@once', # 1번 실행
tags=['my_dags']
dag_id="bash_dag",
default_args=default_args,
schedule_interval="@once",
tags=["my_dags"]
) as dag:
# BashOperator 사용

task1 = BashOperator(
task_id='print_date', # task의 id
bash_command='date' # 실행할 bash command
task_id="print_date", # task의 id
bash_command="date" # 실행할 bash command를 저장
)

task2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=2 # 만약 bash command가 실패하면 2번 재시도한다
task_id="sleep",
bash_command="sleep 5",
retries=2 # 만약 bash command가 실패하면 2번 재시도
)

task3 = BashOperator(
task_id='pwd',
bash_command='pwd'
)

task1 >> task2 # task1 이후에 task2 실행
task1 >> task3 # task1 이후에 task3 실행(2와 3을 병렬로 실행)
task1 >> task2 # task1이 완료되면, task2를 실행
task1 >> task3 # task1이 완료되면, task3을 실행
# task1 >> [task2, task3]
28 changes: 14 additions & 14 deletions 01-batch-serving(airflow)/dags/02-python-operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,33 @@
from datetime import datetime, timedelta

default_args = {
'owner': 'kyle',
'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다
'start_date': datetime(2024, 1, 1),
'end_date': datetime(2024, 1, 4),
'retires': 1, # 실패시 재시도 횟수
"owner": "kyle",
"depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행
"start_date": datetime(2024, 1, 1),
"end_date": datetime(2024, 1, 4),
'retries': 1, # 실패시 재시도 횟수
'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행
}


# 사용할 함수 정의
def print_current_date():
date_kor = ["월", "화", "수", "목", "금", "토", "일"]
date_now = datetime.now().date()
datetime_weeknum = date_now.weekday()
print(f"{date_now}{date_kor[datetime_weeknum]}요일입니다")


# with 구문으로 DAG 정의
with DAG(
dag_id='python_dag1',
default_args=default_args,
schedule_interval='30 0 * * *', # UTC 시간 기준 0시 30분에 Daily로 실행하겠다! 한국 시간 기준 오전 9시 30분
tags=['my_dags']
dag_id="python_dag1",
default_args=default_args,
schedule_interval="30 0 * * *", # UTC 시간 기준으로 매일 0시 30분에 실행하겠다. 한국 시간으로 9시 30분에 실행!
tags=['my_dags'],
catchup=True
) as dag:

python_task = PythonOperator(
task_id='print_current_date',
python_callable=print_current_date # 실행할 python 함수
task_id="print_current_date",
python_callable=print_current_date
)

python_task

70 changes: 32 additions & 38 deletions 01-batch-serving(airflow)/dags/03-python-operator-with-context.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,51 @@
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 앞선 02-python-operator.py는 "date_now = datetime.now().date()"를 사용하기 때문에
# 언제 실행해도 실행하는 시간 기준으로 실행됨
# Airflow는 Batch 성으로 특정 시간대로 실행하는 도구인데, 위와 같이 now 등을 사용하면 실행하는 시점 기준으로 실행이 됩니다(원래 기대했던 실행 시점이 아닌, 동작 시점)
# Airflow는 항상 현재 최신 작업만 실행하는 것은 아니고, 과거 날짜를 실행해야 하는 경우도 있음(Backfill이란 용어 사용)
# 따라서 코드 상에서 now(), SQL 상에서 current_date() 등을 사용하지 않고, Airflow에서 실행하기로 했던 시간을 넣어줘야 합니다
# execution_date라고 부르다가 logical_date로 수정함
# 앞선 02-python-operator.py는 "date_now = datetime.now().date()"를 사용했기 때문에
# 언제 실행해도 우리가 실행하는 시간 기준으로 실행됨
# Airflow Batch성으로 특정 시간대로 실행하는 도구. now 등을 잘 쓰지 않음. 의도한 시간, 날짜 주입해서 사용
# Airflow로 과거 날짜로 실행해야 하는 경우도 존재. 과거 데이터 마이그레이션
# 코드 상에서 now(), SQL 상에서도 current_date() 사용하지 않고, Airflow에서 실행하기로 했던 시간을 넣어줘야 함
# execution_date, logical_date
# 멱등성 : 연산을 여러 번 적용하더라도 결과가 달라지지 않는 성질

default_args = {
'owner': 'kyle',
'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다
'start_date': datetime(2024, 1, 1),
'end_date': datetime(2024, 1, 4),
'retires': 1, # 실패시 재시도 횟수
'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행
"owner": "kyle",
"depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행
"start_date": datetime(2024, 1, 1),
"end_date": datetime(2024, 1, 4)
}


def print_current_date_with_context_variable(*args, **kwargs):
def print_current_date_with_context(*args, **kwargs):
"""
{'conf': <airflow.configuration.AirflowConfigParser object at 0x1069e8b00>,
'dag': <DAG: python_dag_with_context>,
'dag_run': <DagRun python_dag_with_context @ 2022-04-20 00:30:00+00:00: scheduled__2022-04-20T00:30:00+00:00,
externally triggered: False>,
'data_interval_end': DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')),
'data_interval_start': DateTime(2024, 1, 2, 0, 30, 0, tzinfo=Timezone('UTC')),
'ds': '2024-01-01',
'ds_nodash': '20240101',
'next_execution_date': <Proxy at 0x108658248 with factory <function TaskInstance.get_template_context.<locals>.deprecated_proxy.<locals>.deprecated_func at 0x108654b70>>
}
kwargs: {'conf': <airflow.configuration.AirflowConfigParser object at 0x1037d9730>,
'dag': <DAG: python_dag_with_context>,
'dag_run': <DagRun python_dag_with_context @ 2024-01-01 00:30:00+00:00: scheduled__2024-01-01T00:30:00+00:00,
state:running,
queued_at: 2024-02-05 04:37:43.715262+00:00. externally triggered: False>,
'data_interval_end': DateTime(2024, 1, 2, 0, 30, 0, tzinfo=Timezone('UTC')), 'data_interval_start': DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')),
'ds': '2024-01-01', 'ds_nodash': '20240101', 'execution_date': <Proxy at 0x110ad8440 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'execution_date', DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'expanded_ti_count': None, 'inlets': [], 'logical_date': DateTime(2024, 1, 1, 0, 30, 0, tzinfo=Timezone('UTC')), 'macros': <module 'airflow.macros' from '/Users/kyle/boostcamp-ai-tech/.venv/lib/python3.9/site-packages/airflow/macros/__init__.py'>, 'next_ds': <Proxy at 0x110ade500 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'next_ds', '2024-01-02')>, 'next_ds_nodash': <Proxy at 0x110ae5240 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'next_ds_nodash', '20240102')>, 'next_execution_date': <Proxy at 0x110ae5500 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'next_execution_date', DateTime(2024, 1, 2, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'outlets': [], 'params': {}, 'prev_data_interval_start_success': None, 'prev_data_interval_end_success': None, 'prev_ds': <Proxy at 0x1109393c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'prev_ds', '2023-12-31')>, 'prev_ds_nodash': <Proxy at 0x110ae8040 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'prev_ds_nodash', '20231231')>, 'prev_execution_date': <Proxy at 0x110ae80c0 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'prev_execution_date', DateTime(2023, 12, 31, 0, 30, 0, tzinfo=Timezone('UTC')))>, 'prev_execution_date_success': <Proxy at 0x110ae8100 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'prev_execution_date_success', None)>, 'prev_start_date_success': None, 'run_id': 'scheduled__2024-01-01T00:30:00+00:00', 'task': <Task(PythonOperator): print_current_date_with_context>, 'task_instance': <TaskInstance: python_dag_with_context.print_current_date_with_context scheduled__2024-01-01T00:30:00+00:00 [running]>, 'task_instance_key_str': 'python_dag_with_context__print_current_date_with_context__20240101', 'test_mode': False, 'ti': <TaskInstance: python_dag_with_context.print_current_date_with_context scheduled__2024-01-01T00:30:00+00:00 [running]>, 'tomorrow_ds': <Proxy at 0x110ae8240 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'tomorrow_ds', '2024-01-02')>, 'tomorrow_ds_nodash': <Proxy at 0x110ae8300 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'tomorrow_ds_nodash', '20240102')>, 'triggering_dataset_events': <Proxy at 0x110a95a80 with factory <function TaskInstance.get_template_context.<locals>.get_triggering_events at 0x110a8fca0>>, 'ts': '2024-01-01T00:30:00+00:00', 'ts_nodash': '20240101T003000', 'ts_nodash_with_tz': '20240101T003000+0000', 'var': {'json': None, 'value': None}, 'conn': None, 'yesterday_ds': <Proxy at 0x110ae8400 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'yesterday_ds', '2023-12-31')>, 'yesterday_ds_nodash': <Proxy at 0x110ae8480 with factory functools.partial(<function lazy_mapping_from_context.<locals>._deprecated_proxy_factory at 0x110a40670>, 'yesterday_ds_nodash', '20231231')>, 'templates_dict': None}
"""
print(f"kwargs : {kwargs}")
execution_date = kwargs['ds']
execution_date_nodash = kwargs['ds_nodash']
print(f"kwargs: {kwargs}")
execution_date = kwargs["ds"]
execution_date_nodash = kwargs["ds_nodash"]
print(f"execution_date_nodash : {execution_date_nodash}")
execution_date = datetime.strptime(execution_date, "%Y-%m-%d").date()
date_kor = ["월", "화", "수", "목", "금", "토", "일"]
datetime_weeknum = execution_date.weekday()
print(f"{execution_date}{date_kor[datetime_weeknum]}요일입니다")


# with 구문으로 DAG 정의

with DAG(
dag_id='python_dag_with_context',
default_args=default_args,
schedule_interval='30 0 * * *',
tags=['my_dags']
dag_id="python_dag_with_context",
default_args=default_args,
schedule_interval="30 0 * * *",
tags=['my_dags'],
catchup=True
) as dag:

PythonOperator(
task_id='print_current_date_with_context_variable',
python_callable=print_current_date_with_context_variable,
provide_context=True # True일 경우에 Airflow Task Instance의 Attribute를 Keyword Argument로 Python 함수에서 사용할 수 있음
)

# task가 1개일 경우엔 순서를 명시하지 않아도 실행
task_id="print_current_date_with_context",
python_callable=print_current_date_with_context
)
46 changes: 23 additions & 23 deletions 01-batch-serving(airflow)/dags/04-python-operator-with-jinja.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,45 @@
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# 앞의 03-python-operator-with-context는 provide_context=True 옵션을 주고 Attribute에 접근
# 이 방식이 아닌 Airflow의 Template 방식을 사용. Jinja Template => Flask 자주 활용되는 템플릿
# Python에서는 Template이랑 provide_context=True와 큰 차이를 못 느낄 수도 있지만, SQL Opearator나 다른 오퍼레이터에선 유용하게 사용됨(템플릿)
# 쿼리문(WHERE절)에 Airflow의 execution_date를 인자로 넣고 실행
# Jinja Template : Airflow의 미리 정의된 템플릿. {{ ds }}, {{ ds_nodash }} 라고 정의
# Airflow Operator에 넘겨주면 실행 과정에서 템플릿 기반으로 값이 업데이트됨

# 앞의 03-python-operator-with-context는 kwargs로 여러 정보를 같이 주입. ds, ds_nodash
# Jinja Template 사용하면 ds를 kwargs['ds']. {{ ds }}
# Flask, FastAPI에서도 자주 사용
# Python에서는 kwargs로 접근하시면 빠르게 가능. SQL. 쿼리문에서 WHERE 조건에 exeuction_date="2024-01-01"
# {{ ds }}, {{ ds_nodash }}
# Airflow의 Operator에 template_fields 여기에 있는 값들은 Airflow가 실행 과정에서 {{ ds }} => "2024-01-01" 변환

default_args = {
'owner': 'kyle',
'depends_on_past': False, # 이전 DAG의 Task가 성공, 실패 여부에 따라 현재 DAG 실행 여부가 결정. False는 과거의 실행 결과 상관없이 매일 실행한다
'start_date': datetime(2024, 1, 1),
'end_date': datetime(2024, 1, 4),
'retires': 1, # 실패시 재시도 횟수
'retry_delay': timedelta(minutes=5) # 만약 실패하면 5분 뒤 재실행
"owner": "kyle",
"depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행
"start_date": datetime(2024, 1, 1),
"end_date": datetime(2024, 1, 4)
}


# 사용할 함수 정의
def print_current_date_with_jinja(date):
execution_date = datetime.strptime(date, "%Y-%m-%d").date()
date_kor = ["월", "화", "수", "목", "금", "토", "일"]
datetime_weeknum = execution_date.weekday()
print(f"{execution_date}{date_kor[datetime_weeknum]}요일입니다")

return execution_date

# with 구문으로 DAG 정의
with DAG(
dag_id='python_dag_with_jinja',
default_args=default_args,
schedule_interval='30 0 * * *',
tags=['my_dags']
dag_id="python_dag_with_jinja",
default_args=default_args,
schedule_interval="30 0 * * *",
tags=['my_dags'],
catchup=True
) as dag:
execution_date = "{{ ds }}" # Template 정의
execution_date = "{{ ds }}"

python_task_jinja = PythonOperator(
task_id='print_current_date_with_jinja',
task_id="print_current_date_with_jinja",
python_callable=print_current_date_with_jinja,
op_args=[execution_date]
# op_args=[execution_date]
op_kwargs = {
"date": execution_date
}
)

python_task_jinja
python_task_jinja
Original file line number Diff line number Diff line change
@@ -1,41 +1,34 @@
# slack_notifier에 선언한 webhook 전송 함수를 활용하여 slack 알림을 제공하는 예제
# slack_notifier에 선언한 webhook 전송 함수를 활용해 slack 알림을 제공하는 예제

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from datetime import datetime

from airflow.exceptions import AirflowFailException
from utils.slack_notifier import task_fail_slack_alert, task_succ_slack_alert

default_args = {
'owner': 'kyle',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'end_date': datetime(2024, 1, 4),
'retires': 1,
'retry_delay': timedelta(minutes=5),
"owner": "kyle",
"depends_on_past": False, # 이전 DAG의 Task 성공 여부에 따라서 현재 Task를 실행할지 말지가 결정. False는 과거 Task의 성공 여부와 상관없이 실행
"start_date": datetime(2024, 1, 1),
"end_date": datetime(2024, 1, 4)
}


def _handle_job_error() -> None:
raise AirflowFailException("Raise Exception.")


with DAG(
dag_id='python_dag_with_slack_webhook',
dag_id="python_dag_with_slack_webhook",
default_args=default_args,
schedule_interval='30 0 * * *',
tags=['my_dags'],
catchup=False,
on_failure_callback=task_fail_slack_alert,
# on_success_callback=task_succ_slack_alert # 성공 알림 필요 시 추가
schedule_interval="30 0 * * * ",
tags=["my_dags"],
catchup=True,
on_failure_callback=task_fail_slack_alert
) as dag:
execution_date = "{{ ds }}"


send_slack_noti = PythonOperator(
task_id='raise_exception_and_send_slack_noti',
python_callable=_handle_job_error,
op_args=[execution_date]
task_id="raise_exception_and_send_slack_noti",
python_callable=_handle_job_error
)

send_slack_noti
send_slack_noti
Loading

0 comments on commit ee016dd

Please sign in to comment.