Initial commit
Signed-off-by: Wu, Xiaochang <[email protected]>
xwu99 committed Jun 24, 2024
1 parent 31c42ca commit b88929c
Showing 11 changed files with 476 additions and 0 deletions.
32 changes: 32 additions & 0 deletions comps/llms/finetuning/README.md
@@ -0,0 +1,32 @@
# Start Finetuning Service Locally

## Start Ray

```bash
$ ./start-ray-for-funetuning.sh
```
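
If the helper script is not available in your environment, a single-node Ray head can usually be started directly with the Ray CLI (a minimal sketch; adjust ports and resources to your setup):

```bash
# Start a local Ray head node (assumed defaults; tune as needed)
ray start --head --dashboard-host 0.0.0.0
```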

## Start Finetuning Service

```bash
$ ./run.sh
```

## Browse FastAPI Web UI for Experiments

http://localhost:8000/docs

### Sample Request for Creating Finetuning Job

```json
{
"training_file": "file-vGxE9KywnSUkEL6dv9qZxKAF.jsonl",
"model": "meta-llama/Llama-2-7b-chat-hf"
}
```
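
Alternatively, the job can be created from the command line (a sketch; the training file is only a sample name and must exist under the service's `datasets` directory):

```bash
curl http://localhost:8000/v1/fine_tuning/jobs \
  -X POST \
  -H "Content-Type: application/json" \
  -d '{
        "training_file": "file-vGxE9KywnSUkEL6dv9qZxKAF.jsonl",
        "model": "meta-llama/Llama-2-7b-chat-hf"
      }'
```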

## Dev Notes

### Test if Ray cluster is working

```bash
$ python -c "import ray; ray.init(); print(ray.cluster_resources())"
```
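
Once the finetuning service is up, its job endpoints can also be queried directly (assuming the default port 8000 used in `main.py`):

```bash
# List all fine-tuning jobs known to the service
curl http://localhost:8000/v1/fine_tuning/jobs
```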
22 changes: 22 additions & 0 deletions comps/llms/finetuning/docker/Dockerfile.finetune
@@ -0,0 +1,22 @@
# Use the same Python version as Ray
FROM python:3.10.14

WORKDIR /root/opea-finetune

RUN --mount=type=cache,target=/var/cache/apt apt-get update -y \
    && apt-get install -y vim htop net-tools dnsutils \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# COPY ./install-llm-on-ray.sh /tmp/install-llm-on-ray.sh
# RUN --mount=type=cache,target=/root/.cache/pip /tmp/install-llm-on-ray.sh

COPY ./ .

RUN --mount=type=cache,target=/root/.cache/pip cd ./llm-on-ray && pip install -v -e .[cpu] --extra-index-url https://download.pytorch.org/whl/cpu --extra-index-url https://pytorch-extension.intel.com/release-whl/stable/cpu/us/

RUN --mount=type=cache,target=/root/.cache/pip pip install --no-cache-dir --upgrade -r requirements.txt

RUN echo 'source $(python -c "import oneccl_bindings_for_pytorch as torch_ccl; print(torch_ccl.cwd)")/env/setvars.sh' >> ~/.bashrc

CMD ["bash", "-c", "./run.sh"]
47 changes: 47 additions & 0 deletions comps/llms/finetuning/finetune_runner.py
@@ -0,0 +1,47 @@
import argparse

from llm_on_ray.finetune.finetune_config import FinetuneConfig
from pydantic_yaml import parse_yaml_raw_as
from ray.train.base_trainer import TrainingFailedError
from transformers import TrainerCallback, TrainerControl, TrainerState, TrainingArguments

class FineTuneCallback(TrainerCallback):
    def __init__(self) -> None:
        super().__init__()

    def on_log(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs):
        print("FineTuneCallback:", args, state)


def main():
    parser = argparse.ArgumentParser(description="Runner for llm_on_ray-finetune")
    parser.add_argument("--config_file", type=str, required=True)
    args = parser.parse_args()
    model_config_file = args.config_file

    with open(model_config_file) as f:
        finetune_config = parse_yaml_raw_as(FinetuneConfig, f).model_dump()

    callback = FineTuneCallback()
    finetune_config["Training"]["callbacks"] = [callback]

    from llm_on_ray.finetune.finetune import main as llm_on_ray_finetune_main

    llm_on_ray_finetune_main(finetune_config)
    # try:
    #     llm_on_ray_finetune_main(finetune_config)
    # except TrainingFailedError as e:
    #     print(e)


if __name__ == "__main__":
    main()
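
The runner mirrors the entrypoint that `handlers.py` submits to Ray, so it can also be launched by hand for debugging (the config path below is only a placeholder for the per-job YAML the service generates):

```bash
python finetune_runner.py --config_file jobs/ft-job-<id>.yaml
```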
154 changes: 154 additions & 0 deletions comps/llms/finetuning/handlers.py
@@ -0,0 +1,154 @@
import os
import random
import time
import uuid
from typing import Dict, Optional

from fastapi import BackgroundTasks, HTTPException
from llm_on_ray.finetune.finetune_config import FinetuneConfig
from models import (
    FineTuningJob,
    FineTuningJobList,
    FineTuningJobsRequest,
)
from pydantic_yaml import parse_yaml_raw_as, to_yaml_file
from ray.job_submission import JobSubmissionClient


MODEL_CONFIG_FILE_MAP = {
    "meta-llama/Llama-2-7b-chat-hf": "./models/llama-2-7b-chat-hf.yaml",
    "mistralai/Mistral-7B-v0.1": "./models/mistral-7b-v0.1.yaml",
}

DATASET_BASE_PATH = "datasets"

FineTuningJobID = str
CHECK_JOB_STATUS_INTERVAL = 5  # Check every 5 secs

# Lazily initialized Ray job submission client shared by the handlers below
ray_client: Optional[JobSubmissionClient] = None

running_finetuning_jobs: Dict[FineTuningJobID, FineTuningJob] = {}
finetuning_job_to_ray_job: Dict[FineTuningJobID, str] = {}


# Background task that periodically polls Ray for the job status
def update_job_status(job_id: FineTuningJobID):
    while True:
        job_status = ray_client.get_job_status(finetuning_job_to_ray_job[job_id])
        status = str(job_status).lower()
        # Ray status "stopped" corresponds to OpenAI status "cancelled"
        status = "cancelled" if status == "stopped" else status
        print(f"Status of job {job_id} is '{status}'")
        running_finetuning_jobs[job_id].status = status
        # Stop polling once the job reaches a terminal state
        if status in ("finished", "succeeded", "cancelled", "failed"):
            break
        time.sleep(CHECK_JOB_STATUS_INTERVAL)


def handle_create_finetuning_jobs(
    request: FineTuningJobsRequest, background_tasks: BackgroundTasks
):
    base_model = request.model
    train_file = request.training_file
    train_file_path = os.path.join(DATASET_BASE_PATH, train_file)

    model_config_file = MODEL_CONFIG_FILE_MAP.get(base_model)
    if not model_config_file:
        raise HTTPException(
            status_code=404, detail=f"Base model '{base_model}' not supported!"
        )

    if not os.path.exists(train_file_path):
        raise HTTPException(
            status_code=404, detail=f"Training file '{train_file}' not found!"
        )

    with open(model_config_file) as f:
        finetune_config = parse_yaml_raw_as(FinetuneConfig, f)

    finetune_config.Dataset.train_file = train_file_path

    job = FineTuningJob(
        id=f"ft-job-{uuid.uuid4()}",
        model=base_model,
        created_at=int(time.time()),
        training_file=train_file,
        hyperparameters={
            "n_epochs": finetune_config.Training.epochs,
            "batch_size": finetune_config.Training.batch_size,
            "learning_rate_multiplier": finetune_config.Training.learning_rate,
        },
        status="running",
        # TODO: Add seed in finetune config
        seed=random.randint(0, 1000),
    )

    finetune_config_file = f"jobs/{job.id}.yaml"
    to_yaml_file(finetune_config_file, finetune_config)

    global ray_client
    ray_client = JobSubmissionClient() if ray_client is None else ray_client

    ray_job_id = ray_client.submit_job(
        # Entrypoint shell command to execute
        entrypoint=f"python finetune_runner.py --config_file {finetune_config_file}",
        # Path to the local directory that contains finetune_runner.py
        runtime_env={"working_dir": "./"},
    )
    print(f"Submitted Ray job: {ray_job_id} ...")

    running_finetuning_jobs[job.id] = job
    finetuning_job_to_ray_job[job.id] = ray_job_id

    background_tasks.add_task(update_job_status, job.id)

    return job


def handle_list_finetuning_jobs():
    finetuning_jobs_list = FineTuningJobList(
        data=list(running_finetuning_jobs.values()), has_more=False
    )

    return finetuning_jobs_list


def handle_retrieve_finetuning_job(fine_tuning_job_id):
    job = running_finetuning_jobs.get(fine_tuning_job_id)
    if job is None:
        raise HTTPException(
            status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!"
        )
    return job


def handle_cancel_finetuning_job(fine_tuning_job_id):
    ray_job_id = finetuning_job_to_ray_job.get(fine_tuning_job_id)
    if ray_job_id is None:
        raise HTTPException(
            status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!"
        )

    global ray_client
    ray_client = JobSubmissionClient() if ray_client is None else ray_client
    ray_client.stop_job(ray_job_id)

    job = running_finetuning_jobs.get(fine_tuning_job_id)
    job.status = "cancelled"
    return job


# def cancel_all_jobs():
#     global ray_client
#     ray_client = JobSubmissionClient() if ray_client is None else ray_client
#     # stop all jobs
#     for job_id in finetuning_job_to_ray_job.values():
#         ray_client.stop_job(job_id)

#     for job_id in running_finetuning_jobs:
#         running_finetuning_jobs[job_id].status = "cancelled"
#     return running_finetuning_jobs
90 changes: 90 additions & 0 deletions comps/llms/finetuning/main.py
@@ -0,0 +1,90 @@
import uvicorn
from fastapi import BackgroundTasks, FastAPI
from handlers import (
    handle_cancel_finetuning_job,
    handle_create_finetuning_jobs,
    handle_list_finetuning_jobs,
    handle_retrieve_finetuning_job,
)
from models import FineTuningJob, FineTuningJobList, FineTuningJobsRequest

app = FastAPI()


@app.post("/v1/fine_tuning/jobs", response_model=FineTuningJob)
def create_finetuning_jobs(request: FineTuningJobsRequest, background_tasks: BackgroundTasks):
return handle_create_finetuning_jobs(request, background_tasks)
# return {
# "object": "fine_tuning.job",
# "id": "ftjob-abc123",
# "model": "davinci-002",
# "created_at": 1692661014,
# "finished_at": 1692661190,
# "fine_tuned_model": "ft:davinci-002:my-org:custom_suffix:7q8mpxmy",
# "organization_id": "org-123",
# "result_files": ["file-abc123"],
# "status": "succeeded",
# "validation_file": None,
# "training_file": "file-abc123",
# "hyperparameters": {
# "n_epochs": 4,
# "batch_size": 1,
# "learning_rate_multiplier": 1.0,
# },
# "trained_tokens": 5768,
# "integrations": [],
# "seed": 0,
# "estimated_finish": 0,
# }


@app.get("/v1/fine_tuning/jobs", response_model=FineTuningJobList)
def list_finetuning_jobs():
return handle_list_finetuning_jobs()
# return {
# "object": "list",
# "data": [
# {
# "object": "fine_tuning.job",
# "id": "ftjob-abc123",
# "model": "davinci-002",
# "created_at": 1692661014,
# "finished_at": 1692661190,
# "fine_tuned_model": "ft:davinci-002:my-org:custom_suffix:7q8mpxmy",
# "organization_id": "org-123",
# "result_files": ["file-abc123"],
# "status": "succeeded",
# "training_file": "file-abc123",
# "hyperparameters": {
# "n_epochs": 4,
# "batch_size": 1,
# "learning_rate_multiplier": 1.0,
# },
# "trained_tokens": 5768,
# "integrations": [],
# "seed": 0,
# "estimated_finish": 0,
# },
# ],
# "has_more": True,
# }


@app.get("/v1/fine_tuning/jobs/{fine_tuning_job_id}", response_model=FineTuningJob)
def retrieve_finetuning_job(fine_tuning_job_id):
job = handle_retrieve_finetuning_job(fine_tuning_job_id)
return job


@app.post(
"/v1/fine_tuning/jobs/{fine_tuning_job_id}/cancel", response_model=FineTuningJob
)
def cancel_finetuning_job(fine_tuning_job_id):
job = handle_cancel_finetuning_job(fine_tuning_job_id)
return job


if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
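
The remaining endpoints follow the same pattern and can be exercised with curl once the service is running (a sketch, using a placeholder job id):

```bash
# Retrieve a single fine-tuning job
curl http://localhost:8000/v1/fine_tuning/jobs/ft-job-<id>

# Cancel a running fine-tuning job
curl -X POST http://localhost:8000/v1/fine_tuning/jobs/ft-job-<id>/cancel
```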