diff --git a/comps/llms/finetuning/README.md b/comps/llms/finetuning/README.md
new file mode 100644
index 0000000000..956906ab33
--- /dev/null
+++ b/comps/llms/finetuning/README.md
@@ -0,0 +1,32 @@
+# Start Finetuning Service Locally
+
+## Start Ray
+
+```bash
+$ ./start-ray-for-funetuning.sh
+```
+
+## Start Finetuning Service
+```bash
+$ ./run.sh
+```
+
+## Browse FastAPI Web UI for Experiments
+
+http://localhost:8000/docs
+
+### Sample Request for Creating Finetuning Job
+
+```json
+{
+  "training_file": "file-vGxE9KywnSUkEL6dv9qZxKAF.jsonl",
+  "model": "meta-llama/Llama-2-7b-chat-hf"
+}
+```
+
+# Dev Notes
+
+### Test if Ray cluster is working
+```bash
+$ python -c "import ray; ray.init(); print(ray.cluster_resources())"
+```
diff --git a/comps/llms/finetuning/datasets/.gitkeep b/comps/llms/finetuning/datasets/.gitkeep
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/comps/llms/finetuning/docker/Dockerfile.finetune b/comps/llms/finetuning/docker/Dockerfile.finetune
new file mode 100644
index 0000000000..30b9c61718
--- /dev/null
+++ b/comps/llms/finetuning/docker/Dockerfile.finetune
@@ -0,0 +1,22 @@
+# Use the same Python version as Ray
+FROM python:3.10.14
+
+WORKDIR /root/opea-finetune
+
+RUN --mount=type=cache,target=/var/cache/apt apt-get update -y \
+    && apt-get install -y vim htop net-tools dnsutils \
+    && apt-get clean \
+    && rm -rf /var/lib/apt/lists/*
+
+# COPY ./install-llm-on-ray.sh /tmp/install-llm-on-ray.sh
+# RUN --mount=type=cache,target=/root/.cache/pip /tmp/install-llm-on-ray.sh
+
+COPY ./ .
+
+RUN --mount=type=cache,target=/root/.cache/pip cd ./llm-on-ray && pip install -v -e .[cpu] --extra-index-url https://download.pytorch.org/whl/cpu --extra-index-url https://pytorch-extension.intel.com/release-whl/stable/cpu/us/
+
+RUN --mount=type=cache,target=/root/.cache/pip pip install --no-cache-dir --upgrade -r requirements.txt
+
+RUN echo 'source $(python -c "import oneccl_bindings_for_pytorch as torch_ccl; print(torch_ccl.cwd)")/env/setvars.sh' >> ~/.bashrc
+
+CMD ["bash", "-c", "./run.sh"]
\ No newline at end of file
diff --git a/comps/llms/finetuning/finetune_runner.py b/comps/llms/finetuning/finetune_runner.py
new file mode 100644
index 0000000000..ad526c7255
--- /dev/null
+++ b/comps/llms/finetuning/finetune_runner.py
@@ -0,0 +1,47 @@
+import time
+from typing import List
+import uuid
+from llm_on_ray.finetune.finetune_config import FinetuneConfig
+from ray.tune.logger import LoggerCallback
+
+from ray.tune.callback import Callback
+from ray.tune.experiment import Trial
+from transformers import TrainingArguments, TrainerState, TrainerControl
+from transformers import TrainerCallback
+
+import argparse
+
+from pydantic_yaml import parse_yaml_raw_as
+from ray.train.base_trainer import TrainingFailedError
+
+class FineTuneCallback(TrainerCallback):
+    def __init__(self) -> None:
+        super().__init__()
+
+    def on_log(self, args: TrainingArguments, state: TrainerState, control: TrainerControl, **kwargs):
+        print("FineTuneCallback:", args, state)
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Runner for llm_on_ray-finetune")
+    parser.add_argument("--config_file", type=str, required=True, default=None)
+    args = parser.parse_args()
+    model_config_file = args.config_file
+
+    with open(model_config_file) as f:
+        finetune_config = parse_yaml_raw_as(FinetuneConfig, f).model_dump()
+
+    callback = FineTuneCallback()
+    finetune_config["Training"]["callbacks"] = [callback]
+
+    from llm_on_ray.finetune.finetune import main as llm_on_ray_finetune_main
+
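+    # Hand the parsed config (with the FineTuneCallback attached) to llm-on-ray's finetune entry point, which runs the actual training.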
+    llm_on_ray_finetune_main(finetune_config)
+    # try:
+    #     llm_on_ray_finetune_main(finetune_config)
+    # except TrainingFailedError as e:
+    #     print(e)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/comps/llms/finetuning/handlers.py b/comps/llms/finetuning/handlers.py
new file mode 100644
index 0000000000..a2e33faddc
--- /dev/null
+++ b/comps/llms/finetuning/handlers.py
@@ -0,0 +1,154 @@
+import os
+import random
+import time
+from typing import Any, Dict, List, Set
+import uuid
+
+from fastapi import BackgroundTasks, HTTPException
+from models import (
+    FineTuningJobEvent,
+    FineTuningJobsRequest,
+    FineTuningJob,
+    FineTuningJobList,
+)
+from llm_on_ray.finetune.finetune_config import FinetuneConfig
+from llm_on_ray.finetune.finetune import main
+from pydantic_yaml import parse_yaml_raw_as, to_yaml_file
+from ray.tune.logger import LoggerCallback
+from ray.train.base_trainer import TrainingFailedError
+from ray.job_submission import JobSubmissionClient
+
+
+MODEL_CONFIG_FILE_MAP = {
+    "meta-llama/Llama-2-7b-chat-hf": "./models/llama-2-7b-chat-hf.yaml",
+    "mistralai/Mistral-7B-v0.1": "./models/mistral-7b-v0.1.yaml",
+}
+
+DATASET_BASE_PATH = "datasets"
+
+FineTuningJobID = str
+CHECK_JOB_STATUS_INTERVAL = 5  # Check every 5 secs
+
+global ray_client
+ray_client: JobSubmissionClient = None
+
+running_finetuning_jobs: Dict[FineTuningJobID, FineTuningJob] = {}
+finetuning_job_to_ray_job: Dict[FineTuningJobID, str] = {}
+
+
+# Background task to periodically update job status
+def update_job_status(job_id: FineTuningJobID):
+    while True:
+        job_status = ray_client.get_job_status(finetuning_job_to_ray_job[job_id])
+        status = str(job_status).lower()
+        # Ray status "stopped" is OpenAI status "cancelled"
+        status = "cancelled" if status == "stopped" else status
+        print(f"Status of job {job_id} is '{status}'")
+        running_finetuning_jobs[job_id].status = status
+        if status == "succeeded" or status == "cancelled" or status == "failed":
+            break
+        time.sleep(CHECK_JOB_STATUS_INTERVAL)
+
+
+def handle_create_finetuning_jobs(
+    request: FineTuningJobsRequest, background_tasks: BackgroundTasks
+):
+    base_model = request.model
+    train_file = request.training_file
+    train_file_path = os.path.join(DATASET_BASE_PATH, train_file)
+
+    model_config_file = MODEL_CONFIG_FILE_MAP.get(base_model)
+    if not model_config_file:
+        raise HTTPException(
+            status_code=404, detail=f"Base model '{base_model}' not supported!"
+        )
+
+    if not os.path.exists(train_file_path):
+        raise HTTPException(
+            status_code=404, detail=f"Training file '{train_file}' not found!"
+        )
+
+    with open(model_config_file) as f:
+        finetune_config = parse_yaml_raw_as(FinetuneConfig, f)
+
+    finetune_config.Dataset.train_file = train_file_path
+
+    job = FineTuningJob(
+        id=f"ft-job-{uuid.uuid4()}",
+        model=base_model,
+        created_at=int(time.time()),
+        training_file=train_file,
+        hyperparameters={
+            "n_epochs": finetune_config.Training.epochs,
+            "batch_size": finetune_config.Training.batch_size,
+            "learning_rate_multiplier": finetune_config.Training.learning_rate,
+        },
+        status="running",
+        # TODO: Add seed in finetune config
+        seed=random.randint(0, 1000),
+    )
+
+    finetune_config_file = f"jobs/{job.id}.yaml"
+    to_yaml_file(finetune_config_file, finetune_config)
+
+    global ray_client
+    ray_client = JobSubmissionClient() if ray_client is None else ray_client
+
+    ray_job_id = ray_client.submit_job(
+        # Entrypoint shell command to execute
+        entrypoint=f"python finetune_runner.py --config_file {finetune_config_file}",
+        # Path to the local directory that contains finetune_runner.py
+        runtime_env={"working_dir": "./"},
+    )
+    print(f"Submitted Ray job: {ray_job_id} ...")
+
+    running_finetuning_jobs[job.id] = job
+    finetuning_job_to_ray_job[job.id] = ray_job_id
+
+    background_tasks.add_task(update_job_status, job.id)
+
+    return job
+
+
+def handle_list_finetuning_jobs():
+    finetuning_jobs_list = FineTuningJobList(
+        data=list(running_finetuning_jobs.values()), has_more=False
+    )
+
+    return finetuning_jobs_list
+
+
+def handle_retrieve_finetuning_job(fine_tuning_job_id):
+    job = running_finetuning_jobs.get(fine_tuning_job_id)
+    if job is None:
+        raise HTTPException(
+            status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!"
+        )
+    return job
+
+
+def handle_cancel_finetuning_job(fine_tuning_job_id):
+    ray_job_id = finetuning_job_to_ray_job.get(fine_tuning_job_id)
+    if ray_job_id is None:
+        raise HTTPException(
+            status_code=404, detail=f"Fine-tuning job '{fine_tuning_job_id}' not found!"
+        )
+
+    global ray_client
+    ray_client = JobSubmissionClient() if ray_client is None else ray_client
+    ray_client.stop_job(ray_job_id)
+
+    job = running_finetuning_jobs.get(fine_tuning_job_id)
+    job.status = "cancelled"
+    return job
+
+# def cancel_all_jobs():
+#     global ray_client
+#     ray_client = JobSubmissionClient() if ray_client is None else ray_client
+#     # stop all jobs
+#     for job_id in finetuning_job_to_ray_job.values():
+#         ray_client.stop_job(job_id)
+
+#     for job_id in running_finetuning_jobs:
+#         running_finetuning_jobs[job_id].status = "cancelled"
+#     return running_finetuning_jobs
\ No newline at end of file
diff --git a/comps/llms/finetuning/jobs/.gitkeep b/comps/llms/finetuning/jobs/.gitkeep
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/comps/llms/finetuning/main.py b/comps/llms/finetuning/main.py
new file mode 100644
index 0000000000..4e7df9955e
--- /dev/null
+++ b/comps/llms/finetuning/main.py
@@ -0,0 +1,90 @@
+import uvicorn
+
+from fastapi import BackgroundTasks, FastAPI, Form, Cookie, Header, Response
+from pydantic import BaseModel
+from models import FineTuningJobsRequest, FineTuningJob, FineTuningJobList
+from handlers import (
+    handle_create_finetuning_jobs,
+    handle_list_finetuning_jobs,
+    handle_retrieve_finetuning_job,
+    handle_cancel_finetuning_job,
+)
+
+app = FastAPI()
+
+
+@app.post("/v1/fine_tuning/jobs", response_model=FineTuningJob)
+def create_finetuning_jobs(request: FineTuningJobsRequest, background_tasks: BackgroundTasks):
+    return handle_create_finetuning_jobs(request, background_tasks)
+    # return {
+    #     "object": "fine_tuning.job",
+    #     "id": "ftjob-abc123",
+    #     "model": "davinci-002",
+    #     "created_at": 1692661014,
+    #     "finished_at": 1692661190,
+    #     "fine_tuned_model": "ft:davinci-002:my-org:custom_suffix:7q8mpxmy",
+    #     "organization_id": "org-123",
+    #     "result_files": ["file-abc123"],
+    #     "status": "succeeded",
+    #     "validation_file": None,
+    #     "training_file": "file-abc123",
+    #     "hyperparameters": {
+    #         "n_epochs": 4,
+    #         "batch_size": 1,
+    #         "learning_rate_multiplier": 1.0,
+    #     },
+    #     "trained_tokens": 5768,
+    #     "integrations": [],
+    #     "seed": 0,
+    #     "estimated_finish": 0,
+    # }
+
+
+@app.get("/v1/fine_tuning/jobs", response_model=FineTuningJobList)
+def list_finetuning_jobs():
+    return handle_list_finetuning_jobs()
+    # return {
+    #     "object": "list",
+    #     "data": [
+    #         {
+    #             "object": "fine_tuning.job",
+    #             "id": "ftjob-abc123",
+    #             "model": "davinci-002",
+    #             "created_at": 1692661014,
+    #             "finished_at": 1692661190,
+    #             "fine_tuned_model": "ft:davinci-002:my-org:custom_suffix:7q8mpxmy",
+    #             "organization_id": "org-123",
+    #             "result_files": ["file-abc123"],
+    #             "status": "succeeded",
+    #             "training_file": "file-abc123",
+    #             "hyperparameters": {
+    #                 "n_epochs": 4,
+    #                 "batch_size": 1,
+    #                 "learning_rate_multiplier": 1.0,
+    #             },
+    #             "trained_tokens": 5768,
+    #             "integrations": [],
+    #             "seed": 0,
+    #             "estimated_finish": 0,
+    #         },
+    #     ],
+    #     "has_more": True,
+    # }
+
+
+@app.get("/v1/fine_tuning/jobs/{fine_tuning_job_id}", response_model=FineTuningJob)
+def retrieve_finetuning_job(fine_tuning_job_id):
+    job = handle_retrieve_finetuning_job(fine_tuning_job_id)
+    return job
+
+
+@app.post(
+    "/v1/fine_tuning/jobs/{fine_tuning_job_id}/cancel", response_model=FineTuningJob
+)
+def cancel_finetuning_job(fine_tuning_job_id):
+    job = handle_cancel_finetuning_job(fine_tuning_job_id)
+    return job
+
+
+if __name__ == "__main__":
+    uvicorn.run(app, host="0.0.0.0", port=8000)
diff --git a/comps/llms/finetuning/models.py b/comps/llms/finetuning/models.py
new file mode 100644
index 0000000000..bfcdfe81fe
--- /dev/null
+++ b/comps/llms/finetuning/models.py
@@ -0,0 +1,49 @@
+from typing import List, Optional
+from datetime import datetime
+from pydantic import BaseModel
+
+
+class FineTuningJobsRequest(BaseModel):
+    training_file: str
+    model: str
+
+
+class Hyperparameters(BaseModel):
+    n_epochs: int
+    batch_size: int
+    learning_rate_multiplier: float
+
+
+class FineTuningJob(BaseModel):
+    object: str = "fine_tuning.job"  # Set as constant
+    id: str
+    model: str
+    created_at: int
+    finished_at: Optional[int] = None
+    fine_tuned_model: Optional[str] = None
+    organization_id: Optional[str] = None
+    result_files: Optional[List[str]] = None
+    status: str
+    validation_file: Optional[str] = None
+    training_file: str
+    hyperparameters: Hyperparameters
+    trained_tokens: Optional[int] = None
+    integrations: List[str] = []  # Empty list by default
+    seed: int
+    estimated_finish: int = 0  # Set default value to 0
+
+
+class FineTuningJobList(BaseModel):
+    object: str = "list"  # Set as constant
+    data: List[FineTuningJob]
+    has_more: bool
+
+
+class FineTuningJobEvent(BaseModel):
+    object: str = "fine_tuning.job.event"  # Set as constant
+    id: str
+    created_at: int
+    level: str
+    message: str
+    data: None = None  # No data expected for this event type, set to None
+    type: str = "message"  # Default event type is "message"
diff --git a/comps/llms/finetuning/models/llama-2-7b-chat-hf.yaml b/comps/llms/finetuning/models/llama-2-7b-chat-hf.yaml
new file mode 100644
index 0000000000..2071618c48
--- /dev/null
+++ b/comps/llms/finetuning/models/llama-2-7b-chat-hf.yaml
@@ -0,0 +1,37 @@
+General:
+  base_model: meta-llama/Llama-2-7b-chat-hf
+  gpt_base_model: false
+  output_dir: /tmp/llm-ray/output
+  save_strategy: no
+  config:
+    trust_remote_code: false
+    use_auth_token: null
+  lora_config:
+    task_type: CAUSAL_LM
+    r: 8
+    lora_alpha: 32
+    lora_dropout: 0.1
+    target_modules:
+      - q_proj
+      - v_proj
+  enable_gradient_checkpointing: false
+Dataset:
+  train_file: examples/data/sample_finetune_data_small.jsonl
+  group: false
+  validation_file: null
+  validation_split_percentage: 5
+Training:
+  optimizer: adamw_torch
+  batch_size: 2
+  epochs: 3
+  learning_rate: 1.0e-05
+  lr_scheduler: linear
+  weight_decay: 0.0
+  mixed_precision: bf16
+  device: cpu
+  num_training_workers: 2
+  resources_per_worker:
+    CPU: 32
+  accelerate_mode: DDP
+  gradient_accumulation_steps: 1
+  logging_steps: 10
diff --git a/comps/llms/finetuning/models/mistral-7b-v0.1.yaml b/comps/llms/finetuning/models/mistral-7b-v0.1.yaml
new file mode 100644
index 0000000000..1d0169564b
--- /dev/null
+++ b/comps/llms/finetuning/models/mistral-7b-v0.1.yaml
@@ -0,0 +1,42 @@
+General:
+  base_model: mistralai/Mistral-7B-v0.1
+  gpt_base_model: false
+  output_dir: /tmp/llm-ray/output
+  save_strategy: no
+  config:
+    trust_remote_code: false
+    use_auth_token: null
+  lora_config:
+    task_type: CAUSAL_LM
+    r: 8
+    lora_alpha: 32
+    lora_dropout: 0.1
+    target_modules:
+      - q_proj
+      - k_proj
+      - v_proj
+      - o_proj
+      - gate_proj
+      - up_proj
+      - down_proj
+      - lm_head
+  enable_gradient_checkpointing: false
+Dataset:
+  train_file: examples/data/sample_finetune_data_small.jsonl
+  validation_file: null
+  validation_split_percentage: 5
+Training:
+  optimizer: adamw_torch
+  batch_size: 2
+  epochs: 3
+  learning_rate: 1.0e-05
+  lr_scheduler: linear
+  weight_decay: 0.0
+  mixed_precision: bf16
+  device: cpu
+  num_training_workers: 2
+  resources_per_worker:
+    CPU: 32
+  accelerate_mode: DDP
+  gradient_accumulation_steps: 1
+  logging_steps: 10
diff --git a/comps/llms/finetuning/requirements.txt b/comps/llms/finetuning/requirements.txt
new file mode 100644
index 0000000000..6f745c3f18
--- /dev/null
+++ b/comps/llms/finetuning/requirements.txt
@@ -0,0 +1,3 @@
+fastapi
+pydantic
+uvicorn
\ No newline at end of file
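The routes added in `main.py` follow the OpenAI fine-tuning API shape. Below is a minimal smoke-test sketch for exercising them once the service from the README is up, assuming the default `0.0.0.0:8000` bind from `main.py`, a training file placed under `datasets/` with the name used in the README sample, and a hypothetical job id `ft-job-1234` standing in for the id returned by the create call:

```bash
# Create a fine-tuning job (the training file must exist under datasets/)
curl -X POST http://localhost:8000/v1/fine_tuning/jobs \
  -H "Content-Type: application/json" \
  -d '{"training_file": "file-vGxE9KywnSUkEL6dv9qZxKAF.jsonl", "model": "meta-llama/Llama-2-7b-chat-hf"}'

# List all fine-tuning jobs
curl http://localhost:8000/v1/fine_tuning/jobs

# Retrieve or cancel a single job (substitute the id returned by the create call)
curl http://localhost:8000/v1/fine_tuning/jobs/ft-job-1234
curl -X POST http://localhost:8000/v1/fine_tuning/jobs/ft-job-1234/cancel
```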