Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Airflow DAG(에러 발생시 Slack 메세지 전송 #179

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,47 +1,41 @@
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# Slack Webhook 제공 Operator 를 먼저 정의합니다
# 1. Connection ID (실습에서 Webserver 를 통해 생성한 값)
SLACK_DAG_CONN_ID = "my_webhook"


# 2. Webhook 함수 정의
def send_message(slack_msg):
return SlackWebhookOperator(
task_id="slack_webhook",
slack_webhook_conn_id=SLACK_DAG_CONN_ID,
message=slack_msg,
username="Airflow-alert",
)


# 3. slack alert 함수 정의
# 메시지에 Slack ID 추가해 tag 가능 (ex. <@U022T50D4F5>)
def task_fail_slack_alert(context):
slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: `{dag}`
*Execution Time*: {exec_date}
""".format(
task=context.get("task_instance").task_id,
dag=context.get("task_instance").dag_id,
exec_date=context.get("execution_date"),
# slack_notifier에 선언한 webhook 전송 함수를 활용하여 slack 알림을 제공하는 예제

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from airflow.exceptions import AirflowFailException
from utils.slack_notifier import task_fail_slack_alert, task_succ_slack_alert

default_args = {
'owner': 'kyle',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'start_date': datetime(2024, 1, 4),
'retires': 1,
'retry_delay': timedelta(minutes=5),
}


def _handle_job_error() -> None:
raise AirflowFailException("Raise Exception.")


with DAG(
dag_id='python_dag_with_slack_webhook',
default_args=default_args,
schedule_interval='30 0 * * *',
tags=['my_dags'],
catchup=False,
on_failure_callback=task_fail_slack_alert,
# on_success_callback=task_succ_slack_alert # 성공 알림 필요 시 추가
) as dag:
execution_date = "{{ ds }}"

send_slack_noti = PythonOperator(
task_id='raise_exception_and_send_slack_noti',
python_callable=_handle_job_error,
op_args=[execution_date]
)

alert = send_message(slack_msg)

return alert.execute(context=context)


def task_succ_slack_alert(context):
slack_msg = f"""
:large_green_circle: Task SUCC.
*Task*: {context.get("task_instance").task_id}
*Dag*: {context.get("task_instance").dag_id}
*Execution Time*: {context.get("execution_date")}
"""

alert = send_message(slack_msg)

return alert.execute(context=context)
send_slack_noti
Loading