Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:Product Serving 리뉴얼 #171

Merged
merged 8 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
.vscode/
.idea/
.venv/
__pycache__/
__pycache__/
.ipynb_checkpoints/
mlruns/
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from airflow.exceptions import AirflowFailException
from operators.slack_notifier import task_fail_slack_alert, task_succ_slack_alert


# slack_notifier 에 선언한 webhook 전송 함수를 활용하여 slack 알림을 제공합니다
default_args = {
'owner': 'kyle',
'depends_on_past': False,
'start_date': datetime(2022, 4, 20),
'retires': 1,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': task_fail_slack_alert, # 실패 알림
# 'on_success_callback': task_succ_slack_alert, # 성공 알림 필요 시 추가
}


def _handle_job_error() -> None:
raise AirflowFailException("Raise Exception.")


with DAG(
dag_id='python_dag_with_slack_webhook',
default_args=default_args,
schedule_interval='30 0 * * *',
tags=['my_dags']
) as dag:
execution_date = "{{ ds }}"

send_slack_noti = PythonOperator(
task_id='raise_exception_and_send_slack_noti',
python_callable=_handle_job_error,
op_args=[execution_date]
)

send_slack_noti
1 change: 1 addition & 0 deletions 01-batch-serving(airflow)/data/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
image: postgres:13
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=1234
- POSTGRES_PASSWORD=!boostcamp-aitech!
volumes:
- ./data:/var/lib/postgresql/data
restart: always
Expand All @@ -27,11 +27,11 @@ services:
airflow db init && \
airflow users create \
--username admin \
--password 1234 \
--firstname heumsi \
--lastname jeon \
--password !boostcamp-aitech! \
--firstname kyle \
--lastname byeon \
--role Admin \
--email heumsi@naver.com \
--email snugyun01@gmail.com \
"
restart: on-failure
networks:
Expand Down Expand Up @@ -72,7 +72,7 @@ services:
- airflow-init
image: codercom/code-server:4.0.1
environment:
- PASSWORD=1234
- PASSWORD=!boostcamp-aitech!
- HOST=0.0.0.0
- PORT=8888
volumes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ $ docker run \
--network airflow \
-v $(pwd)/data:/var/lib/postgresql/data \
-e POSTGRES_USER=airflow \
-e POSTGRES_PASSWORD=1234 \
-e POSTGRES_PASSWORD=!boostcamp-aitech! \
postgres:13
```

Expand Down Expand Up @@ -70,11 +70,11 @@ $ docker run \
airflow db init && \
airflow users create \
--username admin \
--password 1234 \
--firstname heumsi \
--lastname jeon \
--password !boostcamp-aitech! \
--firstname seongyun \
--lastname byeon \
--role Admin \
--email heumsi@naver.com \
--email snugyun01@gmail.com \
"
```

Expand Down Expand Up @@ -141,7 +141,7 @@ $ docker run -it --name code-server \
-d \
-v "$(pwd)/dags:/home/coder/project" \
-p 8888:8888 \
-e PASSWORD=1234 \
-e PASSWORD=!boostcamp-aitech! \
-e HOST=0.0.0.0 \
-e PORT=8888 \
codercom/code-server:4.0.2
Expand Down
47 changes: 47 additions & 0 deletions 01-batch-serving(airflow)/operators/slack_notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

# Slack Webhook 제공 Operator 를 먼저 정의합니다
# 1. Connection ID (실습에서 Webserver 를 통해 생성한 값)
SLACK_DAG_CONN_ID = "my_webhook"


# 2. Webhook 함수 정의
def send_message(slack_msg):
return SlackWebhookOperator(
task_id="slack_webhook",
slack_webhook_conn_id=SLACK_DAG_CONN_ID,
message=slack_msg,
username="sample-airflow",
)


# 3. slack alert 함수 정의
# 메시지에 Slack ID 추가해 tag 가능 (ex. <@U022T50D4F5>)
def task_fail_slack_alert(context):
slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: `{dag}`
*Execution Time*: {exec_date}
""".format(
task=context.get("task_instance").task_id,
dag=context.get("task_instance").dag_id,
exec_date=context.get("execution_date"),
)

alert = send_message(slack_msg)

return alert.execute(context=context)


def task_succ_slack_alert(context):
slack_msg = f"""
:large_green_circle: Task SUCC.
*Task*: {context.get("task_instance").task_id}
*Dag*: {context.get("task_instance").dag_id}
*Execution Time*: {context.get("execution_date")}
"""

alert = send_message(slack_msg)

return alert.execute(context=context)
1 change: 1 addition & 0 deletions 01-batch-serving(airflow)/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
apache-airflow==2.7.3
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
from contextlib import asynccontextmanager

from fastapi import FastAPI
import uvicorn

app = FastAPI()

items = {}


@app.on_event("startup")
def startup_event():
@asynccontextmanager
async def lifespan(app: FastAPI):
print("Start Up Event")
items["foo"] = {"name": "Fighters"}
items["bar"] = {"name": "Tenders"}

yield

@app.on_event("shutdown")
def shutdown_event():
print("Shutdown Event!")
with open("log.txt", mode="a") as log:
log.write("Application shutdown")


app = FastAPI(lifespan=lifespan)


@app.get("/items/{item_id}")
def read_items(item_id: str):
return items[item_id]


if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)


uvicorn.run(app, host="0.0.0.0", port=8000)
Loading
Loading