From 5663e168212ae5044ad18aa285033d8a2e7bf046 Mon Sep 17 00:00:00 2001 From: Eero Tamminen Date: Fri, 6 Dec 2024 05:08:57 +0200 Subject: [PATCH 1/5] Exclude yield/reply time from first token latency metric (#973) While the metrics are fine for a small number of requests, when the megaservice was handling many (hundreds of) _parallel_ requests, it reported a clearly (~10%) larger first token latency than the client receiving the tokens from the megaservice actually saw. Taking the time before the token is yielded means that the reported first token latency can be slightly shorter than it actually is. However, testing with ChatQnA shows the latencies to be clearly closer to the ones seen by the client (within a couple of percent) and typically smaller (i.e. logical). PS. Doing the metrics timing after yielding the token meant that the time for sending the reply to the client, and waiting for that to complete, was also included in the token time. I suspect that with a lot of parallel requests, processing had often switched to other megaservice request-processing threads, and getting control back to the yielding thread for timing could be delayed much longer than sending the response to the client took. Signed-off-by: Eero Tamminen --- comps/cores/mega/orchestrator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/comps/cores/mega/orchestrator.py b/comps/cores/mega/orchestrator.py index 8a75f9cff..803965f6e 100644 --- a/comps/cores/mega/orchestrator.py +++ b/comps/cores/mega/orchestrator.py @@ -237,8 +237,8 @@ def generate(): ) token_start = time.time() else: - yield chunk token_start = self.metrics.token_update(token_start, is_first) + yield chunk is_first = False self.metrics.request_update(req_start) self.metrics.pending_update(False) @@ -306,7 +306,7 @@ def token_generator(self, sentence: str, token_start: float, is_first: bool, is_ suffix = "\n\n" tokens = re.findall(r"\s?\S+\s?", sentence, re.UNICODE) for token in tokens: - yield prefix + repr(token.replace("\\n", "\n").encode("utf-8")) + suffix token_start = self.metrics.token_update(token_start, is_first) + yield prefix + repr(token.replace("\\n", "\n").encode("utf-8")) + suffix if is_last: yield "data: [DONE]\n\n" From fbf3017afb8d024007f7e3ca545eb9faa7d29399 Mon Sep 17 00:00:00 2001 From: Yao Qing Date: Fri, 6 Dec 2024 13:18:51 +0800 Subject: [PATCH 2/5] Revert mosec embedding microservice to use synchronous interface. (#971) * Revert mosec embedding microservice to use synchronous interface. Signed-off-by: Yao, Qing * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add dependency. 
Signed-off-by: Yao, Qing --------- Signed-off-by: Yao, Qing Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .../mosec/langchain/embedding_mosec.py | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/comps/embeddings/mosec/langchain/embedding_mosec.py b/comps/embeddings/mosec/langchain/embedding_mosec.py index 38e92b5a7..e422d92b6 100644 --- a/comps/embeddings/mosec/langchain/embedding_mosec.py +++ b/comps/embeddings/mosec/langchain/embedding_mosec.py @@ -7,6 +7,7 @@ from typing import List, Optional, Union from langchain_community.embeddings import OpenAIEmbeddings +from langchain_community.embeddings.openai import async_embed_with_retry from comps import ( CustomLogger, @@ -35,7 +36,7 @@ async def _aget_len_safe_embeddings( ) -> List[List[float]]: _chunk_size = chunk_size or self.chunk_size batched_embeddings: List[List[float]] = [] - response = self.client.create(input=texts, **self._invocation_params) + response = await async_embed_with_retry(self, input=texts, **self._invocation_params) if not isinstance(response, dict): response = response.model_dump() batched_embeddings.extend(r["embedding"] for r in response["data"]) @@ -45,7 +46,7 @@ async def _aget_len_safe_embeddings( async def empty_embedding() -> List[float]: nonlocal _cached_empty_embedding if _cached_empty_embedding is None: - average_embedded = self.client.create(input="", **self._invocation_params) + average_embedded = await async_embed_with_retry(self, input="", **self._invocation_params) if not isinstance(average_embedded, dict): average_embedded = average_embedded.model_dump() _cached_empty_embedding = average_embedded["data"][0]["embedding"] @@ -57,6 +58,29 @@ async def get_embedding(e: Optional[List[float]]) -> List[float]: embeddings = await asyncio.gather(*[get_embedding(e) for e in batched_embeddings]) return embeddings + def _get_len_safe_embeddings( + self, texts: List[str], *, engine: str, chunk_size: Optional[int] = None + ) -> List[List[float]]: + _chunk_size = chunk_size or self.chunk_size + batched_embeddings: List[List[float]] = [] + response = self.client.create(input=texts, **self._invocation_params) + if not isinstance(response, dict): + response = response.model_dump() + batched_embeddings.extend(r["embedding"] for r in response["data"]) + + _cached_empty_embedding: Optional[List[float]] = None + + def empty_embedding() -> List[float]: + nonlocal _cached_empty_embedding + if _cached_empty_embedding is None: + average_embedded = self.client.create(input="", **self._invocation_params) + if not isinstance(average_embedded, dict): + average_embedded = average_embedded.model_dump() + _cached_empty_embedding = average_embedded["data"][0]["embedding"] + return _cached_empty_embedding + + return [e if e is not None else empty_embedding() for e in batched_embeddings] + @register_microservice( name="opea_service@embedding_mosec", @@ -68,18 +92,18 @@ async def get_embedding(e: Optional[List[float]]) -> List[float]: output_datatype=EmbedDoc, ) @register_statistics(names=["opea_service@embedding_mosec"]) -async def embedding( +def embedding( input: Union[TextDoc, EmbeddingRequest, ChatCompletionRequest] ) -> Union[EmbedDoc, EmbeddingResponse, ChatCompletionRequest]: if logflag: logger.info(input) start = time.time() if isinstance(input, TextDoc): - embed_vector = await get_embeddings(input.text) + embed_vector = get_embeddings(input.text) embedding_res = embed_vector[0] if isinstance(input.text, str) else embed_vector res = 
EmbedDoc(text=input.text, embedding=embedding_res) else: - embed_vector = await get_embeddings(input.input) + embed_vector = get_embeddings(input.input) if input.dimensions is not None: embed_vector = [embed_vector[i][: input.dimensions] for i in range(len(embed_vector))] @@ -99,9 +123,9 @@ async def embedding( return res -async def get_embeddings(text: Union[str, List[str]]) -> List[List[float]]: +def get_embeddings(text: Union[str, List[str]]) -> List[List[float]]: texts = [text] if isinstance(text, str) else text - embed_vector = await embeddings.aembed_documents(texts) + embed_vector = embeddings.embed_documents(texts) return embed_vector From 5ed041bdedabefdbc6700037a7f977df7411ba24 Mon Sep 17 00:00:00 2001 From: kkrishTa <56536056+kkrishTa@users.noreply.github.com> Date: Tue, 10 Dec 2024 07:10:44 +0530 Subject: [PATCH 3/5] Feature/elasticsearch vector store integration - Infosys (#972) * Feature/elastic Elasticsearch vectorstore, dataprep and retriever --------- Co-authored-by: Adarsh Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Liang Lv --- .../docker/compose/dataprep-compose.yaml | 4 + .../docker/compose/retrievers-compose.yaml | 4 + .gitignore | 3 + .../elasticsearch/langchain/Dockerfile | 38 ++ .../elasticsearch/langchain/README.md | 130 ++++++ .../elasticsearch/langchain/__init__.py | 2 + .../elasticsearch/langchain/config.py | 23 ++ .../langchain/docker-compose.yaml | 41 ++ .../langchain/prepare_doc_elasticsearch.py | 373 ++++++++++++++++++ .../elasticsearch/langchain/requirements.txt | 30 ++ .../elasticsearch/langchain/Dockerfile | 28 ++ .../elasticsearch/langchain/README.md | 122 ++++++ .../elasticsearch/langchain/config.py | 18 + .../langchain/docker_compose_retriever.yaml | 33 ++ .../elasticsearch/langchain/requirements.txt | 14 + .../langchain/retriever_elasticsearch.py | 105 +++++ comps/vectorstores/elasticsearch/README.md | 13 + comps/vectorstores/elasticsearch/__init__.py | 2 + .../elasticsearch/docker-compose.yml | 16 + .../test_dataprep_elasticsearch_langchain.sh | 137 +++++++ ...test_retrievers_elasticsearch_langchain.sh | 93 +++++ 21 files changed, 1229 insertions(+) create mode 100644 comps/dataprep/elasticsearch/langchain/Dockerfile create mode 100644 comps/dataprep/elasticsearch/langchain/README.md create mode 100644 comps/dataprep/elasticsearch/langchain/__init__.py create mode 100644 comps/dataprep/elasticsearch/langchain/config.py create mode 100644 comps/dataprep/elasticsearch/langchain/docker-compose.yaml create mode 100644 comps/dataprep/elasticsearch/langchain/prepare_doc_elasticsearch.py create mode 100644 comps/dataprep/elasticsearch/langchain/requirements.txt create mode 100644 comps/retrievers/elasticsearch/langchain/Dockerfile create mode 100644 comps/retrievers/elasticsearch/langchain/README.md create mode 100644 comps/retrievers/elasticsearch/langchain/config.py create mode 100644 comps/retrievers/elasticsearch/langchain/docker_compose_retriever.yaml create mode 100644 comps/retrievers/elasticsearch/langchain/requirements.txt create mode 100644 comps/retrievers/elasticsearch/langchain/retriever_elasticsearch.py create mode 100644 comps/vectorstores/elasticsearch/README.md create mode 100644 comps/vectorstores/elasticsearch/__init__.py create mode 100644 comps/vectorstores/elasticsearch/docker-compose.yml create mode 100644 tests/dataprep/test_dataprep_elasticsearch_langchain.sh create mode 100644 tests/retrievers/test_retrievers_elasticsearch_langchain.sh diff --git 
a/.github/workflows/docker/compose/dataprep-compose.yaml b/.github/workflows/docker/compose/dataprep-compose.yaml index 7908e8c26..6e887d6cf 100644 --- a/.github/workflows/docker/compose/dataprep-compose.yaml +++ b/.github/workflows/docker/compose/dataprep-compose.yaml @@ -63,3 +63,7 @@ services: build: dockerfile: comps/dataprep/multimedia2text/audio2text/Dockerfile image: ${REGISTRY:-opea}/dataprep-audio2text:${TAG:-latest} + dataprep-elasticsearch: + build: + dockerfile: comps/dataprep/elasticsearch/langchain/Dockerfile + image: ${REGISTRY:-opea}/dataprep-elasticsearch:${TAG:-latest} diff --git a/.github/workflows/docker/compose/retrievers-compose.yaml b/.github/workflows/docker/compose/retrievers-compose.yaml index 7b89ce9bf..dc49b95d5 100644 --- a/.github/workflows/docker/compose/retrievers-compose.yaml +++ b/.github/workflows/docker/compose/retrievers-compose.yaml @@ -47,3 +47,7 @@ services: build: dockerfile: comps/retrievers/neo4j/llama_index/Dockerfile image: ${REGISTRY:-opea}/retriever-neo4j-llamaindex:${TAG:-latest} + retriever-elasticsearch: + build: + dockerfile: comps/retrievers/elasticsearch/langchain/Dockerfile + image: ${REGISTRY:-opea}/retriever-elasticsearch:${TAG:-latest} diff --git a/.gitignore b/.gitignore index 1d1e0a389..875e50bee 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ __pycache__ *.egg-info/ .DS_Store +.idea/ +.venv/ +build/ \ No newline at end of file diff --git a/comps/dataprep/elasticsearch/langchain/Dockerfile b/comps/dataprep/elasticsearch/langchain/Dockerfile new file mode 100644 index 000000000..016a5d04d --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/Dockerfile @@ -0,0 +1,38 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +FROM python:3.11-slim + +ENV LANG=C.UTF-8 + +ARG ARCH="cpu" + +RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ + build-essential \ + default-jre \ + libgl1-mesa-glx \ + libjemalloc-dev + +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ + +USER user + +COPY comps /home/user/comps + +RUN pip install --no-cache-dir --upgrade pip setuptools && \ + if [ ${ARCH} = "cpu" ]; then pip install --no-cache-dir torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \ + pip install --no-cache-dir -r /home/user/comps/dataprep/elasticsearch/langchain/requirements.txt + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +USER root + +RUN mkdir -p /home/user/comps/dataprep/elasticsearch/langchain/uploaded_files && chown -R user /home/user/comps/dataprep/elasticsearch/langchain/uploaded_files + +USER user + +WORKDIR /home/user/comps/dataprep/elasticsearch/langchain + +ENTRYPOINT ["python", "prepare_doc_elasticsearch.py"] diff --git a/comps/dataprep/elasticsearch/langchain/README.md b/comps/dataprep/elasticsearch/langchain/README.md new file mode 100644 index 000000000..296a1d6db --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/README.md @@ -0,0 +1,130 @@ +# Dataprep Microservice with Elasticsearch + +## 🚀1. Start Microservice with Python（Option 1） + +### 1.1 Install Requirements + +```bash +pip install -r requirements.txt +``` + +### 1.2 Setup Environment Variables + +```bash +export ES_CONNECTION_STRING=http://localhost:9200 +export INDEX_NAME=${your_index_name} +``` + +### 1.3 Start Elasticsearch + +Please refer to this [readme](../../../vectorstores/elasticsearch/README.md). 
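+ +Alternatively, if you just need a local single-node instance for development, you can start the container directly. The following is a minimal sketch that mirrors the image and settings used by the compose file and test scripts in this patch (security is disabled, so use it for local testing only): + +```bash +docker run -d --name elasticsearch-vector-db -p 9200:9200 -p 9300:9300 -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.16.0 +``` + +Once the container is up, `curl http://localhost:9200` should return the cluster information.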
+ +### 1.4 Start Document Preparation Microservice for Elasticsearch with Python Script + +Start the document preparation microservice for Elasticsearch with the command below. + +```bash +python prepare_doc_elasticsearch.py +``` + +## 🚀2. Start Microservice with Docker (Option 2) + +### 2.1 Start Elasticsearch + +Please refer to this [readme](../../../vectorstores/elasticsearch/README.md). + +### 2.2 Setup Environment Variables + +```bash +export ES_CONNECTION_STRING=http://localhost:9200 +export INDEX_NAME=${your_index_name} +``` + +### 2.3 Build Docker Image + +```bash +cd GenAIComps +docker build -t opea/dataprep-elasticsearch:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/dataprep/elasticsearch/langchain/Dockerfile . +``` + +### 2.4 Run Docker with CLI (Option A) + +```bash +docker run --name="dataprep-elasticsearch" -p 6011:6011 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/dataprep-elasticsearch:latest +``` + +### 2.5 Run with Docker Compose (Option B) + +```bash +cd comps/dataprep/elasticsearch/langchain +docker compose -f docker-compose.yaml up -d +``` + +## 🚀3. Consume Microservice + +### 3.1 Consume Upload API + +Once the document preparation microservice for Elasticsearch is started, users can use the command below to invoke the +microservice to convert a document to embeddings and save them to the database. + +```bash +curl -X POST \ + -H "Content-Type: multipart/form-data" \ + -F "files=@/path/to/document" \ + http://localhost:6011/v1/dataprep +``` + +### 3.2 Consume get_file API + +To get uploaded file structures, use the following command: + +```bash +curl -X POST \ + -H "Content-Type: application/json" \ + http://localhost:6011/v1/dataprep/get_file +``` + +Then you will get the response JSON like this: + +```json +[ + { + "name": "uploaded_file_1.txt", + "id": "uploaded_file_1.txt", + "type": "File", + "parent": "" + }, + { + "name": "uploaded_file_2.txt", + "id": "uploaded_file_2.txt", + "type": "File", + "parent": "" + } +] +``` + +### 3.3 Consume delete_file API + +To delete an uploaded file/link, use the following command. + +The `file_path` here should be the `id` obtained from the `/v1/dataprep/get_file` API. 
+ +```bash +# delete link +curl -X POST \ + -H "Content-Type: application/json" \ + -d '{"file_path": "https://www.ces.tech/.txt"}' \ + http://localhost:6011/v1/dataprep/delete_file + +# delete file +curl -X POST \ + -H "Content-Type: application/json" \ + -d '{"file_path": "uploaded_file_1.txt"}' \ + http://localhost:6011/v1/dataprep/delete_file + +# delete all files and links +curl -X POST \ + -H "Content-Type: application/json" \ + -d '{"file_path": "all"}' \ + http://localhost:6011/v1/dataprep/delete_file +``` diff --git a/comps/dataprep/elasticsearch/langchain/__init__.py b/comps/dataprep/elasticsearch/langchain/__init__.py new file mode 100644 index 000000000..916f3a44b --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/dataprep/elasticsearch/langchain/config.py b/comps/dataprep/elasticsearch/langchain/config.py new file mode 100644 index 000000000..3167e480d --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/config.py @@ -0,0 +1,23 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os + +ES_CONNECTION_STRING = os.getenv("ES_CONNECTION_STRING", "http://localhost:9200") +UPLOADED_FILES_PATH = os.getenv("UPLOADED_FILES_PATH", "./uploaded_files/") + +# Embedding model +EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5") + +# TEI Embedding endpoints +TEI_ENDPOINT = os.getenv("TEI_ENDPOINT", "") + +# Vector Index Configuration +INDEX_NAME = os.getenv("INDEX_NAME", "rag-elastic") + +# chunk parameters +CHUNK_SIZE = os.getenv("CHUNK_SIZE", 1500) +CHUNK_OVERLAP = os.getenv("CHUNK_OVERLAP", 100) + +# Logging enabled +LOG_FLAG = os.getenv("LOGFLAG", False) diff --git a/comps/dataprep/elasticsearch/langchain/docker-compose.yaml b/comps/dataprep/elasticsearch/langchain/docker-compose.yaml new file mode 100644 index 000000000..01d818eac --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/docker-compose.yaml @@ -0,0 +1,41 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +version: "3" +services: + elasticsearch-vector-db: + hostname: db + container_name: elasticsearch-vector-db + image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + ports: + - "9200:9200" + - "9300:9300" + restart: always + ipc: host + environment: + - ES_JAVA_OPTS=-Xms1g -Xmx1g + - discovery.type=single-node + - xpack.security.enabled=false + - bootstrap.memory_lock=false + - no_proxy= ${no_proxy} + - http_proxy= ${http_proxy} + - https_proxy= ${https_proxy} + + dataprep-elasticsearch: + image: opea/dataprep-elasticsearch:latest + container_name: dataprep-elasticsearch + ports: + - "6011:6011" + ipc: host + environment: + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + ES_CONNECTION_STRING: ${ES_CONNECTION_STRING} + INDEX_NAME: ${INDEX_NAME} + TEI_ENDPOINT: ${TEI_ENDPOINT} + HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN} + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/comps/dataprep/elasticsearch/langchain/prepare_doc_elasticsearch.py b/comps/dataprep/elasticsearch/langchain/prepare_doc_elasticsearch.py new file mode 100644 index 000000000..115207de5 --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/prepare_doc_elasticsearch.py @@ -0,0 +1,373 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import json +import os +from pathlib import Path +from typing import List, Optional, Union + +from config 
import ( + CHUNK_OVERLAP, + CHUNK_SIZE, + EMBED_MODEL, + ES_CONNECTION_STRING, + INDEX_NAME, + LOG_FLAG, + TEI_ENDPOINT, + UPLOADED_FILES_PATH, +) +from elasticsearch import Elasticsearch +from fastapi import Body, File, Form, HTTPException, UploadFile +from langchain.text_splitter import HTMLHeaderTextSplitter, RecursiveCharacterTextSplitter +from langchain_community.embeddings import HuggingFaceBgeEmbeddings +from langchain_core.documents import Document +from langchain_elasticsearch import ElasticsearchStore +from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings + +from comps import CustomLogger, DocPath, opea_microservices, register_microservice +from comps.dataprep.utils import ( + create_upload_folder, + document_loader, + encode_filename, + get_file_structure, + get_separators, + get_tables_result, + parse_html, + remove_folder_with_ignore, + save_content_to_local_disk, +) + +logger = CustomLogger(__name__) + + +def create_index() -> None: + if not es_client.indices.exists(index=INDEX_NAME): + es_client.indices.create(index=INDEX_NAME) + + +def get_embedder() -> Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]: + """Obtain required Embedder.""" + + if TEI_ENDPOINT: + return HuggingFaceEndpointEmbeddings(model=TEI_ENDPOINT) + else: + return HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL) + + +def get_elastic_store(embedder: Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]) -> ElasticsearchStore: + """Get Elasticsearch vector store.""" + + return ElasticsearchStore(index_name=INDEX_NAME, embedding=embedder, es_connection=es_client) + + +def delete_embeddings(doc_name: str) -> bool: + """Delete documents from Elasticsearch.""" + + try: + if doc_name == "all": + if LOG_FLAG: + logger.info("Deleting all documents from vectorstore") + + query = {"query": {"match_all": {}}} + else: + if LOG_FLAG: + logger.info(f"Deleting {doc_name} from vectorstore") + + query = {"query": {"match": {"metadata.doc_name": {"query": doc_name, "operator": "AND"}}}} + + es_client.delete_by_query(index=INDEX_NAME, body=query) + + return True + + except Exception as e: + if LOG_FLAG: + logger.info(f"An unexpected error occurred: {e}") + + return False + + +def search_by_filename(file_name: str) -> bool: + """Search Elasticsearch by file name.""" + + query = {"query": {"match": {"metadata.doc_name": {"query": file_name, "operator": "AND"}}}} + results = es_client.search(index=INDEX_NAME, body=query) + + if LOG_FLAG: + logger.info(f"[ search by file ] searched by {file_name}") + logger.info(f"[ search by file ] {len(results['hits']['hits'])} results: {results}") + + return results["hits"]["total"]["value"] > 0 + + +def ingest_doc_to_elastic(doc_path: DocPath) -> None: + """Ingest documents to Elasticsearch.""" + + path = doc_path.path + file_name = path.split("/")[-1] + if LOG_FLAG: + logger.info(f"Parsing document {path}, file name: {file_name}.") + + if path.endswith(".html"): + headers_to_split_on = [ + ("h1", "Header 1"), + ("h2", "Header 2"), + ("h3", "Header 3"), + ] + text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on) + else: + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=doc_path.chunk_size, + chunk_overlap=doc_path.chunk_overlap, + add_start_index=True, + separators=get_separators(), + ) + + content = document_loader(path) + + structured_types = [".xlsx", ".csv", ".json", ".jsonl"] + _, ext = os.path.splitext(path) + + if ext in structured_types: + chunks = content + else: + chunks = text_splitter.split_text(content) + + 
if doc_path.process_table and path.endswith(".pdf"): + table_chunks = get_tables_result(path, doc_path.table_strategy) + chunks = chunks + table_chunks + + if LOG_FLAG: + logger.info(f"Done preprocessing. Created {len(chunks)} chunks of the original file.") + + batch_size = 32 + num_chunks = len(chunks) + + metadata = dict({"doc_name": str(file_name)}) + + for i in range(0, num_chunks, batch_size): + batch_chunks = chunks[i : i + batch_size] + batch_texts = batch_chunks + + documents = [Document(page_content=text, metadata=metadata) for text in batch_texts] + _ = es_store.add_documents(documents=documents) + if LOG_FLAG: + logger.info(f"Processed batch {i // batch_size + 1}/{(num_chunks - 1) // batch_size + 1}") + + +async def ingest_link_to_elastic(link_list: List[str]) -> None: + """Ingest data scraped from website links into Elasticsearch.""" + + text_splitter = RecursiveCharacterTextSplitter( + chunk_size=CHUNK_SIZE, + chunk_overlap=CHUNK_OVERLAP, + add_start_index=True, + separators=get_separators(), + ) + + batch_size = 32 + + for link in link_list: + content = parse_html([link])[0][0] + if LOG_FLAG: + logger.info(f"[ ingest link ] link: {link} content: {content}") + + encoded_link = encode_filename(link) + save_path = UPLOADED_FILES_PATH + encoded_link + ".txt" + doc_path = UPLOADED_FILES_PATH + link + ".txt" + if LOG_FLAG: + logger.info(f"[ ingest link ] save_path: {save_path}") + + await save_content_to_local_disk(save_path, content) + + chunks = text_splitter.split_text(content) + + num_chunks = len(chunks) + metadata = dict({"doc_name": str(doc_path)}) + + for i in range(0, num_chunks, batch_size): + batch_chunks = chunks[i : i + batch_size] + batch_texts = batch_chunks + + documents = [Document(page_content=text, metadata=metadata) for text in batch_texts] + _ = es_store.add_documents(documents=documents) + + if LOG_FLAG: + logger.info(f"Processed batch {i // batch_size + 1}/{(num_chunks - 1) // batch_size + 1}") + + +@register_microservice(name="opea_service@prepare_doc_elastic", endpoint="/v1/dataprep", host="0.0.0.0", port=6011) +async def ingest_documents( + files: Optional[Union[UploadFile, List[UploadFile]]] = File(None), + link_list: Optional[str] = Form(None), + chunk_size: int = Form(1500), + chunk_overlap: int = Form(100), + process_table: bool = Form(False), + table_strategy: str = Form("fast"), +): + """Ingest documents for RAG.""" + + if LOG_FLAG: + logger.info(f"files:{files}") + logger.info(f"link_list:{link_list}") + + if files and link_list: + raise HTTPException(status_code=400, detail="Provide either a file or a string list, not both.") + + if files: + if not isinstance(files, list): + files = [files] + + if not os.path.exists(UPLOADED_FILES_PATH): + Path(UPLOADED_FILES_PATH).mkdir(parents=True, exist_ok=True) + + for file in files: + encode_file = encode_filename(file.filename) + save_path = UPLOADED_FILES_PATH + encode_file + filename = save_path.split("/")[-1] + + try: + exists = search_by_filename(filename) + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Failed when searching in Elasticsearch for file {file.filename}.", + ) + + if exists: + if LOG_FLAG: + logger.info(f"[ upload ] File {file.filename} already exists.") + + raise HTTPException( + status_code=400, + detail=f"Uploaded file {file.filename} already exists. 
Please change file name.", + ) + + await save_content_to_local_disk(save_path, file) + + ingest_doc_to_elastic( + DocPath( + path=save_path, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + process_table=process_table, + table_strategy=table_strategy, + ) + ) + if LOG_FLAG: + logger.info(f"Successfully saved file {save_path}") + + result = {"status": 200, "message": "Data preparation succeeded"} + + if LOG_FLAG: + logger.info(result) + return result + + if link_list: + try: + link_list = json.loads(link_list) # Parse JSON string to list + if not isinstance(link_list, list): + raise HTTPException(status_code=400, detail="link_list should be a list.") + + await ingest_link_to_elastic(link_list) + + if LOG_FLAG: + logger.info(f"Successfully saved link list {link_list}") + + result = {"status": 200, "message": "Data preparation succeeded"} + + if LOG_FLAG: + logger.info(result) + return result + + except json.JSONDecodeError: + raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.") + + raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") + + +@register_microservice( + name="opea_service@prepare_doc_elastic", + endpoint="/v1/dataprep/get_file", + host="0.0.0.0", + port=6011, +) +async def rag_get_file_structure(): + """Obtain uploaded file list.""" + + if LOG_FLAG: + logger.info("[ dataprep - get file ] start to get file structure") + + if not Path(UPLOADED_FILES_PATH).exists(): + if LOG_FLAG: + logger.info("No file uploaded, return empty list.") + return [] + + file_content = get_file_structure(UPLOADED_FILES_PATH) + + if LOG_FLAG: + logger.info(file_content) + + return file_content + + +@register_microservice( + name="opea_service@prepare_doc_elastic", + endpoint="/v1/dataprep/delete_file", + host="0.0.0.0", + port=6011, +) +async def delete_single_file(file_path: str = Body(..., embed=True)): + """Delete file according to `file_path`. + + `file_path`: + - specific file path (e.g. /path/to/file.txt) + - folder path (e.g. /path/to/folder) + - "all": delete all files uploaded + """ + if file_path == "all": + if LOG_FLAG: + logger.info("[dataprep - del] delete all files") + remove_folder_with_ignore(UPLOADED_FILES_PATH) + assert delete_embeddings(file_path) + if LOG_FLAG: + logger.info("[dataprep - del] successfully delete all files.") + create_upload_folder(UPLOADED_FILES_PATH) + if LOG_FLAG: + logger.info({"status": True}) + return {"status": True} + + delete_path = Path(UPLOADED_FILES_PATH + "/" + encode_filename(file_path)) + + if LOG_FLAG: + logger.info(f"[dataprep - del] delete_path: {delete_path}") + + if delete_path.exists(): + # delete file + if delete_path.is_file(): + try: + assert delete_embeddings(file_path) + delete_path.unlink() + except Exception as e: + if LOG_FLAG: + logger.info(f"[dataprep - del] fail to delete file {delete_path}: {e}") + logger.info({"status": False}) + return {"status": False} + # delete folder + else: + if LOG_FLAG: + logger.info("[dataprep - del] delete folder is not supported for now.") + logger.info({"status": False}) + return {"status": False} + if LOG_FLAG: + logger.info({"status": True}) + return {"status": True} + else: + raise HTTPException(status_code=404, detail="File/folder not found. 
Please check file_path.") + + +if __name__ == "__main__": + es_client = Elasticsearch(hosts=ES_CONNECTION_STRING) + es_store = get_elastic_store(get_embedder()) + create_upload_folder(UPLOADED_FILES_PATH) + create_index() + opea_microservices["opea_service@prepare_doc_elastic"].start() diff --git a/comps/dataprep/elasticsearch/langchain/requirements.txt b/comps/dataprep/elasticsearch/langchain/requirements.txt new file mode 100644 index 000000000..95bfc46be --- /dev/null +++ b/comps/dataprep/elasticsearch/langchain/requirements.txt @@ -0,0 +1,30 @@ +beautifulsoup4 +cairosvg +docarray[full] +docx2txt +easyocr +elasticsearch +fastapi +huggingface_hub +langchain +langchain-community +langchain-elasticsearch +langchain-huggingface +langchain-text-splitters +markdown +numpy +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-sdk +pandas +Pillow +prometheus-fastapi-instrumentator +pymupdf +pytesseract +python-bidi +python-docx +python-pptx +sentence_transformers +shortuuid +unstructured[all-docs] +uvicorn diff --git a/comps/retrievers/elasticsearch/langchain/Dockerfile b/comps/retrievers/elasticsearch/langchain/Dockerfile new file mode 100644 index 000000000..6c7bb903f --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/Dockerfile @@ -0,0 +1,28 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +FROM python:3.11-slim + +ARG ARCH="cpu" + +RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \ + libgl1-mesa-glx \ + libjemalloc-dev + +RUN useradd -m -s /bin/bash user && \ + mkdir -p /home/user && \ + chown -R user /home/user/ + +COPY comps /home/user/comps + +USER user + +RUN pip install --no-cache-dir --upgrade pip setuptools && \ + if [ ${ARCH} = "cpu" ]; then pip install --no-cache-dir torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \ + pip install --no-cache-dir -r /home/user/comps/retrievers/elasticsearch/langchain/requirements.txt + +ENV PYTHONPATH=$PYTHONPATH:/home/user + +WORKDIR /home/user/comps/retrievers/elasticsearch/langchain + +ENTRYPOINT ["python", "retriever_elasticsearch.py"] diff --git a/comps/retrievers/elasticsearch/langchain/README.md b/comps/retrievers/elasticsearch/langchain/README.md new file mode 100644 index 000000000..455f8c783 --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/README.md @@ -0,0 +1,122 @@ +# Retriever Microservice + +This retriever microservice is a highly efficient search service designed for handling and retrieving embedding vectors. +It operates by receiving an embedding vector as input and conducting a similarity search against vectors stored in a +VectorDB database. Users must specify the VectorDB's URL and the index name, and the service searches within that index +to find documents with the highest similarity to the input vector. + +The service primarily utilizes similarity measures in vector space to rapidly retrieve contextually similar documents. +The vector-based retrieval approach is particularly suited for handling large datasets, offering fast and accurate +search results that significantly enhance the efficiency and quality of information retrieval. + +Overall, this microservice provides robust backend support for applications requiring efficient similarity searches, +playing a vital role in scenarios such as recommendation systems, information retrieval, or any other context where +precise measurement of document similarity is crucial. + +## 🚀1. 
Start Microservice with Python (Option 1) + +To start the retriever microservice, you must first install the required Python packages. + +### 1.1 Install Requirements + +```bash +pip install -r requirements.txt +``` + +### 1.2 Start TEI Service + +```bash +model=BAAI/bge-base-en-v1.5 +volume=$PWD/data +docker run -d -p 6060:80 -v $volume:/data -e http_proxy=$http_proxy -e https_proxy=$https_proxy --pull always ghcr.io/huggingface/text-embeddings-inference:cpu-1.5 --model-id $model +``` + +### 1.3 Verify the TEI Service + +Health check the embedding service with: + +```bash +curl 127.0.0.1:6060/embed \ + -X POST \ + -d '{"inputs":"What is Deep Learning?"}' \ + -H 'Content-Type: application/json' +``` + +### 1.4 Setup VectorDB Service + +You need to set up your own VectorDB service (Elasticsearch in this example), and ingest your knowledge documents into +the vector database. + +As for Elasticsearch, you could start a docker container using the following command. +Remember to ingest data into it manually. + +```bash +docker run -d --name vectorstore-elasticsearch -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" -p 9200:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:8.16.0 +``` + +### 1.5 Start Retriever Service + +```bash +export TEI_ENDPOINT="http://${your_ip}:6060" +python retriever_elasticsearch.py +``` + +## 🚀2. Start Microservice with Docker (Option 2) + +### 2.1 Setup Environment Variables + +```bash +export EMBED_MODEL="BAAI/bge-base-en-v1.5" +export ES_CONNECTION_STRING="http://localhost:9200" +export INDEX_NAME=${your_index_name} +export TEI_ENDPOINT="http://${your_ip}:6060" +``` + +### 2.2 Build Docker Image + +```bash +cd GenAIComps +docker build -t opea/retriever-elasticsearch:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/elasticsearch/langchain/Dockerfile . +``` + +To start a docker container, you have two options: + +- A. Run Docker with CLI +- B. Run Docker with Docker Compose + +You can choose one as needed. + +### 2.3 Run Docker with CLI (Option A) + +```bash +docker run -d --name="retriever-elasticsearch" -p 7000:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/retriever-elasticsearch:latest +``` + +### 2.4 Run Docker with Docker Compose (Option B) + +```bash +cd comps/retrievers/elasticsearch/langchain +docker compose -f docker_compose_retriever.yaml up -d +``` + +## 🚀3. Consume Retriever Service + +### 3.1 Check Service Status + +```bash +curl http://localhost:7000/v1/health_check \ + -X GET \ + -H 'Content-Type: application/json' +``` + +### 3.2 Consume Retriever Service + +To consume the Retriever Microservice, you can generate a mock embedding vector of length 768 with Python. 
+ +```bash +export your_embedding=$(python -c "import random; embedding = [random.uniform(-1, 1) for _ in range(768)]; print(embedding)") +curl http://${your_ip}:7000/v1/retrieval \ + -X POST \ + -d "{\"text\":\"What is the revenue of Nike in 2023?\",\"embedding\":${your_embedding}}" \ + -H 'Content-Type: application/json' +``` diff --git a/comps/retrievers/elasticsearch/langchain/config.py b/comps/retrievers/elasticsearch/langchain/config.py new file mode 100644 index 000000000..a6e44747b --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/config.py @@ -0,0 +1,18 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os + +# Embedding model +EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5") + +ES_CONNECTION_STRING = os.getenv("ES_CONNECTION_STRING", "http://localhost:9200") + +# TEI Embedding endpoints +TEI_ENDPOINT = os.getenv("TEI_ENDPOINT", "") + +# Vector Index Configuration +INDEX_NAME = os.getenv("INDEX_NAME", "rag-elastic") + +# Logging enabled +LOG_FLAG = os.getenv("LOGFLAG", False) diff --git a/comps/retrievers/elasticsearch/langchain/docker_compose_retriever.yaml b/comps/retrievers/elasticsearch/langchain/docker_compose_retriever.yaml new file mode 100644 index 000000000..7ca8d36fa --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/docker_compose_retriever.yaml @@ -0,0 +1,33 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +version: "3.8" + +services: + tei_xeon_service: + image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.2 + container_name: tei-xeon-server + ports: + - "6060:80" + volumes: + - "./data:/data" + shm_size: 1g + command: --model-id ${RETRIEVE_MODEL_ID} + retriever: + image: opea/retriever-elasticsearch:latest + container_name: retriever-elasticsearch + ports: + - "7000:7000" + ipc: host + environment: + http_proxy: ${http_proxy} + https_proxy: ${https_proxy} + INDEX_NAME: ${INDEX_NAME} + TEI_ENDPOINT: ${TEI_ENDPOINT} + ES_CONNECTION_STRING: ${ES_CONNECTION_STRING} + HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN} + restart: unless-stopped + +networks: + default: + driver: bridge diff --git a/comps/retrievers/elasticsearch/langchain/requirements.txt b/comps/retrievers/elasticsearch/langchain/requirements.txt new file mode 100644 index 000000000..9ce5afcb6 --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/requirements.txt @@ -0,0 +1,14 @@ +docarray[full] +easyocr +fastapi +langchain-community +langchain-elasticsearch +langchain-huggingface +opentelemetry-api +opentelemetry-exporter-otlp +opentelemetry-sdk +prometheus-fastapi-instrumentator==7.0.0 +pymupdf +sentence_transformers +shortuuid +uvicorn diff --git a/comps/retrievers/elasticsearch/langchain/retriever_elasticsearch.py b/comps/retrievers/elasticsearch/langchain/retriever_elasticsearch.py new file mode 100644 index 000000000..9f26d4d31 --- /dev/null +++ b/comps/retrievers/elasticsearch/langchain/retriever_elasticsearch.py @@ -0,0 +1,105 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import time +from typing import Union + +from config import EMBED_MODEL, ES_CONNECTION_STRING, INDEX_NAME, LOG_FLAG, TEI_ENDPOINT +from elasticsearch import Elasticsearch +from langchain_community.embeddings import HuggingFaceBgeEmbeddings +from langchain_elasticsearch import ElasticsearchStore +from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings + +from comps import ( + CustomLogger, + EmbedDoc, + SearchedDoc, + ServiceType, + TextDoc, + 
opea_microservices, + register_microservice, + register_statistics, + statistics_dict, +) + +logger = CustomLogger(__name__) + + +def create_index() -> None: + if not es_client.indices.exists(index=INDEX_NAME): + es_client.indices.create(index=INDEX_NAME) + + +def get_embedder() -> Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]: + """Obtain required Embedder.""" + + if TEI_ENDPOINT: + return HuggingFaceEndpointEmbeddings(model=TEI_ENDPOINT) + else: + return HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL) + + +def get_elastic_store(embedder: Union[HuggingFaceEndpointEmbeddings, HuggingFaceBgeEmbeddings]) -> ElasticsearchStore: + """Get Elasticsearch vector store.""" + + return ElasticsearchStore(index_name=INDEX_NAME, embedding=embedder, es_connection=es_client) + + +@register_microservice( + name="opea_service@retriever_elasticsearch", + service_type=ServiceType.RETRIEVER, + endpoint="/v1/retrieval", + host="0.0.0.0", + port=7000, +) +@register_statistics(names=["opea_service@retriever_elasticsearch"]) +async def retrieve(input: EmbedDoc) -> SearchedDoc: + """Retrieve documents based on similarity search type.""" + if LOG_FLAG: + logger.info(input) + start = time.time() + + if input.search_type == "similarity": + docs_and_similarities = vector_db.similarity_search_by_vector_with_relevance_scores( + embedding=input.embedding, k=input.k + ) + search_res = [doc for doc, _ in docs_and_similarities] + + elif input.search_type == "similarity_distance_threshold": + if input.distance_threshold is None: + raise ValueError("distance_threshold must be provided for " + "similarity_distance_threshold retriever") + docs_and_similarities = vector_db.similarity_search_by_vector_with_relevance_scores( + embedding=input.embedding, k=input.k + ) + search_res = [doc for doc, similarity in docs_and_similarities if similarity > input.distance_threshold] + + elif input.search_type == "similarity_score_threshold": + docs_and_similarities = vector_db.similarity_search_with_relevance_scores(query=input.text, k=input.k) + search_res = [doc for doc, similarity in docs_and_similarities if similarity > input.score_threshold] + + elif input.search_type == "mmr": + search_res = vector_db.max_marginal_relevance_search( + query=input.text, k=input.k, fetch_k=input.fetch_k, lambda_mult=input.lambda_mult + ) + + else: + raise ValueError(f"{input.search_type} not valid") + + searched_docs = [] + for r in search_res: + searched_docs.append(TextDoc(text=r.page_content)) + result = SearchedDoc(retrieved_docs=searched_docs, initial_query=input.text) + + statistics_dict["opea_service@retriever_elasticsearch"].append_latency(time.time() - start, None) + + if LOG_FLAG: + logger.info(result) + + return result + + +if __name__ == "__main__": + es_client = Elasticsearch(hosts=ES_CONNECTION_STRING) + vector_db = get_elastic_store(get_embedder()) + create_index() + opea_microservices["opea_service@retriever_elasticsearch"].start() diff --git a/comps/vectorstores/elasticsearch/README.md b/comps/vectorstores/elasticsearch/README.md new file mode 100644 index 000000000..1104f9f4a --- /dev/null +++ b/comps/vectorstores/elasticsearch/README.md @@ -0,0 +1,13 @@ +# Start Elasticsearch server + +## 1. Download Elasticsearch image + +```bash +docker pull docker.elastic.co/elasticsearch/elasticsearch:8.16.0 +``` + +## 2. 
Run Elasticsearch service + +```bash +docker run -p 9200:9200 -p 9300:9300 -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.16.0 +``` diff --git a/comps/vectorstores/elasticsearch/__init__.py b/comps/vectorstores/elasticsearch/__init__.py new file mode 100644 index 000000000..916f3a44b --- /dev/null +++ b/comps/vectorstores/elasticsearch/__init__.py @@ -0,0 +1,2 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 diff --git a/comps/vectorstores/elasticsearch/docker-compose.yml b/comps/vectorstores/elasticsearch/docker-compose.yml new file mode 100644 index 000000000..2ba8e04f2 --- /dev/null +++ b/comps/vectorstores/elasticsearch/docker-compose.yml @@ -0,0 +1,16 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +version: "3" +services: + elasticsearch-vector-db: + image: docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + container_name: elasticsearch-vector-db + ports: + - "9200:9200" + - "9300:9300" + environment: + - ES_JAVA_OPTS=-Xms1g -Xmx1g + - discovery.type=single-node + - xpack.security.enabled=false + - bootstrap.memory_lock=false diff --git a/tests/dataprep/test_dataprep_elasticsearch_langchain.sh b/tests/dataprep/test_dataprep_elasticsearch_langchain.sh new file mode 100644 index 000000000..a68bbac9f --- /dev/null +++ b/tests/dataprep/test_dataprep_elasticsearch_langchain.sh @@ -0,0 +1,137 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +ip_address=$(hostname -I | awk '{print $1}') +dataprep_service_port=6011 + +function build_docker_images() { + cd $WORKPATH + echo $WORKPATH + # pull elasticsearch image + docker pull docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + + # build dataprep image for elasticsearch + docker build --no-cache -t opea/dataprep-elasticsearch:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f $WORKPATH/comps/dataprep/elasticsearch/langchain/Dockerfile . + if [ $? -ne 0 ]; then + echo "opea/dataprep-elasticsearch build failed" + exit 1 + else + echo "opea/dataprep-elasticsearch built successfully" + fi +} + +function start_service() { + # elasticsearch + elasticsearch_port=9200 + docker run -d --name test-comps-vectorstore-elasticsearch -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" -p $elasticsearch_port:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + export ES_CONNECTION_STRING="http://${ip_address}:${elasticsearch_port}" + sleep 10s + + # data-prep + INDEX_NAME="test-elasticsearch" + docker run -d --name="test-comps-dataprep-elasticsearch" -p $dataprep_service_port:6011 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME opea/dataprep-elasticsearch:comps + sleep 15s + + bash ./tests/utils/wait-for-it.sh $ip_address:$dataprep_service_port -s -t 100 -- echo "Dataprep service up" + DATAPREP_UP=$? + if [ ${DATAPREP_UP} -ne 0 ]; then + echo "Could not start Dataprep service." + return 1 + fi + + sleep 5s + bash ./tests/utils/wait-for-it.sh ${ip_address}:$dataprep_service_port -s -t 1 -- echo "Dataprep service still up" + DATAPREP_UP=$? + if [ ${DATAPREP_UP} -ne 0 ]; then + echo "Dataprep service crashed." 
+ return 1 + fi +} + +function validate_microservice() { + cd $LOG_PATH + + # test /v1/dataprep + URL="http://${ip_address}:$dataprep_service_port/v1/dataprep" + echo "Deep learning is a subset of machine learning that utilizes neural networks with multiple layers to analyze various levels of abstract data representations. It enables computers to identify patterns and make decisions with minimal human intervention by learning from large amounts of data." > $LOG_PATH/dataprep_file.txt + + HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -F 'files=@./dataprep_file.txt' -H 'Content-Type: multipart/form-data' "$URL") + if [ "$HTTP_STATUS" -eq 200 ]; then + echo "[ dataprep ] HTTP status is 200. Checking content..." + cp ./dataprep_file.txt ./dataprep_file2.txt + local CONTENT=$(curl -s -X POST -F 'files=@./dataprep_file2.txt' -H 'Content-Type: multipart/form-data' "$URL" | tee ${LOG_PATH}/dataprep.log) + + if echo "$CONTENT" | grep -q "Data preparation succeeded"; then + echo "[ dataprep ] Content is as expected." + else + echo "[ dataprep ] Content does not match the expected result: $CONTENT" + docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep.log + exit 1 + fi + else + echo "[ dataprep ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep.log + exit 1 + fi + + # test /v1/dataprep/get_file + URL="http://${ip_address}:$dataprep_service_port/v1/dataprep/get_file" + HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -H 'Content-Type: application/json' "$URL") + if [ "$HTTP_STATUS" -eq 200 ]; then + echo "[ dataprep - file ] HTTP status is 200. Checking content..." + local CONTENT=$(curl -s -X POST -H 'Content-Type: application/json' "$URL" | tee ${LOG_PATH}/dataprep_file.log) + + if echo "$CONTENT" | grep -q '{"name":'; then + echo "[ dataprep - file ] Content is as expected." + else + echo "[ dataprep - file ] Content does not match the expected result: $CONTENT" + docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_file.log + exit 1 + fi + else + echo "[ dataprep - file ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_file.log + exit 1 + fi + + # test /v1/dataprep/delete_file + URL="http://${ip_address}:$dataprep_service_port/v1/dataprep/delete_file" + HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X POST -d '{"file_path": "dataprep_file.txt"}' -H 'Content-Type: application/json' "$URL") + if [ "$HTTP_STATUS" -eq 200 ]; then + echo "[ dataprep - del ] HTTP status is 200." + docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_del.log + else + echo "[ dataprep - del ] HTTP status is not 200. Received status was $HTTP_STATUS" + docker logs test-comps-dataprep-elasticsearch >> ${LOG_PATH}/dataprep_del.log + exit 1 + fi +} + +function stop_docker() { + cid=$(docker ps -aq --filter "name=test-comps-vectorstore-elasticsearch*") + if [[ ! -z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi + + cid=$(docker ps -aq --filter "name=test-comps-dataprep-elasticsearch*") + if [[ ! 
-z "$cid" ]]; then docker stop $cid && docker rm $cid && sleep 1s; fi +} + +function main() { + + stop_docker + + build_docker_images + start_service + + validate_microservice + + stop_docker + echo y | docker system prune + +} + +main diff --git a/tests/retrievers/test_retrievers_elasticsearch_langchain.sh b/tests/retrievers/test_retrievers_elasticsearch_langchain.sh new file mode 100644 index 000000000..53db592b5 --- /dev/null +++ b/tests/retrievers/test_retrievers_elasticsearch_langchain.sh @@ -0,0 +1,93 @@ +#!/bin/bash +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +set -x + +WORKPATH=$(dirname "$PWD") +LOG_PATH="$WORKPATH/tests" +ip_address=$(hostname -I | awk '{print $1}') + +function build_docker_images() { + cd $WORKPATH + docker build --no-cache -t opea/retriever-elasticsearch:comps --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/retrievers/elasticsearch/langchain/Dockerfile . + if [ $? -ne 0 ]; then + echo "opea/retriever-elasticsearch built fail" + exit 1 + else + echo "opea/retriever-elasticsearch built successful" + fi +} + +function start_service() { + # elasticsearch + elasticsearch_port=9200 + docker run -d --name "test-comps-retriever-elasticsearch-vectorstore" -e ES_JAVA_OPTS="-Xms1g -Xmx1g" -e "discovery.type=single-node" -e "xpack.security.enabled=false" -p $elasticsearch_port:9200 -p 9300:9300 docker.elastic.co/elasticsearch/elasticsearch:8.16.0 + export ES_CONNECTION_STRING="http://${ip_address}:${elasticsearch_port}" + sleep 10s + + # elasticsearch retriever + INDEX_NAME="test-elasticsearch" + retriever_port=7000 + docker run -d --name="test-comps-retriever-elasticsearch-ms" -p $retriever_port:7000 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e ES_CONNECTION_STRING=$ES_CONNECTION_STRING -e INDEX_NAME=$INDEX_NAME opea/retriever-elasticsearch:comps + sleep 15s + + bash ./tests/utils/wait-for-it.sh ${ip_address}:$retriever_port -s -t 100 -- echo "Retriever up" + RETRIEVER_UP=$? + if [ ${RETRIEVER_UP} -ne 0 ]; then + echo "Could not start Retriever." + return 1 + fi + + sleep 5s + bash ./tests/utils/wait-for-it.sh ${ip_address}:$retriever_port -s -t 1 -- echo "Retriever still up" + RETRIEVER_UP=$? + if [ ${RETRIEVER_UP} -ne 0 ]; then + echo "Retriever crashed." + return 1 + fi +} + +function validate_microservice() { + retriever_port=7000 + test_embedding=$(python3 -c "import random; embedding = [random.uniform(-1, 1) for _ in range(768)]; print(embedding)") + + + result=$(http_proxy='' + curl http://${ip_address}:$retriever_port/v1/retrieval \ + -X POST \ + -d "{\"text\":\"test\",\"embedding\":${test_embedding}}" \ + -H 'Content-Type: application/json') + if [[ $result == *"retrieved_docs"* ]]; then + echo "Result correct." + else + echo "Result wrong. Received was $result" + docker logs test-comps-retriever-elasticsearch-vectorstore >> ${LOG_PATH}/vectorstore.log + docker logs test-comps-retriever-elasticsearch-tei-endpoint >> ${LOG_PATH}/tei-endpoint.log + docker logs test-comps-retriever-elasticsearch-ms >> ${LOG_PATH}/retriever-elasticsearch.log + exit 1 + fi +} + +function stop_docker() { + cid_retrievers=$(docker ps -aq --filter "name=test-comps-retriever-elasticsearch*") + if [[ ! 
-z "$cid_retrievers" ]]; then + docker stop $cid_retrievers && docker rm $cid_retrievers && sleep 1s + fi +} + +function main() { + + stop_docker + + build_docker_images + start_service + + validate_microservice + + stop_docker + echo y | docker system prune + +} + +main From ddd372d3e40d6945abb1f89c5316f97252f1a5a4 Mon Sep 17 00:00:00 2001 From: "Wang, Kai Lawrence" <109344418+wangkl2@users.noreply.github.com> Date: Tue, 10 Dec 2024 13:19:56 +0800 Subject: [PATCH 4/5] Remove enforce-eager to enable HPU graphs for better vLLM perf (#954) * remove enforce-eager to enable HPU graphs Signed-off-by: Wang, Kai Lawrence * Increase the llm max timeout in ci for fully warmup Signed-off-by: Wang, Kai Lawrence --------- Signed-off-by: Wang, Kai Lawrence --- .../faq-generation/vllm/langchain/docker_compose_llm.yaml | 2 +- .../llms/summarization/vllm/langchain/docker_compose_llm.yaml | 2 +- .../vllm/langchain/dependency/launch_vllm_service.sh | 2 +- .../text-generation/vllm/langchain/docker_compose_llm.yaml | 2 +- .../vllm/llama_index/dependency/launch_vllm_service.sh | 2 +- .../text-generation/vllm/llama_index/docker_compose_llm.yaml | 2 +- .../test_llms_text-generation_vllm_langchain_on_intel_hpu.sh | 4 ++-- .../test_llms_text-generation_vllm_llamaindex_on_intel_hpu.sh | 4 ++-- 8 files changed, 10 insertions(+), 10 deletions(-) diff --git a/comps/llms/faq-generation/vllm/langchain/docker_compose_llm.yaml b/comps/llms/faq-generation/vllm/langchain/docker_compose_llm.yaml index 8ed64dd97..8b26dd751 100644 --- a/comps/llms/faq-generation/vllm/langchain/docker_compose_llm.yaml +++ b/comps/llms/faq-generation/vllm/langchain/docker_compose_llm.yaml @@ -23,7 +23,7 @@ services: cap_add: - SYS_NICE ipc: host - command: --enforce-eager --model $LLM_MODEL_ID --tensor-parallel-size 1 --host 0.0.0.0 --port 80 + command: --model $LLM_MODEL_ID --tensor-parallel-size 1 --host 0.0.0.0 --port 80 llm: image: opea/llm-faqgen-vllm:latest container_name: llm-faqgen-server diff --git a/comps/llms/summarization/vllm/langchain/docker_compose_llm.yaml b/comps/llms/summarization/vllm/langchain/docker_compose_llm.yaml index b93fd8030..720e7056d 100644 --- a/comps/llms/summarization/vllm/langchain/docker_compose_llm.yaml +++ b/comps/llms/summarization/vllm/langchain/docker_compose_llm.yaml @@ -23,7 +23,7 @@ services: cap_add: - SYS_NICE ipc: host - command: --enforce-eager --model $LLM_MODEL_ID --tensor-parallel-size 1 --host 0.0.0.0 --port 80 + command: --model $LLM_MODEL_ID --tensor-parallel-size 1 --host 0.0.0.0 --port 80 llm: image: opea/llm-docsum-vllm:latest container_name: llm-docsum-vllm-server diff --git a/comps/llms/text-generation/vllm/langchain/dependency/launch_vllm_service.sh b/comps/llms/text-generation/vllm/langchain/dependency/launch_vllm_service.sh index 6f6a7d211..83ecd6753 100644 --- a/comps/llms/text-generation/vllm/langchain/dependency/launch_vllm_service.sh +++ b/comps/llms/text-generation/vllm/langchain/dependency/launch_vllm_service.sh @@ -38,7 +38,7 @@ volume=$PWD/data # Build the Docker run command based on hardware mode if [ "$hw_mode" = "hpu" ]; then - docker run -d --rm --runtime=habana --name="vllm-service" -p $port_number:80 -e HABANA_VISIBLE_DEVICES=all -e OMPI_MCA_btl_vader_single_copy_mechanism=none --cap-add=sys_nice --ipc=host -e HTTPS_PROXY=$https_proxy -e HTTP_PROXY=$https_proxy -e HF_TOKEN=${HF_TOKEN} opea/vllm-gaudi:latest --enforce-eager --model $model_name --tensor-parallel-size $parallel_number --host 0.0.0.0 --port 80 --block-size $block_size --max-num-seqs $max_num_seqs 
--max-seq_len-to-capture $max_seq_len_to_capture + docker run -d --rm --runtime=habana --name="vllm-service" -p $port_number:80 -e HABANA_VISIBLE_DEVICES=all -e OMPI_MCA_btl_vader_single_copy_mechanism=none --cap-add=sys_nice --ipc=host -e HTTPS_PROXY=$https_proxy -e HTTP_PROXY=$https_proxy -e HF_TOKEN=${HF_TOKEN} opea/vllm-gaudi:latest --model $model_name --tensor-parallel-size $parallel_number --host 0.0.0.0 --port 80 --block-size $block_size --max-num-seqs $max_num_seqs --max-seq_len-to-capture $max_seq_len_to_capture else docker run -d --rm --name="vllm-service" -p $port_number:80 --network=host -v $volume:/data -e HTTPS_PROXY=$https_proxy -e HTTP_PROXY=$https_proxy -e HF_TOKEN=${HF_TOKEN} -e VLLM_CPU_KVCACHE_SPACE=40 opea/vllm-cpu:latest --model $model_name --host 0.0.0.0 --port 80 fi diff --git a/comps/llms/text-generation/vllm/langchain/docker_compose_llm.yaml b/comps/llms/text-generation/vllm/langchain/docker_compose_llm.yaml index e817c9f35..077ceee8b 100644 --- a/comps/llms/text-generation/vllm/langchain/docker_compose_llm.yaml +++ b/comps/llms/text-generation/vllm/langchain/docker_compose_llm.yaml @@ -23,7 +23,7 @@ services: cap_add: - SYS_NICE ipc: host - command: --enforce-eager --model $LLM_MODEL --tensor-parallel-size 1 --host 0.0.0.0 --port 80 + command: --model $LLM_MODEL --tensor-parallel-size 1 --host 0.0.0.0 --port 80 llm: image: opea/llm-vllm:latest container_name: llm-vllm-gaudi-server diff --git a/comps/llms/text-generation/vllm/llama_index/dependency/launch_vllm_service.sh b/comps/llms/text-generation/vllm/llama_index/dependency/launch_vllm_service.sh index d3363aa40..c8b2790b4 100644 --- a/comps/llms/text-generation/vllm/llama_index/dependency/launch_vllm_service.sh +++ b/comps/llms/text-generation/vllm/llama_index/dependency/launch_vllm_service.sh @@ -38,7 +38,7 @@ volume=$PWD/data # Build the Docker run command based on hardware mode if [ "$hw_mode" = "hpu" ]; then - docker run -d --rm --runtime=habana --name="vllm-service" -p $port_number:80 -e HABANA_VISIBLE_DEVICES=all -e OMPI_MCA_btl_vader_single_copy_mechanism=none --cap-add=sys_nice --ipc=host -e HTTPS_PROXY=$https_proxy -e HTTP_PROXY=$https_proxy -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} opea/vllm-gaudi:latest --enforce-eager --model $model_name --tensor-parallel-size $parallel_number --host 0.0.0.0 --port 80 --block-size $block_size --max-num-seqs $max_num_seqs --max-seq_len-to-capture $max_seq_len_to_capture + docker run -d --rm --runtime=habana --name="vllm-service" -p $port_number:80 -e HABANA_VISIBLE_DEVICES=all -e OMPI_MCA_btl_vader_single_copy_mechanism=none --cap-add=sys_nice --ipc=host -e HTTPS_PROXY=$https_proxy -e HTTP_PROXY=$https_proxy -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} opea/vllm-gaudi:latest --model $model_name --tensor-parallel-size $parallel_number --host 0.0.0.0 --port 80 --block-size $block_size --max-num-seqs $max_num_seqs --max-seq_len-to-capture $max_seq_len_to_capture else docker run -d --rm --name="vllm-service" -p $port_number:80 --network=host -v $volume:/data -e HTTPS_PROXY=$https_proxy -e HTTP_PROXY=$https_proxy -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} -e VLLM_CPU_KVCACHE_SPACE=40 opea/vllm-cpu:latest --model $model_name --host 0.0.0.0 --port 80 fi diff --git a/comps/llms/text-generation/vllm/llama_index/docker_compose_llm.yaml b/comps/llms/text-generation/vllm/llama_index/docker_compose_llm.yaml index eeed7d19a..6bfc0d500 100644 --- a/comps/llms/text-generation/vllm/llama_index/docker_compose_llm.yaml +++ b/comps/llms/text-generation/vllm/llama_index/docker_compose_llm.yaml @@ 
diff --git a/tests/llms/test_llms_text-generation_vllm_langchain_on_intel_hpu.sh b/tests/llms/test_llms_text-generation_vllm_langchain_on_intel_hpu.sh
index b1fd41e9a..6b8e468f8 100644
--- a/tests/llms/test_llms_text-generation_vllm_langchain_on_intel_hpu.sh
+++ b/tests/llms/test_llms_text-generation_vllm_langchain_on_intel_hpu.sh
@@ -48,7 +48,7 @@ function start_service() {
         --ipc=host \
         -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} \
         opea/vllm-gaudi:comps \
-        --enforce-eager --model $LLM_MODEL --tensor-parallel-size 1 --host 0.0.0.0 --port 80 --block-size 128 --max-num-seqs 256 --max-seq_len-to-capture 2048
+        --model $LLM_MODEL --tensor-parallel-size 1 --host 0.0.0.0 --port 80 --block-size 128 --max-num-seqs 256 --max-seq_len-to-capture 2048

     export vLLM_ENDPOINT="http://${ip_address}:${port_number}"
     docker run -d --rm \
@@ -62,7 +62,7 @@ function start_service() {

     # check whether vllm ray is fully ready
     n=0
-    until [[ "$n" -ge 120 ]] || [[ $ready == true ]]; do
+    until [[ "$n" -ge 160 ]] || [[ $ready == true ]]; do
         docker logs test-comps-vllm-service > ${WORKPATH}/tests/test-comps-vllm-service.log
         n=$((n+1))
         if grep -q throughput ${WORKPATH}/tests/test-comps-vllm-service.log; then
diff --git a/tests/llms/test_llms_text-generation_vllm_llamaindex_on_intel_hpu.sh b/tests/llms/test_llms_text-generation_vllm_llamaindex_on_intel_hpu.sh
index 3019d6c08..91a30ed85 100644
--- a/tests/llms/test_llms_text-generation_vllm_llamaindex_on_intel_hpu.sh
+++ b/tests/llms/test_llms_text-generation_vllm_llamaindex_on_intel_hpu.sh
@@ -48,7 +48,7 @@ function start_service() {
         --ipc=host \
         -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} \
         opea/vllm-gaudi:comps \
-        --enforce-eager --model $LLM_MODEL --tensor-parallel-size 1 --host 0.0.0.0 --port 80 --block-size 128 --max-num-seqs 256 --max-seq_len-to-capture 2048
+        --model $LLM_MODEL --tensor-parallel-size 1 --host 0.0.0.0 --port 80 --block-size 128 --max-num-seqs 256 --max-seq_len-to-capture 2048

     export vLLM_ENDPOINT="http://${ip_address}:${port_number}"
     docker run -d --rm \
@@ -62,7 +62,7 @@ function start_service() {

     # check whether vllm ray is fully ready
     n=0
-    until [[ "$n" -ge 120 ]] || [[ $ready == true ]]; do
+    until [[ "$n" -ge 160 ]] || [[ $ready == true ]]; do
         docker logs test-comps-vllm-service > ${WORKPATH}/tests/test-comps-vllm-service.log
         n=$((n+1))
         if grep -q throughput ${WORKPATH}/tests/test-comps-vllm-service.log; then
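
[Editor's note, not part of the patch] Dropping --enforce-eager lets vLLM on Gaudi capture HPU graphs instead of running every step eagerly, which generally improves throughput but lengthens startup warmup; that is presumably why the readiness loops above were extended from 120 to 160 iterations. If eager mode is wanted again (for example while debugging), a minimal sketch under those assumptions follows; the flags and image tag are taken from the diffs above, while the host port mapping is illustrative:

    # Hypothetical debug run: re-enable eager mode (skips HPU graph capture).
    # 8008:80 is an illustrative port mapping; adjust to your environment.
    docker run -d --rm --runtime=habana --name="vllm-service" -p 8008:80 \
        -e HABANA_VISIBLE_DEVICES=all --cap-add=sys_nice --ipc=host \
        -e HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} \
        opea/vllm-gaudi:comps \
        --enforce-eager --model $LLM_MODEL --tensor-parallel-size 1 --host 0.0.0.0 --port 80
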
From c409ef9fcc7b2364837a83d911a9487d2ee8c136 Mon Sep 17 00:00:00 2001
From: Liang Lv
Date: Tue, 10 Dec 2024 13:20:16 +0800
Subject: [PATCH 5/5] Add Component base class for code refactoring (#983)

* Add Component base class

Signed-off-by: lvliang-intel

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add controller class

Signed-off-by: lvliang-intel

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* add ut

Signed-off-by: lvliang-intel

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: lvliang-intel
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 comps/__init__.py                    |   3 +
 comps/cores/common/__init__.py       |   2 +
 comps/cores/common/component.py      | 155 +++++++++++++++++++++++++++
 tests/cores/common/test_component.py | 148 +++++++++++++++++++++++++
 4 files changed, 308 insertions(+)
 create mode 100644 comps/cores/common/__init__.py
 create mode 100644 comps/cores/common/component.py
 create mode 100644 tests/cores/common/test_component.py

diff --git a/comps/__init__.py b/comps/__init__.py
index ee7caaf63..8fe3ac5fd 100644
--- a/comps/__init__.py
+++ b/comps/__init__.py
@@ -68,6 +68,9 @@
 # Telemetry
 from comps.cores.telemetry.opea_telemetry import opea_telemetry

+# Common
+from comps.cores.common.component import OpeaComponent, OpeaComponentController
+
 # Statistics
 from comps.cores.mega.base_statistics import statistics_dict, register_statistics

diff --git a/comps/cores/common/__init__.py b/comps/cores/common/__init__.py
new file mode 100644
index 000000000..916f3a44b
--- /dev/null
+++ b/comps/cores/common/__init__.py
@@ -0,0 +1,2 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
diff --git a/comps/cores/common/component.py b/comps/cores/common/component.py
new file mode 100644
index 000000000..8b59d9511
--- /dev/null
+++ b/comps/cores/common/component.py
@@ -0,0 +1,155 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+from abc import ABC, abstractmethod
+
+
+class OpeaComponent(ABC):
+    """The OpeaComponent class serves as the base class for all components in GenAIComps.
+    It provides a unified interface and foundational attributes that every derived component inherits and extends.
+
+    Attributes:
+        name (str): The name of the component.
+        type (str): The type of the component (e.g., 'retriever', 'embedding', 'reranking', 'llm', etc.).
+        description (str): A brief description of the component's functionality.
+        config (dict): A dictionary containing configuration parameters for the component.
+    """
+
+    def __init__(self, name: str, type: str, description: str, config: dict = None):
+        """Initializes an OpeaComponent instance with the provided attributes.
+
+        Args:
+            name (str): The name of the component.
+            type (str): The type of the component.
+            description (str): A brief description of the component.
+            config (dict, optional): Configuration parameters for the component. Defaults to an empty dictionary.
+        """
+        self.name = name
+        self.type = type
+        self.description = description
+        self.config = config if config is not None else {}
+
+    def get_meta(self) -> dict:
+        """Retrieves metadata about the component, including its name, type, description, and configuration.
+
+        Returns:
+            dict: A dictionary containing the component's metadata.
+        """
+        return {
+            "name": self.name,
+            "type": self.type,
+            "description": self.description,
+            "config": self.config,
+        }
+
+    def update_config(self, key: str, value):
+        """Updates a configuration parameter for the component.
+
+        Args:
+            key (str): The configuration parameter's key.
+            value: The new value for the configuration parameter.
+        """
+        self.config[key] = value
+
+    @abstractmethod
+    def check_health(self) -> bool:
+        """Checks the health of the component.
+
+        Returns:
+            bool: True if the component is healthy, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    def invoke(self, *args, **kwargs):
+        """Invokes the service that this component wraps.
+
+        Args:
+            *args: Positional arguments.
+            **kwargs: Keyword arguments.
+
+        Returns:
+            Any: The result of the service invocation.
+        """
+        pass
+
+    def __repr__(self):
+        """Provides a string representation of the component for debugging and logging purposes.
+
+        Returns:
+            str: A string representation of the OpeaComponent instance.
+        """
+        return f"OpeaComponent(name={self.name}, type={self.type}, description={self.description})"
+
+
+class OpeaComponentController(ABC):
+    """The OpeaComponentController class serves as the base class for managing and orchestrating
+    multiple instances of components of the same type. It provides a unified interface for routing
+    tasks, registering components, and dynamically discovering available components.
+
+    Attributes:
+        components (dict): A dictionary to store registered components by their unique identifiers.
+    """
+
+    def __init__(self):
+        """Initializes the OpeaComponentController instance with an empty component registry."""
+        self.components = {}
+        self.active_component = None
+
+    def register(self, component):
+        """Registers an OpeaComponent instance to the controller.
+
+        Args:
+            component (OpeaComponent): An instance of a subclass of OpeaComponent to be managed.
+
+        Raises:
+            ValueError: If the component is already registered.
+        """
+        if component.name in self.components:
+            raise ValueError(f"Component '{component.name}' is already registered.")
+        self.components[component.name] = component
+
+    def discover_and_activate(self):
+        """Discovers healthy components and activates one.
+
+        If multiple components are healthy, it prioritizes the first registered component.
+        """
+        for component in self.components.values():
+            if component.check_health():
+                self.active_component = component
+                print(f"Activated component: {component.name}")
+                return
+        raise RuntimeError("No healthy components available.")
+
+    def invoke(self, *args, **kwargs):
+        """Invokes the service using the currently active component.
+
+        Args:
+            *args: Positional arguments.
+            **kwargs: Keyword arguments.
+
+        Returns:
+            Any: The result of the service invocation.
+
+        Raises:
+            RuntimeError: If no active component is set.
+        """
+        if not self.active_component:
+            raise RuntimeError("No active component. Call 'discover_and_activate' first.")
+        return self.active_component.invoke(*args, **kwargs)
+
+    def list_components(self):
+        """Lists all registered components.
+
+        Returns:
+            list: A list of component names that are currently registered.
+        """
+        return list(self.components.keys())
+
+    def __repr__(self):
+        """Provides a string representation of the controller and its registered components.
+
+        Returns:
+            str: A string representation of the OpeaComponentController instance.
+        """
+        return f"OpeaComponentController(registered_components={self.list_components()})"
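
[Editor's note, not part of the patch] A minimal usage sketch of the two classes added above. The TeiEmbedding subclass, its URL, and the /health and /embed endpoint paths are hypothetical stand-ins for a real backing service:

    # usage_sketch.py -- hypothetical component built on the new base classes
    import requests

    from comps import OpeaComponent, OpeaComponentController


    class TeiEmbedding(OpeaComponent):
        """Hypothetical embedding component wrapping a remote HTTP service."""

        def check_health(self) -> bool:
            # Probe an assumed /health endpoint; an unreachable service is unhealthy.
            try:
                return requests.get(self.config["url"] + "/health", timeout=2).ok
            except requests.RequestException:
                return False

        def invoke(self, text: str):
            # Forward the request to an assumed /embed endpoint and return its JSON body.
            resp = requests.post(self.config["url"] + "/embed", json={"inputs": text}, timeout=10)
            resp.raise_for_status()
            return resp.json()


    controller = OpeaComponentController()
    controller.register(TeiEmbedding("tei", "embedding", "TEI wrapper", {"url": "http://localhost:8080"}))
    controller.discover_and_activate()      # activates the first healthy component
    print(controller.invoke("hello world"))  # routed to the active component

Because discover_and_activate() raises RuntimeError when nothing is healthy, a caller can register several interchangeable backends and fall through to whichever one responds.
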
diff --git a/tests/cores/common/test_component.py b/tests/cores/common/test_component.py
new file mode 100644
index 000000000..4af06a0e5
--- /dev/null
+++ b/tests/cores/common/test_component.py
@@ -0,0 +1,148 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import unittest
+from unittest.mock import MagicMock
+
+from comps import OpeaComponent, OpeaComponentController
+
+
+class TestOpeaComponent(unittest.TestCase):
+    class MockOpeaComponent(OpeaComponent):
+        def check_health(self) -> bool:
+            return True
+
+        def invoke(self, *args, **kwargs):
+            return "Service accessed"
+
+    def test_initialization(self):
+        component = self.MockOpeaComponent("TestComponent", "embedding", "Test description")
+
+        self.assertEqual(component.name, "TestComponent")
+        self.assertEqual(component.type, "embedding")
+        self.assertEqual(component.description, "Test description")
+        self.assertEqual(component.config, {})
+
+    def test_get_meta(self):
+        component = self.MockOpeaComponent("TestComponent", "embedding", "Test description", {"key": "value"})
+        meta = component.get_meta()
+
+        self.assertEqual(meta["name"], "TestComponent")
+        self.assertEqual(meta["type"], "embedding")
+        self.assertEqual(meta["description"], "Test description")
+        self.assertEqual(meta["config"], {"key": "value"})
+
+    def test_update_config(self):
+        component = self.MockOpeaComponent("TestComponent", "embedding", "Test description")
+        component.update_config("key", "new_value")
+
+        self.assertEqual(component.config["key"], "new_value")
+
+
+class TestOpeaComponentController(unittest.TestCase):
+    def test_register_component(self):
+        controller = OpeaComponentController()
+        component = MagicMock()
+        component.name = "TestComponent"
+        controller.register(component)
+
+        self.assertIn("TestComponent", controller.components)
+
+        with self.assertRaises(ValueError):
+            controller.register(component)
+
+    def test_discover_and_activate(self):
+        controller = OpeaComponentController()
+
+        # Mock a healthy component
+        component1 = MagicMock()
+        component1.name = "Component1"
+        component1.check_health.return_value = True
+
+        # Register and activate the healthy component
+        controller.register(component1)
+        controller.discover_and_activate()
+
+        # Ensure the component is activated
+        self.assertEqual(controller.active_component, component1)
+
+        # Add another component that is unhealthy
+        component2 = MagicMock()
+        component2.name = "Component2"
+        component2.check_health.return_value = False
+        controller.register(component2)
+
+        # Call discover_and_activate again; the active component should remain the same
+        controller.discover_and_activate()
+        self.assertEqual(controller.active_component, component1)
+
+    def test_invoke_no_active_component(self):
+        controller = OpeaComponentController()
+        with self.assertRaises(RuntimeError):
+            controller.invoke("arg1", key="value")
+
+    def test_invoke_with_active_component(self):
+        controller = OpeaComponentController()
+
+        # Mock a component
+        component = MagicMock()
+        component.name = "TestComponent"
+        component.check_health.return_value = True
+        component.invoke = MagicMock(return_value="Service accessed")
+
+        # Register and activate the component
+        controller.register(component)
+        controller.discover_and_activate()
+
+        # Invoke using the active component
+        result = controller.invoke("arg1", key="value")
+
+        # Assert the result and method call
+        self.assertEqual(result, "Service accessed")
+        component.invoke.assert_called_with("arg1", key="value")
+
+    def test_discover_then_invoke(self):
+        """Ensures that `discover_and_activate` and `invoke` work correctly when called sequentially."""
+        controller = OpeaComponentController()
+
+        # Mock a healthy component
+        component1 = MagicMock()
+        component1.name = "Component1"
+        component1.check_health.return_value = True
+        component1.invoke = MagicMock(return_value="Result from Component1")
+
+        # Register the component
+        controller.register(component1)
+
+        # Discover and activate
+        controller.discover_and_activate()
+
+        # Ensure the component is activated
+        self.assertEqual(controller.active_component, component1)
+
+        # Call invoke separately
+        result = controller.invoke("test_input")
+        self.assertEqual(result, "Result from Component1")
+        component1.invoke.assert_called_once_with("test_input")
+
+    def test_list_components(self):
+        controller = OpeaComponentController()
+
+        # Mock components
+        component1 = MagicMock()
+        component1.name = "Component1"
+        component2 = MagicMock()
+        component2.name = "Component2"
+
+        # Register components
+        controller.register(component1)
+        controller.register(component2)
+
+        # Assert the components list
+        components_list = controller.list_components()
+        self.assertIn("Component1", components_list)
+        self.assertIn("Component2", components_list)
+
+
+if __name__ == "__main__":
+    unittest.main()
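
[Editor's note, not part of the patch] The new tests depend only on the standard library and the comps package, so they should be runnable with the stock unittest runner from the repository root, for example:

    # -v gives per-test output; the file path form requires Python 3.5+.
    python -m unittest tests/cores/common/test_component.py -v
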