From a61b5031f2b5fd30aec24724e5c52bd8bc3cc1a8 Mon Sep 17 00:00:00 2001
From: Letong Han <106566639+letonghan@users.noreply.github.com>
Date: Tue, 13 Aug 2024 12:14:56 +0800
Subject: [PATCH] Refine robustness of Dataprep Redis (#463)

Signed-off-by: letonghan
Signed-off-by: siddhivelankar23
---
 comps/dataprep/redis/README.md                |  13 +-
 comps/dataprep/redis/langchain/config.py      |   2 +
 .../docker/docker-compose-dataprep-redis.yaml |   4 +-
 .../redis/langchain/prepare_doc_redis.py      | 210 +++++++++++++-----
 .../dataprep/redis/langchain/requirements.txt |   4 +-
 comps/dataprep/utils.py                       |  13 ++
 6 files changed, 172 insertions(+), 74 deletions(-)

diff --git a/comps/dataprep/redis/README.md b/comps/dataprep/redis/README.md
index 1afb6e8e0..0845254df 100644
--- a/comps/dataprep/redis/README.md
+++ b/comps/dataprep/redis/README.md
@@ -38,9 +38,6 @@ Please refer to this [readme](../../vectorstores/langchain/redis/README.md).
 ```bash
 export REDIS_URL="redis://${your_ip}:6379"
 export INDEX_NAME=${your_index_name}
-export LANGCHAIN_TRACING_V2=true
-export LANGCHAIN_API_KEY=${your_langchain_api_key}
-export LANGCHAIN_PROJECT="opea/gen-ai-comps:dataprep"
 export PYTHONPATH=${path_to_comps}
 ```
 
@@ -97,13 +94,9 @@ Please refer to this [readme](../../vectorstores/langchain/redis/README.md).
 ```bash
 export EMBEDDING_MODEL_ID="BAAI/bge-base-en-v1.5"
 export TEI_ENDPOINT="http://${your_ip}:6006"
-export REDIS_HOST=${your_ip}
-export REDIS_PORT=6379
 export REDIS_URL="redis://${your_ip}:6379"
 export INDEX_NAME=${your_index_name}
-export LANGCHAIN_TRACING_V2=true
-export LANGCHAIN_API_KEY=${your_langchain_api_key}
-export LANGCHAIN_PROJECT="opea/dataprep"
+export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token}
 ```
 
 ## 2.3 Build Docker Image
@@ -136,13 +129,13 @@ docker build -t opea/dataprep-on-ray-redis:latest --build-arg https_proxy=$https
 - option 1: Start single-process version (for 1-10 files processing)
 
 ```bash
-docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/dataprep-redis:latest
+docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e HUGGINGFACEHUB_API_TOKEN=$HUGGINGFACEHUB_API_TOKEN opea/dataprep-redis:latest
 ```
 
 - option 2: Start multi-process version (for >10 files processing)
 
 ```bash
-docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e TIMEOUT_SECONDS=600 opea/dataprep-on-ray-redis:latest
+docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e HUGGINGFACEHUB_API_TOKEN=$HUGGINGFACEHUB_API_TOKEN -e TIMEOUT_SECONDS=600 opea/dataprep-on-ray-redis:latest
 ```
 
 ## 2.5 Run with Docker Compose (Option B - deprecated, will move to genAIExample in future)
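With either container from section 2.4 running, the ingestion route can be smoke-tested directly from Python; a minimal sketch, assuming the service is reachable on localhost:6007 and `test.txt` stands in for any local file:

```python
# Smoke test for the /v1/dataprep endpoint exposed on port 6007.
# Assumptions: container from section 2.4 is up on localhost; "test.txt" is any local file.
import requests

with open("test.txt", "rb") as f:
    resp = requests.post(
        "http://localhost:6007/v1/dataprep",
        files={"files": ("test.txt", f)},  # matches the UploadFile form field declared below
    )
print(resp.status_code, resp.json())
# expected on success: 200 {'status': 200, 'message': 'Data preparation succeeded'}
```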
diff --git a/comps/dataprep/redis/langchain/config.py b/comps/dataprep/redis/langchain/config.py
index b441f80d8..75715912c 100644
--- a/comps/dataprep/redis/langchain/config.py
+++ b/comps/dataprep/redis/langchain/config.py
@@ -62,3 +62,5 @@ def format_redis_conn_from_env():
 KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys")
 TIMEOUT_SECONDS = int(os.getenv("TIMEOUT_SECONDS", 600))
+
+SEARCH_BATCH_SIZE = int(os.getenv("SEARCH_BATCH_SIZE", 10))
diff --git a/comps/dataprep/redis/langchain/docker/docker-compose-dataprep-redis.yaml b/comps/dataprep/redis/langchain/docker/docker-compose-dataprep-redis.yaml
index 74e2bb78f..0ef8a1f1a 100644
--- a/comps/dataprep/redis/langchain/docker/docker-compose-dataprep-redis.yaml
+++ b/comps/dataprep/redis/langchain/docker/docker-compose-dataprep-redis.yaml
@@ -27,8 +27,6 @@ services:
     container_name: dataprep-redis-server
     ports:
       - "6007:6007"
-      - "6008:6008"
-      - "6009:6009"
     ipc: host
     environment:
       no_proxy: ${no_proxy}
@@ -39,7 +37,7 @@ services:
       REDIS_URL: ${REDIS_URL}
       INDEX_NAME: ${INDEX_NAME}
       TEI_ENDPOINT: ${TEI_ENDPOINT}
-      LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}
+      HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN}
     restart: unless-stopped
 
 networks:
diff --git a/comps/dataprep/redis/langchain/prepare_doc_redis.py b/comps/dataprep/redis/langchain/prepare_doc_redis.py
index 13af980ab..a749cd557 100644
--- a/comps/dataprep/redis/langchain/prepare_doc_redis.py
+++ b/comps/dataprep/redis/langchain/prepare_doc_redis.py
@@ -10,13 +10,13 @@
 # from pyspark import SparkConf, SparkContext
 import redis
-from config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL
+from config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL, SEARCH_BATCH_SIZE
 from fastapi import Body, File, Form, HTTPException, UploadFile
 from langchain.text_splitter import RecursiveCharacterTextSplitter
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
+from langchain_community.embeddings import HuggingFaceBgeEmbeddings
 from langchain_community.vectorstores import Redis
+from langchain_huggingface import HuggingFaceEndpointEmbeddings
 from langchain_text_splitters import HTMLHeaderTextSplitter
-from langsmith import traceable
 from redis.commands.search.field import TextField
 from redis.commands.search.indexDefinition import IndexDefinition, IndexType
 
@@ -25,7 +25,7 @@
     create_upload_folder,
     document_loader,
     encode_filename,
-    get_file_structure,
+    format_search_results,
     get_separators,
     get_tables_result,
     parse_html,
@@ -76,7 +76,7 @@ def search_by_id(client, doc_id):
     print(f"[ search by id ] searching docs of {doc_id}")
     try:
         results = client.load_document(doc_id)
-        print(f"[ search by id ] search success of {doc_id}")
+        print(f"[ search by id ] search success of {doc_id}: {results}")
         return results
     except Exception as e:
         print(f"[ search by id ] fail to search docs of {doc_id}: {e}")
@@ -109,7 +109,7 @@ def ingest_chunks_to_redis(file_name: str, chunks: List):
     # Create vectorstore
     if tei_embedding_endpoint:
         # create embeddings using TEI endpoint service
-        embedder = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
+        embedder = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint)
     else:
         # create embeddings using local embedding model
         embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
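The import swap above is the substantive dependency change: `HuggingFaceHubEmbeddings` from `langchain_community` is deprecated in favor of `HuggingFaceEndpointEmbeddings` from the new `langchain_huggingface` package. Condensed, the selection logic in `ingest_chunks_to_redis` now amounts to the following (a sketch; the `EMBED_MODEL` default shown here mirrors the README's `BAAI/bge-base-en-v1.5` and is an assumption about config.py):

```python
import os

from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_huggingface import HuggingFaceEndpointEmbeddings

# assumed default, matching the EMBEDDING_MODEL_ID used in the README
EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")
tei_embedding_endpoint = os.getenv("TEI_ENDPOINT")

if tei_embedding_endpoint:
    # a remote TEI service computes the embeddings
    embedder = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint)
else:
    # fall back to a local BGE model
    embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
```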
@@ -139,8 +139,12 @@ def ingest_chunks_to_redis(file_name: str, chunks: List):
     client = r.ft(KEY_INDEX_NAME)
     if not check_index_existance(client):
         assert create_index(client)
-    assert store_by_id(client, key=file_name, value="#".join(file_ids))
+    try:
+        assert store_by_id(client, key=file_name, value="#".join(file_ids))
+    except Exception as e:
+        print(f"[ ingest chunks ] {e}. Failed to store chunks of file {file_name}.")
+        raise HTTPException(status_code=500, detail=f"Failed to store chunks of file {file_name}.")
 
     return True
 
@@ -177,7 +181,6 @@ def ingest_data_to_redis(doc_path: DocPath):
 
 
 @register_microservice(name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep", host="0.0.0.0", port=6007)
-@traceable(run_type="tool")
 async def ingest_documents(
     files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
     link_list: Optional[str] = Form(None),
@@ -189,12 +192,30 @@ async def ingest_documents(
     print(f"files:{files}")
     print(f"link_list:{link_list}")
 
+    r = redis.Redis(connection_pool=redis_pool)
+    client = r.ft(KEY_INDEX_NAME)
+
     if files:
         if not isinstance(files, list):
             files = [files]
         uploaded_files = []
+
         for file in files:
             encode_file = encode_filename(file.filename)
+            doc_id = "file:" + encode_file
+
+            # check whether the file already exists
+            key_ids = None
+            try:
+                key_ids = search_by_id(client, doc_id).key_ids
+                print(f"[ upload file ] File {file.filename} already exists.")
+            except Exception as e:
+                print(f"[ upload file ] File {file.filename} does not exist.")
+            if key_ids:
+                raise HTTPException(
+                    status_code=400, detail=f"Uploaded file {file.filename} already exists. Please change the file name."
+                )
+
             save_path = upload_folder + encode_file
             await save_content_to_local_disk(save_path, file)
             ingest_data_to_redis(
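The duplicate check above changes client-visible behavior: re-uploading a name already tracked in `KEY_INDEX_NAME` now fails fast with HTTP 400 instead of silently re-indexing. A client-side sketch, again assuming the service on localhost:6007:

```python
import requests

url = "http://localhost:6007/v1/dataprep"

with open("test.txt", "rb") as f:
    first = requests.post(url, files={"files": ("test.txt", f)})
with open("test.txt", "rb") as f:
    second = requests.post(url, files={"files": ("test.txt", f)})

print(first.status_code)            # 200 on first ingestion
print(second.status_code)           # 400: the name is already indexed
print(second.json()["detail"])      # "Uploaded file test.txt already exists. ..."
```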
@@ -234,28 +255,39 @@ async def ingest_documents(
         return {"status": 200, "message": "Data preparation succeeded"}
 
     if link_list:
-        try:
-            link_list = json.loads(link_list)  # Parse JSON string to list
-            if not isinstance(link_list, list):
-                raise HTTPException(status_code=400, detail="link_list should be a list.")
-            for link in link_list:
-                encoded_link = encode_filename(link)
-                save_path = upload_folder + encoded_link + ".txt"
-                content = parse_html([link])[0][0]
-                await save_content_to_local_disk(save_path, content)
-                ingest_data_to_redis(
-                    DocPath(
-                        path=save_path,
-                        chunk_size=chunk_size,
-                        chunk_overlap=chunk_overlap,
-                        process_table=process_table,
-                        table_strategy=table_strategy,
-                    )
+        link_list = json.loads(link_list)  # Parse JSON string to list
+        if not isinstance(link_list, list):
+            raise HTTPException(status_code=400, detail=f"link_list {link_list} should be a list.")
+        for link in link_list:
+            encoded_link = encode_filename(link)
+            doc_id = "file:" + encoded_link + ".txt"
+
+            # check whether the link file already exists
+            key_ids = None
+            try:
+                key_ids = search_by_id(client, doc_id).key_ids
+                print(f"[ upload file ] Link {link} already exists.")
+            except Exception as e:
+                print(f"[ upload file ] Link {link} does not exist. Keep storing.")
+            if key_ids:
+                raise HTTPException(
+                    status_code=400, detail=f"Uploaded link {link} already exists. Please use another link."
+                )
+
+            save_path = upload_folder + encoded_link + ".txt"
+            content = parse_html([link])[0][0]
+            await save_content_to_local_disk(save_path, content)
+            ingest_data_to_redis(
+                DocPath(
+                    path=save_path,
+                    chunk_size=chunk_size,
+                    chunk_overlap=chunk_overlap,
+                    process_table=process_table,
+                    table_strategy=table_strategy,
                 )
-            print(f"Successfully saved link list {link_list}")
-            return {"status": 200, "message": "Data preparation succeeded"}
-        except json.JSONDecodeError:
-            raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")
+            )
+        print(f"Successfully saved link list {link_list}")
+        return {"status": 200, "message": "Data preparation succeeded"}
 
     raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")
 
@@ -263,36 +295,73 @@ async def ingest_documents(
 @register_microservice(
     name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6007
 )
-@traceable(run_type="tool")
 async def rag_get_file_structure():
     print("[ dataprep - get file ] start to get file structure")
 
-    if not Path(upload_folder).exists():
-        print("No file uploaded, return empty list.")
-        return []
-
-    file_content = get_file_structure(upload_folder)
-    return file_content
+    # define redis client
+    r = redis.Redis(connection_pool=redis_pool)
+    offset = 0
+    file_list = []
+    while True:
+        response = r.execute_command("FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE)
+        # no doc retrieved
+        if len(response) < 2:
+            break
+        file_list = format_search_results(response, file_list)
+        offset += SEARCH_BATCH_SIZE
+        # last batch
+        if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
+            break
+    return file_list
 
 
 @register_microservice(
     name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6007
 )
-@traceable(run_type="tool")
 async def delete_single_file(file_path: str = Body(..., embed=True)):
     """Delete file according to `file_path`.
 
     `file_path`:
         - specific file path (e.g. /path/to/file.txt)
-        - folder path (e.g. /path/to/folder)
         - "all": delete all files uploaded
     """
+
+    # define redis client
+    r = redis.Redis(connection_pool=redis_pool)
+    client = r.ft(KEY_INDEX_NAME)
+    client2 = r.ft(INDEX_NAME)
+
     # delete all uploaded files
     if file_path == "all":
         print("[dataprep - del] delete all files")
-        remove_folder_with_ignore(upload_folder)
-        assert drop_index(index_name=INDEX_NAME)
-        assert drop_index(index_name=KEY_INDEX_NAME)
+
+        # drop index KEY_INDEX_NAME
+        if check_index_existance(client):
+            try:
+                assert drop_index(index_name=KEY_INDEX_NAME)
+            except Exception as e:
+                print(f"[dataprep - del] {e}. Failed to drop index {KEY_INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"Failed to drop index {KEY_INDEX_NAME}.")
+        else:
+            print(f"[dataprep - del] Index {KEY_INDEX_NAME} does not exist.")
+
+        # drop index INDEX_NAME
+        if check_index_existance(client2):
+            try:
+                assert drop_index(index_name=INDEX_NAME)
+            except Exception as e:
+                print(f"[dataprep - del] {e}. Failed to drop index {INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"Failed to drop index {INDEX_NAME}.")
+        else:
+            print(f"[dataprep - del] Index {INDEX_NAME} does not exist.")
+
+        # delete files on local disk
+        try:
+            remove_folder_with_ignore(upload_folder)
+        except Exception as e:
+            print(f"[dataprep - del] {e}. Failed to delete {upload_folder}.")
+            raise HTTPException(status_code=500, detail=f"Failed to delete {upload_folder}.")
+
         print("[dataprep - del] successfully delete all files.")
         create_upload_folder(upload_folder)
         return {"status": True}
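The rewritten `rag_get_file_structure` above lists files by paging raw `FT.SEARCH` replies instead of walking the upload folder. One detail worth noting: RediSearch's `LIMIT` clause takes an offset and a count, so passing `offset + SEARCH_BATCH_SIZE` as the second argument requests progressively larger batches rather than a fixed-size page. A conventional fixed-size paging loop would look like this (a sketch, assuming a local Redis Stack and the default `file-keys` index name from config.py):

```python
import redis

SEARCH_BATCH_SIZE = 10
r = redis.Redis(host="localhost", port=6379)

offset, keys = 0, []
while True:
    # reply layout: [total, key1, fields1, key2, fields2, ...]
    response = r.execute_command("FT.SEARCH", "file-keys", "*", "LIMIT", offset, SEARCH_BATCH_SIZE)
    if len(response) < 2:
        break  # no documents at this offset
    keys.extend(response[i].decode() for i in range(1, len(response), 2))
    if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
        break  # short batch means we reached the end
    offset += SEARCH_BATCH_SIZE
print(keys)
```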
@@ -300,35 +369,58 @@ async def delete_single_file(file_path: str = Body(..., embed=True)):
     delete_path = Path(upload_folder + "/" + encode_filename(file_path))
     print(f"[dataprep - del] delete_path: {delete_path}")
 
-    # partially delete files/folders
+    # partially delete files
     if delete_path.exists():
-        r = redis.Redis(connection_pool=redis_pool)
-        client = r.ft(KEY_INDEX_NAME)
-        client2 = r.ft(INDEX_NAME)
         doc_id = "file:" + encode_filename(file_path)
-        objs = search_by_id(client, doc_id).key_ids
-        file_ids = objs.split("#")
+
+        # determine whether this file exists in db KEY_INDEX_NAME
+        try:
+            key_ids = search_by_id(client, doc_id).key_ids
+        except Exception as e:
+            print(f"[dataprep - del] {e}. File {file_path} does not exist.")
+            raise HTTPException(
+                status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
+            )
+        file_ids = key_ids.split("#")
 
         # delete file
         if delete_path.is_file():
+            # delete file key ids in db KEY_INDEX_NAME
             try:
-                for file_id in file_ids:
-                    assert delete_by_id(client2, file_id)
                 assert delete_by_id(client, doc_id)
-                delete_path.unlink()
             except Exception as e:
-                print(f"[dataprep - del] fail to delete file {delete_path}: {e}")
-                return {"status": False}
+                print(f"[dataprep - del] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
+
+            # delete file content in db INDEX_NAME
+            for file_id in file_ids:
+                # determine whether this file exists in db INDEX_NAME
+                try:
+                    content = search_by_id(client2, file_id).content
+                except Exception as e:
+                    print(f"[dataprep - del] {e}. File {file_path} does not exist.")
+                    raise HTTPException(
+                        status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
+                    )
+
+                # delete file content
+                try:
+                    assert delete_by_id(client2, file_id)
+                except Exception as e:
+                    print(f"[dataprep - del] {e}. File {file_path} delete failed for db {INDEX_NAME}.")
+                    raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
+
+            # delete file on local disk
+            delete_path.unlink()
+
+            return {"status": True}
+
         # delete folder
         else:
-            try:
-                shutil.rmtree(delete_path)
-            except Exception as e:
-                print(f"[dataprep - del] fail to delete folder {delete_path}: {e}")
-                return {"status": False}
-        return {"status": True}
+            print(f"[dataprep - del] Deleting folder {file_path} is not supported for now.")
+            raise HTTPException(status_code=404, detail=f"Deleting folder {file_path} is not supported for now.")
     else:
-        raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")
+        raise HTTPException(status_code=404, detail=f"File {file_path} not found. Please check file_path.")
 
 
 if __name__ == "__main__":
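Because `file_path` is declared with `Body(..., embed=True)`, the reworked delete endpoint expects the value wrapped in a JSON object, and failures now surface as 404/500 responses instead of `{"status": False}`. A client-side sketch, assuming localhost:6007:

```python
import requests

url = "http://localhost:6007/v1/dataprep/delete_file"

# delete a single uploaded file by its original name
resp = requests.post(url, json={"file_path": "test.txt"})
print(resp.status_code, resp.json())  # {"status": True} on success, 404 if the name is unknown

# drop both indices and clear the upload folder
resp = requests.post(url, json={"file_path": "all"})
print(resp.status_code, resp.json())
```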
diff --git a/comps/dataprep/redis/langchain/requirements.txt b/comps/dataprep/redis/langchain/requirements.txt
index 12d389513..284b9379b 100644
--- a/comps/dataprep/redis/langchain/requirements.txt
+++ b/comps/dataprep/redis/langchain/requirements.txt
@@ -5,10 +5,10 @@ docx2txt
 easyocr
 fastapi
 huggingface_hub
-langchain
+langchain==0.2.12
 langchain-community
 langchain-text-splitters
-langsmith
+langchain_huggingface
 markdown
 numpy
 opentelemetry-api
diff --git a/comps/dataprep/utils.py b/comps/dataprep/utils.py
index 46acc8f5b..ae8361539 100644
--- a/comps/dataprep/utils.py
+++ b/comps/dataprep/utils.py
@@ -717,6 +717,19 @@ def get_file_structure(root_path: str, parent_path: str = "") -> List[Dict[str,
     return result
 
 
+def format_search_results(response, file_list: list):
+    for i in range(1, len(response), 2):
+        file_name = response[i].decode()[5:]
+        file_dict = {
+            "name": decode_filename(file_name),
+            "id": decode_filename(file_name),
+            "type": "File",
+            "parent": "",
+        }
+        file_list.append(file_dict)
+    return file_list
+
+
 def remove_folder_with_ignore(folder_path: str, except_patterns: List = []):
     """Remove the specific folder, and ignore some files/folders.
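For reference, the new `format_search_results` helper walks an `FT.SEARCH` reply, where document keys sit at the odd indices, and strips the five-character `file:` prefix; `decode_filename` (omitted below) additionally decodes the stored name. A contrived reply showing the shape it expects, with placeholder `key_ids` values:

```python
# keys sit at odd indices; the even slots hold the field/value arrays
response = [
    2,
    b"file:test.txt", [b"key_ids", b"doc:abc#doc:def"],
    b"file:report.pdf", [b"key_ids", b"doc:123"],
]

file_list = []
for i in range(1, len(response), 2):
    file_name = response[i].decode()[5:]  # drop the "file:" prefix
    file_list.append({"name": file_name, "id": file_name, "type": "File", "parent": ""})

print(file_list)
# [{'name': 'test.txt', 'id': 'test.txt', 'type': 'File', 'parent': ''},
#  {'name': 'report.pdf', 'id': 'report.pdf', 'type': 'File', 'parent': ''}]
```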