Commit a61b503

Refine robustness of Dataprep Redis (opea-project#463)
Signed-off-by: letonghan <[email protected]>
Signed-off-by: siddhivelankar23 <[email protected]>
letonghan authored and siddhivelankar23 committed Aug 22, 2024
1 parent 6866aed commit a61b503
Showing 6 changed files with 172 additions and 74 deletions.
13 changes: 3 additions & 10 deletions comps/dataprep/redis/README.md
@@ -38,9 +38,6 @@ Please refer to this [readme](../../vectorstores/langchain/redis/README.md).

 ```bash
 export REDIS_URL="redis://${your_ip}:6379"
 export INDEX_NAME=${your_index_name}
-export LANGCHAIN_TRACING_V2=true
-export LANGCHAIN_API_KEY=${your_langchain_api_key}
-export LANGCHAIN_PROJECT="opea/gen-ai-comps:dataprep"
 export PYTHONPATH=${path_to_comps}
 ```

@@ -97,13 +94,9 @@ Please refer to this [readme](../../vectorstores/langchain/redis/README.md).

 ```bash
 export EMBEDDING_MODEL_ID="BAAI/bge-base-en-v1.5"
 export TEI_ENDPOINT="http://${your_ip}:6006"
-export REDIS_HOST=${your_ip}
-export REDIS_PORT=6379
 export REDIS_URL="redis://${your_ip}:6379"
 export INDEX_NAME=${your_index_name}
-export LANGCHAIN_TRACING_V2=true
-export LANGCHAIN_API_KEY=${your_langchain_api_key}
-export LANGCHAIN_PROJECT="opea/dataprep"
+export HUGGINGFACEHUB_API_TOKEN=${your_hf_api_token}
 ```

 ## 2.3 Build Docker Image

@@ -136,13 +129,13 @@ docker build -t opea/dataprep-on-ray-redis:latest --build-arg https_proxy=$https

 - option 1: Start single-process version (for 1-10 files processing)

 ```bash
-docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT opea/dataprep-redis:latest
+docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e HUGGINGFACEHUB_API_TOKEN=$HUGGINGFACEHUB_API_TOKEN opea/dataprep-redis:latest
 ```

 - option 2: Start multi-process version (for >10 files processing)

 ```bash
-docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e TIMEOUT_SECONDS=600 opea/dataprep-on-ray-redis:latest
+docker run -d --name="dataprep-redis-server" -p 6007:6007 --runtime=runc --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e REDIS_URL=$REDIS_URL -e INDEX_NAME=$INDEX_NAME -e TEI_ENDPOINT=$TEI_ENDPOINT -e HUGGINGFACEHUB_API_TOKEN=$HUGGINGFACEHUB_API_TOKEN -e TIMEOUT_SECONDS=600 opea/dataprep-on-ray-redis:latest
 ```

 ## 2.5 Run with Docker Compose (Option B - deprecated, will move to genAIExample in future)
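The README changes above drop the LangSmith tracing variables and thread `HUGGINGFACEHUB_API_TOKEN` into both `docker run` variants. As a quick end-to-end check, a minimal smoke-test sketch (assuming the container is reachable at localhost:6007 and a local `test.txt` exists; this is not part of the commit):

```python
# Hypothetical smoke test for the ingestion endpoint; the "files" form
# field matches the FastAPI parameter in ingest_documents shown below.
import requests

url = "http://localhost:6007/v1/dataprep"

with open("test.txt", "rb") as f:
    resp = requests.post(url, files={"files": ("test.txt", f)})
print(resp.json())  # expected: {"status": 200, "message": "Data preparation succeeded"}

# Re-uploading the same file name should now fail with HTTP 400, since this
# commit adds a duplicate check against the KEY_INDEX_NAME key index.
```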
2 changes: 2 additions & 0 deletions comps/dataprep/redis/langchain/config.py
@@ -62,3 +62,5 @@ def format_redis_conn_from_env():
 KEY_INDEX_NAME = os.getenv("KEY_INDEX_NAME", "file-keys")

 TIMEOUT_SECONDS = int(os.getenv("TIMEOUT_SECONDS", 600))
+
+SEARCH_BATCH_SIZE = int(os.getenv("SEARCH_BATCH_SIZE", 10))
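The new `SEARCH_BATCH_SIZE` constant drives the paginated `FT.SEARCH` loop in the reworked `/v1/dataprep/get_file` handler later in this diff. For reference, a standalone sketch of that batched-scan pattern (assuming a local Redis Stack instance and redis-py; note that `FT.SEARCH`'s `LIMIT` clause takes an offset and a result count):

```python
# Minimal pagination sketch, not the committed code: scan an index in
# batches of SEARCH_BATCH_SIZE using FT.SEARCH ... LIMIT <offset> <count>.
import redis

SEARCH_BATCH_SIZE = 10
r = redis.Redis(host="localhost", port=6379)

def fetch_all_keys(index_name: str):
    offset, keys = 0, []
    while True:
        response = r.execute_command(
            "FT.SEARCH", index_name, "*", "LIMIT", offset, SEARCH_BATCH_SIZE
        )
        # reply layout: [total, key1, fields1, key2, fields2, ...]
        returned = (len(response) - 1) // 2
        if returned == 0:
            break
        keys.extend(response[1::2])  # document keys sit at odd indices
        offset += returned
        if returned < SEARCH_BATCH_SIZE:
            break  # a short batch means we reached the end
    return keys
```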
@@ -27,8 +27,6 @@ services:
     container_name: dataprep-redis-server
     ports:
       - "6007:6007"
-      - "6008:6008"
-      - "6009:6009"
     ipc: host
     environment:
       no_proxy: ${no_proxy}
@@ -39,7 +37,7 @@ services:
       REDIS_URL: ${REDIS_URL}
       INDEX_NAME: ${INDEX_NAME}
       TEI_ENDPOINT: ${TEI_ENDPOINT}
-      LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}
+      HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN}
     restart: unless-stopped

 networks:
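With the compose service now exposing only port 6007 and passing `HUGGINGFACEHUB_API_TOKEN` instead of the LangSmith key, the file-management endpoints registered below can be exercised directly. A hedged example (assuming the routes accept POST, as OPEA comps microservices typically do):

```python
# Hypothetical client calls for the get_file / delete_file endpoints
# defined later in this commit; the paths come from the code below.
import requests

BASE = "http://localhost:6007/v1/dataprep"

# List every file currently tracked in the key index.
print(requests.post(f"{BASE}/get_file").json())

# Delete one uploaded file by name, or pass "all" to wipe everything;
# the JSON shape matches file_path: str = Body(..., embed=True).
print(requests.post(f"{BASE}/delete_file", json={"file_path": "all"}).json())
```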
210 changes: 151 additions & 59 deletions comps/dataprep/redis/langchain/prepare_doc_redis.py
@@ -10,13 +10,13 @@

 # from pyspark import SparkConf, SparkContext
 import redis
-from config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL
+from config import EMBED_MODEL, INDEX_NAME, KEY_INDEX_NAME, REDIS_URL, SEARCH_BATCH_SIZE
 from fastapi import Body, File, Form, HTTPException, UploadFile
 from langchain.text_splitter import RecursiveCharacterTextSplitter
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceHubEmbeddings
+from langchain_community.embeddings import HuggingFaceBgeEmbeddings
 from langchain_community.vectorstores import Redis
+from langchain_huggingface import HuggingFaceEndpointEmbeddings
 from langchain_text_splitters import HTMLHeaderTextSplitter
-from langsmith import traceable
 from redis.commands.search.field import TextField
 from redis.commands.search.indexDefinition import IndexDefinition, IndexType

@@ -25,7 +25,7 @@
     create_upload_folder,
     document_loader,
     encode_filename,
-    get_file_structure,
+    format_search_results,
     get_separators,
     get_tables_result,
     parse_html,
Expand Down Expand Up @@ -76,7 +76,7 @@ def search_by_id(client, doc_id):
print(f"[ search by id ] searching docs of {doc_id}")
try:
results = client.load_document(doc_id)
print(f"[ search by id ] search success of {doc_id}")
print(f"[ search by id ] search success of {doc_id}: {results}")
return results
except Exception as e:
print(f"[ search by id ] fail to search docs of {doc_id}: {e}")
@@ -109,7 +109,7 @@ def ingest_chunks_to_redis(file_name: str, chunks: List):
     # Create vectorstore
     if tei_embedding_endpoint:
         # create embeddings using TEI endpoint service
-        embedder = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
+        embedder = HuggingFaceEndpointEmbeddings(model=tei_embedding_endpoint)
     else:
         # create embeddings using local embedding model
         embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
@@ -139,8 +139,12 @@ def ingest_chunks_to_redis(file_name: str, chunks: List):
     client = r.ft(KEY_INDEX_NAME)
     if not check_index_existance(client):
         assert create_index(client)
-    assert store_by_id(client, key=file_name, value="#".join(file_ids))

+    try:
+        assert store_by_id(client, key=file_name, value="#".join(file_ids))
+    except Exception as e:
+        print(f"[ ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
+        raise HTTPException(status_code=500, detail=f"Fail to store chunks of file {file_name}.")
     return True


@@ -177,7 +181,6 @@ def ingest_data_to_redis(doc_path: DocPath):


 @register_microservice(name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep", host="0.0.0.0", port=6007)
-@traceable(run_type="tool")
 async def ingest_documents(
     files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
     link_list: Optional[str] = Form(None),
@@ -189,12 +192,30 @@ async def ingest_documents(
     print(f"files:{files}")
     print(f"link_list:{link_list}")

+    r = redis.Redis(connection_pool=redis_pool)
+    client = r.ft(KEY_INDEX_NAME)
+
     if files:
         if not isinstance(files, list):
             files = [files]
         uploaded_files = []
+
         for file in files:
             encode_file = encode_filename(file.filename)
+            doc_id = "file:" + encode_file
+
+            # check whether the file already exists
+            key_ids = None
+            try:
+                key_ids = search_by_id(client, doc_id).key_ids
+                print(f"[ upload file ] File {file.filename} already exists.")
+            except Exception as e:
+                print(f"[ upload file ] File {file.filename} does not exist.")
+            if key_ids:
+                raise HTTPException(
+                    status_code=400, detail=f"Uploaded file {file.filename} already exists. Please change file name."
+                )
+
             save_path = upload_folder + encode_file
             await save_content_to_local_disk(save_path, file)
             ingest_data_to_redis(
@@ -234,101 +255,172 @@ async def ingest_documents(
         return {"status": 200, "message": "Data preparation succeeded"}

     if link_list:
-        try:
-            link_list = json.loads(link_list)  # Parse JSON string to list
-            if not isinstance(link_list, list):
-                raise HTTPException(status_code=400, detail="link_list should be a list.")
-            for link in link_list:
-                encoded_link = encode_filename(link)
-                save_path = upload_folder + encoded_link + ".txt"
-                content = parse_html([link])[0][0]
-                await save_content_to_local_disk(save_path, content)
-                ingest_data_to_redis(
-                    DocPath(
-                        path=save_path,
-                        chunk_size=chunk_size,
-                        chunk_overlap=chunk_overlap,
-                        process_table=process_table,
-                        table_strategy=table_strategy,
-                    )
+        link_list = json.loads(link_list)  # Parse JSON string to list
+        if not isinstance(link_list, list):
+            raise HTTPException(status_code=400, detail=f"Link_list {link_list} should be a list.")
+        for link in link_list:
+            encoded_link = encode_filename(link)
+            doc_id = "file:" + encoded_link + ".txt"
+
+            # check whether the link file already exists
+            key_ids = None
+            try:
+                key_ids = search_by_id(client, doc_id).key_ids
+                print(f"[ upload file ] Link {link} already exists.")
+            except Exception as e:
+                print(f"[ upload file ] Link {link} does not exist. Keep storing.")
+            if key_ids:
+                raise HTTPException(
+                    status_code=400, detail=f"Uploaded link {link} already exists. Please change another link."
+                )
+
+            save_path = upload_folder + encoded_link + ".txt"
+            content = parse_html([link])[0][0]
+            await save_content_to_local_disk(save_path, content)
+            ingest_data_to_redis(
+                DocPath(
+                    path=save_path,
+                    chunk_size=chunk_size,
+                    chunk_overlap=chunk_overlap,
+                    process_table=process_table,
+                    table_strategy=table_strategy,
                 )
-            print(f"Successfully saved link list {link_list}")
-            return {"status": 200, "message": "Data preparation succeeded"}
-        except json.JSONDecodeError:
-            raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")
+            )
+        print(f"Successfully saved link list {link_list}")
+        return {"status": 200, "message": "Data preparation succeeded"}

     raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")


 @register_microservice(
     name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6007
 )
-@traceable(run_type="tool")
 async def rag_get_file_structure():
     print("[ dataprep - get file ] start to get file structure")

     if not Path(upload_folder).exists():
         print("No file uploaded, return empty list.")
         return []

-    file_content = get_file_structure(upload_folder)
-    return file_content
+    # define redis client
+    r = redis.Redis(connection_pool=redis_pool)
+    offset = 0
+    file_list = []
+    while True:
+        response = r.execute_command("FT.SEARCH", KEY_INDEX_NAME, "*", "LIMIT", offset, offset + SEARCH_BATCH_SIZE)
+        # no doc retrieved
+        if len(response) < 2:
+            break
+        file_list = format_search_results(response, file_list)
+        offset += SEARCH_BATCH_SIZE
+        # last batch
+        if (len(response) - 1) // 2 < SEARCH_BATCH_SIZE:
+            break
+    return file_list


 @register_microservice(
     name="opea_service@prepare_doc_redis", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6007
 )
-@traceable(run_type="tool")
 async def delete_single_file(file_path: str = Body(..., embed=True)):
     """Delete file according to `file_path`.
     `file_path`:
         - specific file path (e.g. /path/to/file.txt)
         - folder path (e.g. /path/to/folder)
         - "all": delete all files uploaded
     """
+    # define redis client
+    r = redis.Redis(connection_pool=redis_pool)
+    client = r.ft(KEY_INDEX_NAME)
+    client2 = r.ft(INDEX_NAME)
+
     # delete all uploaded files
     if file_path == "all":
         print("[dataprep - del] delete all files")
-        remove_folder_with_ignore(upload_folder)
-        assert drop_index(index_name=INDEX_NAME)
-        assert drop_index(index_name=KEY_INDEX_NAME)
+
+        # drop index KEY_INDEX_NAME
+        if check_index_existance(client):
+            try:
+                assert drop_index(index_name=KEY_INDEX_NAME)
+            except Exception as e:
+                print(f"[dataprep - del] {e}. Fail to drop index {KEY_INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"Fail to drop index {KEY_INDEX_NAME}.")
+        else:
+            print(f"[dataprep - del] Index {KEY_INDEX_NAME} does not exist.")
+
+        # drop index INDEX_NAME
+        if check_index_existance(client2):
+            try:
+                assert drop_index(index_name=INDEX_NAME)
+            except Exception as e:
+                print(f"[dataprep - del] {e}. Fail to drop index {INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"Fail to drop index {INDEX_NAME}.")
+        else:
+            print(f"[dataprep - del] Index {INDEX_NAME} does not exist.")
+
+        # delete files on local disk
+        try:
+            remove_folder_with_ignore(upload_folder)
+        except Exception as e:
+            print(f"[dataprep - del] {e}. Fail to delete {upload_folder}.")
+            raise HTTPException(status_code=500, detail=f"Fail to delete {upload_folder}.")
+
         print("[dataprep - del] successfully delete all files.")
         create_upload_folder(upload_folder)
         return {"status": True}

     delete_path = Path(upload_folder + "/" + encode_filename(file_path))
     print(f"[dataprep - del] delete_path: {delete_path}")

-    # partially delete files/folders
+    # partially delete files
     if delete_path.exists():
-        r = redis.Redis(connection_pool=redis_pool)
-        client = r.ft(KEY_INDEX_NAME)
-        client2 = r.ft(INDEX_NAME)
         doc_id = "file:" + encode_filename(file_path)
-        objs = search_by_id(client, doc_id).key_ids
-        file_ids = objs.split("#")

+        # determine whether this file exists in db KEY_INDEX_NAME
+        try:
+            key_ids = search_by_id(client, doc_id).key_ids
+        except Exception as e:
+            print(f"[dataprep - del] {e}. File {file_path} does not exist.")
+            raise HTTPException(
+                status_code=404, detail=f"File not found in db {KEY_INDEX_NAME}. Please check file_path."
+            )
+        file_ids = key_ids.split("#")

         # delete file
         if delete_path.is_file():
+            # delete file keys id in db KEY_INDEX_NAME
             try:
-                for file_id in file_ids:
-                    assert delete_by_id(client2, file_id)
                 assert delete_by_id(client, doc_id)
-                delete_path.unlink()
             except Exception as e:
-                print(f"[dataprep - del] fail to delete file {delete_path}: {e}")
-                return {"status": False}
+                print(f"[dataprep - del] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
+                raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
+
+            # delete file content in db INDEX_NAME
+            for file_id in file_ids:
+                # determine whether this file exists in db INDEX_NAME
+                try:
+                    content = search_by_id(client2, file_id).content
+                except Exception as e:
+                    print(f"[dataprep - del] {e}. File {file_path} does not exist.")
+                    raise HTTPException(
+                        status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
+                    )
+
+                # delete file content
+                try:
+                    assert delete_by_id(client2, file_id)
+                except Exception as e:
+                    print(f"[dataprep - del] {e}. File {file_path} delete failed for db {INDEX_NAME}")
+                    raise HTTPException(status_code=500, detail=f"File {file_path} delete failed.")
+
+            # delete file on local disk
+            delete_path.unlink()
+
+            return {"status": True}

         # delete folder
         else:
-            try:
-                shutil.rmtree(delete_path)
-            except Exception as e:
-                print(f"[dataprep - del] fail to delete folder {delete_path}: {e}")
-                return {"status": False}
-        return {"status": True}
+            print(f"[dataprep - del] Delete folder {file_path} is not supported for now.")
+            raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
     else:
-        raise HTTPException(status_code=404, detail="File/folder not found. Please check del_path.")
+        raise HTTPException(status_code=404, detail=f"File {file_path} not found. Please check file_path.")


 if __name__ == "__main__":
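`format_search_results` replaces `get_file_structure` in the imports above, but its definition lives in the comps utils module, whose diff did not render on this page. A hypothetical reconstruction consistent with the raw `FT.SEARCH` reply consumed by `rag_get_file_structure` (names and field layout are assumptions, not the committed code):

```python
# Hypothetical sketch of format_search_results, inferred from the reply
# layout [total, key1, fields1, key2, fields2, ...]; the real helper is in
# comps/dataprep/redis/langchain/utils.py and may differ.
def format_search_results(response, file_list):
    for i in range(1, len(response), 2):
        key = response[i]
        name = key.decode("utf-8") if isinstance(key, bytes) else key
        file_list.append({"name": name.removeprefix("file:"), "id": name, "type": "File", "parent": ""})
    return file_list
```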
4 changes: 2 additions & 2 deletions comps/dataprep/redis/langchain/requirements.txt
@@ -5,10 +5,10 @@ docx2txt
 easyocr
 fastapi
 huggingface_hub
-langchain
+langchain==0.2.12
 langchain-community
 langchain-text-splitters
-langsmith
+langchain_huggingface
 markdown
 numpy
 opentelemetry-api