Merge pull request #41 from Health-Informatics-UoN/refactor/apirouter
Refactored the app to have the routes in separate files
kuraisle authored Sep 5, 2024
2 parents fa68a12 + acf8757 commit 108bdec
Showing 5 changed files with 295 additions and 347 deletions.
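
A rough sketch of the APIRouter pattern this refactor adopts. The route path and handler body below are illustrative; only the pipeline_routes import and the /pipeline prefix are visible in the diff that follows.

from fastapi import APIRouter, FastAPI

# routers/pipeline_routes.py: endpoints attach to a standalone router
router = APIRouter()

@router.post("/run")  # served at /pipeline/run once the router is mounted
async def run_pipeline() -> dict:
    return {"status": "ok"}

# app.py: mount the router under a common prefix
app = FastAPI()
app.include_router(router=router, prefix="/pipeline")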
351 changes: 7 additions & 344 deletions Carrot-Assistant/app.py
@@ -1,31 +1,14 @@
import asyncio
from collections.abc import AsyncGenerator
import json
from enum import Enum
from typing import Optional, List, Dict, Any
import time

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse

import assistant
from components.pipeline import llm_pipeline
from omop import OMOP_match
from options.base_options import BaseOptions
from utils.logging_utils import Logger
from components.embeddings import Embeddings
from components.embeddings import EmbeddingModelName
from routers import pipeline_routes

logger = Logger().make_logger()
app = FastAPI(
title="OMOP concpet Assistant",
title="OMOP concept Assistant",
description="The API to assist in identifying OMOP concepts",
version="0.1.0",
contact={
"name": "Reza Omidvar",
"email": "reza.omidvar@nottingham.ac.uk",
"name": "BRC, University of Nottingham",
"email": "james.mitchell-white1@nottingham.ac.uk",
},
)

@@ -37,331 +20,11 @@
allow_headers=["*"],
)

# the LLMModel class could be more powerful if we remove the option of using the GPTs.
# Then it can be an Enum of dicts, and the dicts can be unpacked into the arguments for the hf download

class LLMModel(str, Enum):
"""
This enum holds the names of the different models the assistant can use
"""

GPT_3_5_TURBO = "gpt-3.5-turbo-0125"
GPT_4 = "gpt-4"
LLAMA_2_7B = "llama-2-7B-chat"
LLAMA_3_8B = "llama-3-8b"
LLAMA_3_70B = "llama-3-70b"
GEMMA_7B = "gemma-7b"
LLAMA_3_1_8B = "llama-3.1-8b"
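# A hedged sketch of the idea in the comment above: with the GPT options removed,
# each enum member could hold the keyword arguments for a Hugging Face download,
# ready to be unpacked into the download call. The repo ids and filenames below
# are illustrative, not the project's actual configuration.
#
# class LocalLLM(Enum):
#     LLAMA_3_1_8B = {"repo_id": "meta-llama/Llama-3.1-8B-Instruct", "filename": "model.gguf"}
#     GEMMA_7B = {"repo_id": "google/gemma-7b-it", "filename": "model.gguf"}
#
# from huggingface_hub import hf_hub_download
# model_path = hf_hub_download(**LocalLLM.LLAMA_3_1_8B.value)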


class PipelineOptions(BaseModel):
"""
This class holds the options available to the Llettuce pipeline
These are all the options in the BaseOptions parser. The defaults provided here match the default options in BaseOptions. Using a pydantic model means FastAPI can take these as input in the API request
Attributes
----------
llm_model: LLMModel
The name of the LLM used in the pipeline. The permitted values are the possibilities in the LLMModel enum
temperature: float
Temperature supplied to the LLM that tunes the variability of responses
concept_ancestor: bool
If true, the concept_ancestor table of the OMOP vocabularies is queried for the results of an OMOP search. Defaults to false
concept_relationship: bool
If true, the concept_relationship table of the OMOP vocabularies is queried for the results of an OMOP search. Defaults to false
concept_synonym: bool
If true, the concept_synonym table of the OMOP vocabularies is queried when OMOP concepts are fetched. Defaults to false
search_threshold: int
The threshold on fuzzy string matching for returned results
max_separation_descendants: int
The maximum separation to search for concept descendants
max_separation_ancestor: int
The maximum separation to search for concept ancestors
"""

llm_model: LLMModel = LLMModel.LLAMA_3_1_8B
temperature: float = 0
vocabulary_id: str = "RxNorm" # TODO: make multiples possible
concept_ancestor: bool = False
concept_relationship: bool = False
concept_synonym: bool = False
search_threshold: int = 80
max_separation_descendants: int = 1
max_separation_ancestor: int = 1
embeddings_path: str = "concept_embeddings.qdrant"
force_rebuild: bool = False
embed_vocab: List[str] = ["RxNorm", "RxNorm Extension"]
embedding_model: EmbeddingModelName = EmbeddingModelName.BGESMALL
embedding_search_kwargs: dict = {"top_k":5}


class PipelineRequest(BaseModel):
"""
This class takes the format of a request to the API
Attributes
----------
names: List[str]
The drug names sent to a pipeline
pipeline_options: Optional[PipelineOptions]
Optionally, the default values can be overridden by instantiating a PipelineOptions object. If none is supplied, default arguments are used
"""

names: List[str]
pipeline_options: Optional[PipelineOptions] = Field(default_factory=PipelineOptions)
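# An illustrative way to build a request with selective overrides of the defaults
# above (the drug names are examples, not from the project's test data):
#
# example_request = PipelineRequest(
#     names=["Betnovate Scalp Application", "Panadol"],
#     pipeline_options=PipelineOptions(
#         llm_model=LLMModel.LLAMA_3_1_8B,
#         concept_synonym=True,
#     ),
# )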


def parse_pipeline_args(base_options: BaseOptions, options: PipelineOptions) -> None:
"""
Use the values of a PipelineOptions object to override defaults
Parameters
----------
base_options: BaseOptions
The base options from the command-line application
options: PipelineOptions
Overrides from an API request
Returns
-------
None
"""
base_options._parser.set_defaults(
llm_model=options.llm_model.value,
temperature=options.temperature,
vocabulary_id=options.vocabulary_id,
concept_ancestor="y" if options.concept_ancestor else "n",
concept_relationship="y" if options.concept_relationship else "n",
concept_synonym="y" if options.concept_synonym else "n",
search_threshold=options.search_threshold,
max_separation_descendants=options.max_separation_descendants,
max_separation_ancestor=options.max_separation_ancestor,
)


async def generate_events(request: PipelineRequest) -> AsyncGenerator[str, None]:
"""
Generate LLM output and OMOP results for a list of informal names
Parameters
----------
request: PipelineRequest
The request containing the list of informal names.
Workflow
--------
For each informal name:
The first event is to query the OMOP database for a match
The second event fetches relevant concepts from the OMOP database
Finally, the function yields results as they become available,
allowing for real-time streaming.
Conditions
----------
If the OMOP database returns a match, the LLM is not queried
If the OMOP database does not return a match,
the LLM is used to find the formal name and the OMOP database is
queried for the LLM output.
Finally, the function yields the results for real-time streaming.
Yields
------
str
JSON encoded strings of the event results. Two types are yielded:
1. "llm_output": The result from the language model processing.
2. "omop_output": The result from the OMOP database matching.
"""
informal_names = request.names
opt = BaseOptions()
opt.initialize()
parse_pipeline_args(opt, request.pipeline_options)
opt = opt.parse()

print("Received informal names:", informal_names)

# Query OMOP for each informal name

for informal_name in informal_names:
print(f"Querying OMOP for informal name: {informal_name}")
omop_output = OMOP_match.run(opt=opt, search_term=informal_name, logger=logger)

if omop_output and any(concept["CONCEPT"] for concept in omop_output):
print(f"OMOP match found for {informal_name}: {omop_output}")
output = {"event": "omop_output", "data": omop_output}
yield json.dumps(output)
continue

else:
print("No satisfactory OMOP results found for {informal_name}, using LLM...")

# Use LLM to find the formal name and query OMOP for the LLM output

llm_outputs = assistant.run(opt=opt, informal_names=informal_names, logger=logger)
for llm_output in llm_outputs:


print("LLM output for", llm_output["informal_name"], ":", llm_output["reply"])

print("Querying OMOP for LLM output:", llm_output["reply"])

output = {"event": "llm_output", "data": llm_output}
yield json.dumps(output)

# Simulate some delay before sending the next part
await asyncio.sleep(2)

omop_output = OMOP_match.run(
opt=opt, search_term=llm_output["reply"], logger=logger
)
app.include_router(
router=pipeline_routes.router,
prefix="/pipeline",
)

print("OMOP output for", llm_output["reply"], ":", omop_output)

output = {"event": "omop_output", "data": omop_output}
yield json.dumps(output)


@app.post("/run")
async def run_pipeline(request: PipelineRequest) -> EventSourceResponse:
"""
Call generate_events to run the pipeline
Parameters
----------
request: PipelineRequest
The request containing a list of informal names
Returns
-------
EventSourceResponse
The response containing the events
"""
return EventSourceResponse(generate_events(request))
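# A rough sketch of a client consuming this SSE stream (the host, port and use of
# the requests library are assumptions; after this refactor the route presumably
# sits under the /pipeline prefix):
#
# import json
# import requests
#
# payload = {"names": ["Panadol"]}
# with requests.post("http://localhost:8000/run", json=payload, stream=True) as resp:
#     for line in resp.iter_lines():
#         if line.startswith(b"data:"):
#             event = json.loads(line[len(b"data:"):])
#             print(event["event"], event["data"])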


@app.post("/run_db")
async def run_db(request: PipelineRequest) -> List[Dict[str,Any]]:
"""
Fetch OMOP concepts for a name
Default options can be overridden by the pipeline_options in the request
Parameters
----------
request: PipelineRequest
An API request containing a list of informal names and the options of a pipeline
Returns
-------
list
Details of OMOP concept(s) fetched from a database query
"""
search_terms = request.names
opt = BaseOptions()
opt.initialize()
parse_pipeline_args(opt, request.pipeline_options)
opt = opt.parse()

omop_outputs = []
for search_term in search_terms:
omop_output = OMOP_match.run(opt=opt, search_term=search_term, logger=logger)
omop_outputs.append({"event": "omop_output", "content": omop_output})

return omop_outputs
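# A plain, non-streaming call to this endpoint might look like the following
# (host and port are assumptions):
#
# import requests
# results = requests.post("http://localhost:8000/run_db", json={"names": ["Panadol"]}).json()
# # -> [{"event": "omop_output", "content": [...]}, ...]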

@app.post("/run_vector_search")
async def run_vector_search(request: PipelineRequest):
"""
Search a vector database for a name
Default options can be overridden by pipeline_options
A warning: if you don't have a vector database set up under the embeddings_path, this method will build one for you. This takes a while: about an hour on a 2.8 GHz Intel i7 with 16 GB RAM.
Parameters
----------
request: PipelineRequest
An API request containing a list of informal names and the options of a pipeline
Returns
-------
dict
Details of OMOP concept(s) fetched from a vector database query
"""
search_terms = request.names
embeddings = Embeddings(
embeddings_path=request.pipeline_options.embeddings_path,
force_rebuild=request.pipeline_options.force_rebuild,
embed_vocab=request.pipeline_options.embed_vocab,
model_name=request.pipeline_options.embedding_model,
search_kwargs=request.pipeline_options.embedding_search_kwargs,
)
return {'event': 'vector_search_output', 'content': embeddings.search(search_terms)}
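# The warning above means that if no Qdrant database exists at embeddings_path,
# a full build is triggered. Pointing the request at an existing file avoids
# that, e.g. (illustrative override):
#
# pipeline_options = PipelineOptions(
#     embeddings_path="concept_embeddings.qdrant",
#     force_rebuild=False,
# )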

@app.post("/run_vector_llm_pipeline")
async def vector_llm_pipeline(request: PipelineRequest) -> List:
"""
Run a RAG pipeline that first checks a vector database, then uses an LLM
This has a conditional router in it that checks whether there's an exact match for the term.
If there is an exact match, the vector search results are returned.
If there is not, the vector search results are used for retrieval augmented generation
Parameters
----------
request: PipelineRequest
An API request containing a list of informal names and the options of a pipeline
Returns
-------
list
The LLM and/or vector search outputs for each informal name
"""
informal_names = request.names
opt = BaseOptions()
opt.initialize()
parse_pipeline_args(opt, request.pipeline_options)
opt = opt.parse()
# I think this is what they call technical debt
opt.embeddings_path = request.pipeline_options.embeddings_path
opt.force_rebuild = request.pipeline_options.force_rebuild
opt.embed_vocab = request.pipeline_options.embed_vocab
opt.embedding_model = request.pipeline_options.embedding_model
opt.embedding_search_kwargs = request.pipeline_options.embedding_search_kwargs

pl = llm_pipeline(opt=opt, logger=logger).get_rag_assistant()
start = time.time()
pl.warm_up()
logger.info(f"Pipeline warmup in {time.time()-start} seconds")

results = []
run_start = time.time()

for informal_name in informal_names:
start = time.time()
res = pl.run(
{
"query_embedder": {"text": informal_name},
"prompt": {"informal_name": informal_name},
},
include_outputs_from={"retriever", "llm"},
)
inference_time = time.time()-start

def build_output(informal_name, result, inf_time) -> dict:
output = {
"informal_name": informal_name,
"inference_time": inf_time,
}
if 'llm' in result.keys():
output['llm_output'] = result["llm"]["replies"][0].strip()
output["vector_search_output"] = [{"content": doc.content, "score": doc.score} for doc in result["retriever"]["documents"]]
return output

results.append(build_output(informal_name, res, inference_time))

logger.info(f"Complete run in {time.time()-run_start} seconds")
return results


if __name__ == "__main__":
import uvicorn
