Module 4 [AirFlow + Kubeflow] (#12)
truskovskiyk authored Jul 20, 2024
1 parent 67eec27 commit 853d778
Showing 15 changed files with 570 additions and 3,738 deletions.
.github/workflows/module-4.yaml: 2 changes (1 addition, 1 deletion)

@@ -8,7 +8,7 @@ env:
   IMAGE_MAIN_TAG: latest
 
 jobs:
-  push-image:
+  dagster-image:
     runs-on: ubuntu-latest
    steps:
       - name: Checkout
module-3/classic-example/Makefile: 4 changes (4 additions, 0 deletions)

@@ -1,6 +1,10 @@
 build:
 	docker build -f Dockerfile -t classic-example:latest .
 
+push:
+	docker tag classic-example:latest ghcr.io/kyryl-opens-ml/classic-example:latest
+	docker push ghcr.io/kyryl-opens-ml/classic-example:latest
+
 run_dev: build
 	docker run -it -v ${PWD}:/main classic-example:latest /bin/bash
 
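Note that the new `push` target assumes an already-authenticated session against GitHub Container Registry (for example via `docker login ghcr.io`) with write access to the `kyryl-opens-ml` packages; without it, the `docker push` step will be rejected.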
module-4/PRACTICE.md: 7 changes (2 additions, 5 deletions)

@@ -5,8 +5,7 @@
 
 # H7: Kubeflow + AirFlow pipelines
 
 ## Reading list:
 
 - [Kubeflow pipelines Standalone Deployment](https://www.kubeflow.org/docs/components/pipelines/v1/installation/standalone-deployment/)
 - [Kubeflow Pipelines SDK API Reference](https://kubeflow-pipelines.readthedocs.io/en/)
@@ -17,7 +16,6 @@
 - [KubernetesPodOperator](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html)
 
 
-
 ## Task:
 
 For this task, you will need both a training and an inference pipeline. The training pipeline should include at least the following steps: Load Training Data, Train Model, Save Trained Models. Additional steps may be added as desired. Similarly, the inference pipeline should include at least the following steps: Load Data for Inference, Load Trained Model, Run Inference, Save Inference Results. You may also add extra steps to this pipeline as needed.
@@ -31,12 +29,11 @@
 - PR6: Write an Airflow inference pipeline.
 
 
 ## Criteria:
 
 - 6 PRs merged.
 
 
-
 # H8: Dagster
 
 ## Reading list:
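To make the pipeline shape required by the task above concrete, here is a minimal sketch of the training side written against the kfp v2 SDK. It is an illustration only: the component names, base image, and bodies are invented stand-ins, not the course solution, and the inference pipeline would follow the same pattern with load-data, load-model, run-inference, and save-results steps.

```python
# Minimal sketch of the H7 training-pipeline shape (kfp v2).
# All names and bodies here are illustrative assumptions.
from kfp import dsl


@dsl.component(base_image="python:3.11")
def load_training_data(data: dsl.Output[dsl.Dataset]):
    # Stand-in for real data loading.
    with open(data.path, "w") as f:
        f.write("text,label\ngreat movie,1\nterrible movie,0\n")


@dsl.component(base_image="python:3.11")
def train_model(data: dsl.Input[dsl.Dataset], model: dsl.Output[dsl.Model]):
    # Stand-in for real training; writes a model artifact for the next step.
    with open(data.path) as f:
        _ = f.read()
    with open(model.path, "w") as f:
        f.write("serialized-model-bytes")


@dsl.component(base_image="python:3.11")
def save_trained_model(model: dsl.Input[dsl.Model]):
    # Stand-in for pushing the artifact to a model registry.
    print(f"would upload {model.path}")


@dsl.pipeline(name="training-pipeline")
def training_pipeline():
    load_step = load_training_data()
    train_step = train_model(data=load_step.outputs["data"])
    save_trained_model(model=train_step.outputs["model"])
```

Compiled with `kfp.compiler.Compiler().compile(training_pipeline, "training_pipeline.yaml")`, this produces a spec that can be uploaded to a Kubeflow Pipelines deployment.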
module-4/README.md: 122 changes (52 additions, 70 deletions)

@@ -10,11 +10,11 @@
 
 # Reference implementation
 
 ***
 
 # Setup
 
 Create kind cluster
 
 ```bash
 kind create cluster --name ml-in-production
@@ -28,11 +28,7 @@ k9s -A
 
 # Airflow
 
-## Deploy airflow locally
-
-```bash
-export AIRFLOW_HOME=$PWD/airflow_pipelines
-```
+Install
 
 ```bash
 AIRFLOW_VERSION=2.9.2
@@ -42,107 +38,93 @@
 pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
 pip install apache-airflow-providers-cncf-kubernetes==8.3.3
 ```
 
-1. Run standalone airflow
+Run standalone airflow
 
-```
+```bash
+export AIRFLOW_HOME=./airflow_pipelines
 export AIRFLOW__CORE__LOAD_EXAMPLES=False
+export WANDB_PROJECT=****************
+export WANDB_API_KEY=****************
 airflow standalone
 ```
 
-2. Create storage
+Create storage
 
-```bash
-kubectl create -f ./airflow_pipelines/volumes.yaml
-```
+```
+kubectl create -f airflow-volumes.yaml
+```
 
-3. Read to run pipelines
-
-- https://madewithml.com/courses/mlops/orchestration/
-
-
-### References:
-
-- https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html
-- https://www.astronomer.io/guides/kubepod-operator/
-- https://www.astronomer.io/guides/airflow-passing-data-between-tasks/
+Open UI
+
+```bash
+open http://0.0.0.0:8080
+```
+
+Trigger training job.
+
+```bash
+airflow dags trigger training_dag
+```
+
+Trigger 5 training jobs.
+
+```bash
+for i in {1..5}; do airflow dags trigger training_dag; sleep 1; done
+```
+
+Trigger inference job.
+
+```bash
+airflow dags trigger inference_dag
+```
+
+Trigger 5 inference jobs.
+
+```bash
+for i in {1..5}; do airflow dags trigger inference_dag; sleep 1; done
+```
+
+### References:
+
+- [AI + ML examples of DAGs](https://registry.astronomer.io/dags?categoryName=AI+%2B+Machine+Learning&limit=24&sorts=updatedAt%3Adesc)
+- [Pass data between tasks](https://www.astronomer.io/docs/learn/airflow-passing-data-between-tasks)
 
-# Kubeflow pipelines
-
-## Deploy kubeflow pipelines
-
-Create directly
-
-```
-export PIPELINE_VERSION=2.0.3
-kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
-kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
-kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"
-```
-
-Create yaml and applay with kubectl (better option)
-
-```
-export PIPELINE_VERSION=2.0.3
-kubectl kustomize "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION" > kfp-yml/res.yaml
-kubectl kustomize "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION" > kfp-yml/pipelines.yaml
-
-kubectl create -f kfp-yml/res.yaml
-kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
-kubectl create -f kfp-yml/pipelines.yaml
-```
-
-Access UI and minio
-
-```
-kubectl port-forward --address=0.0.0.0 svc/minio-service 9000:9000 -n kubeflow
-kubectl port-forward --address=0.0.0.0 svc/ml-pipeline-ui 8888:80 -n kubeflow
-```
-
-## Create pipelines
+# Kubeflow pipelines
 
-Setup env variables
+Install
 
-```
-export WANDB_PROJECT=****************
-export WANDB_API_KEY=****************
-```
+```bash
+export PIPELINE_VERSION=2.2.0
+kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
+kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
+kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"
+```
 
-### Training & Inference V2 (2.0.3)
-
-```
-python kfp-training-pipeline_v2.py http://0.0.0.0:8080
-```
-
-```
-python kfp-inference-pipeline_v2.py http://0.0.0.0:8080
-```
+Access UI and minio
+
+```bash
+kubectl port-forward --address=0.0.0.0 svc/minio-service 9000:9000 -n kubeflow
+kubectl port-forward --address=0.0.0.0 svc/ml-pipeline-ui 3000:80 -n kubeflow
+```
 
-### Training & Inference V1 (1.8.9)
-
-```
-python kfp-training-pipeline_v1.py http://0.0.0.0:8080
-```
-
-```
-python kfp-inference-pipeline_v1.py http://0.0.0.0:8080
-```
+Create training job.
+
+```bash
+python ./kubeflow_pipelines/kfp_training_pipeline.py http://0.0.0.0:3000
+```
+
+Create inference job.
+
+```bash
+python kubeflow_pipelines/kfp_inference_pipeline.py http://0.0.0.0:3000
+```
 
 ### References
 
-- https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/#new-pythonic-artifact-syntax
-
-
-
-# Dagster
-
-
-- https://github.com/dagster-io/dagster_llm_finetune
-- https://dagster.io/blog/finetuning-llms
-
+- [Create, use, pass, and track ML artifacts](https://www.kubeflow.org/docs/components/pipelines/v2/data-types/artifacts/#new-pythonic-artifact-syntax)
+- [Vertex AI](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)
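For context on the "Create training job" step above: passing the UI URL as the first argument suggests the script builds a kfp client against that endpoint and submits a run. A minimal sketch of that submission pattern, with a trivial placeholder pipeline standing in for the repo's actual `kfp_training_pipeline.py`:

```python
# Sketch of submitting a pipeline run to a port-forwarded KFP endpoint.
# The one-step pipeline below is a placeholder, not the repo's real one.
import sys

import kfp
from kfp import dsl


@dsl.component(base_image="python:3.11")
def train() -> str:
    # Placeholder for the real training step.
    return "trained"


@dsl.pipeline(name="training-pipeline")
def training_pipeline():
    train()


if __name__ == "__main__":
    # Usage: python kfp_training_pipeline.py http://0.0.0.0:3000
    client = kfp.Client(host=sys.argv[1])
    client.create_run_from_pipeline_func(training_pipeline, arguments={})
```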
module-4/airflow_pipelines/.gitignore: 3 changes (2 additions, 1 deletion)

@@ -2,4 +2,5 @@ logs
 airflow.cfg
 airflow.db
 standalone_admin_password.txt
-webserver_config.py
+webserver_config.py
+airflow-webserver.pid
module-4/airflow_pipelines/dags/inference_dag.py: 53 changes (41 additions, 12 deletions)

@@ -1,44 +1,69 @@
+import os
 from datetime import datetime
 
 from airflow import DAG
 from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from kubernetes.client import models as k8s
 
+DOCKER_IMAGE = "ghcr.io/kyryl-opens-ml/classic-example:main"
+STORAGE_NAME = "training-storage"
+WANDB_PROJECT = os.getenv("WANDB_PROJECT")
+WANDB_API_KEY = os.getenv("WANDB_API_KEY")
+
 volume = k8s.V1Volume(
     name="inference-storage",
-    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="inference-storage"),
+    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
+        claim_name="inference-storage"
+    ),
 )
-volume_mount = k8s.V1VolumeMount(name="inference-storage", mount_path="/tmp/", sub_path=None)
+volume_mount = k8s.V1VolumeMount(
+    name="inference-storage", mount_path="/tmp/", sub_path=None
+)
 
-with DAG(start_date=datetime(2021, 1, 1), catchup=False, schedule_interval=None, dag_id="inference_dag") as dag:
+with DAG(
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    schedule_interval=None,
+    dag_id="inference_dag",
+) as dag:
     clean_storage_before_start = KubernetesPodOperator(
         name="clean_storage_before_start",
-        image="kyrylprojector/nlp-sample:latest",
+        image=DOCKER_IMAGE,
         cmds=["rm", "-rf", "/tmp/data/*"],
         task_id="clean_storage_before_start",
         in_cluster=False,
         namespace="default",
+        startup_timeout_seconds=600,
+        image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
     )
 
     load_data = KubernetesPodOperator(
         name="load_data",
-        image="kyrylprojector/nlp-sample:latest",
-        cmds=["python", "nlp_sample/cli.py", "load-cola-data", "/tmp/data/"],
+        image=DOCKER_IMAGE,
+        cmds=["python", "classic_example/cli.py", "load-sst2-data", "/tmp/data/"],
         task_id="load_data",
         in_cluster=False,
         namespace="default",
+        startup_timeout_seconds=600,
+        image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
     )
 
     load_model = KubernetesPodOperator(
         name="load_model",
-        image="kyrylprojector/nlp-sample:latest",
-        cmds=["python", "nlp_sample/cli.py", "load-from-registry", "kfp-pipeline:latest", "/tmp/results/"],
+        image=DOCKER_IMAGE,
+        cmds=[
+            "python",
+            "classic_example/cli.py",
+            "load-from-registry",
+            "airflow-pipeline:latest",
+            "/tmp/results/",
+        ],
         task_id="load_model",
-        env_vars={"WANDB_PROJECT": "course-27-10-2023-week-3", "WANDB_API_KEY": ""},
+        env_vars={"WANDB_PROJECT": WANDB_PROJECT, "WANDB_API_KEY": WANDB_API_KEY},
         in_cluster=False,
         namespace="default",
         volumes=[volume],
@@ -47,10 +72,10 @@
 
     run_inference = KubernetesPodOperator(
         name="run_inference",
-        image="kyrylprojector/nlp-sample:latest",
+        image=DOCKER_IMAGE,
         cmds=[
             "python",
-            "nlp_sample/cli.py",
+            "classic_example/cli.py",
             "run-inference-on-dataframe",
             "/tmp/data/test.csv",
             "/tmp/results/",
@@ -59,17 +84,21 @@
         task_id="run_inference",
         in_cluster=False,
         namespace="default",
+        startup_timeout_seconds=600,
+        image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
     )
 
     clean_up = KubernetesPodOperator(
         name="clean_up",
-        image="kyrylprojector/nlp-sample:latest",
+        image=DOCKER_IMAGE,
         cmds=["rm", "-rf", "/tmp/data/*"],
         task_id="clean_up",
         in_cluster=False,
         namespace="default",
+        startup_timeout_seconds=600,
+        image_pull_policy="Always",
         volumes=[volume],
         volume_mounts=[volume_mount],
         trigger_rule="all_done",
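The diff is truncated before the end of the file, so the task wiring is not shown; a DAG like this typically closes by chaining the operators, presumably something like `clean_storage_before_start >> load_data >> load_model >> run_inference >> clean_up`.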