Skip to content

Commit

Permalink
Pinecone update to OPEA (opea-project#420)
Browse files Browse the repository at this point in the history
Signed-off-by: pallavi jaini <[email protected]>
Signed-off-by: BaoHuiling <[email protected]>
  • Loading branch information
pallavijaini0525 authored and BaoHuiling committed Aug 15, 2024
1 parent dacb69b commit ca545cf
Show file tree
Hide file tree
Showing 14 changed files with 520 additions and 141 deletions.
6 changes: 3 additions & 3 deletions comps/dataprep/pinecone/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import os

# Embedding model used when embeddings are computed locally.
EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")

# Pinecone configuration
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "xxx_xxx")
# The index name is a plain string; coercing it with int() fails on the
# non-numeric default ("langchain-test") as soon as the module is imported.
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "langchain-test")

# LLM/Embedding endpoints
TGI_LLM_ENDPOINT = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
TGI_LLM_ENDPOINT_NO_RAG = os.getenv("TGI_LLM_ENDPOINT_NO_RAG", "http://localhost:8081")
# No default: this is None when TEI_EMBEDDING_ENDPOINT is not set.
TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT")
17 changes: 16 additions & 1 deletion comps/dataprep/pinecone/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@ FROM python:3.11-slim

ENV LANG=C.UTF-8

ARG ARCH="cpu"

RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
build-essential \
libgl1-mesa-glx \
libjemalloc-dev \
vim
default-jre \
vim \
libcairo2


RUN useradd -m -s /bin/bash user && \
mkdir -p /home/user && \
Expand All @@ -23,8 +28,18 @@ COPY comps /home/user/comps
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r /home/user/comps/dataprep/pinecone/requirements.txt

RUN pip install --no-cache-dir --upgrade pip setuptools && \
if [ ${ARCH} = "cpu" ]; then pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \
pip install --no-cache-dir -r /home/user/comps/dataprep/pinecone/requirements.txt

ENV PYTHONPATH=$PYTHONPATH:/home/user

USER root

RUN mkdir -p /home/user/comps/dataprep/pinecone/uploaded_files && chown -R user /home/user/comps/dataprep/pinecone/uploaded_files

USER user

WORKDIR /home/user/comps/dataprep/pinecone

ENTRYPOINT ["python", "prepare_doc_pinecone.py"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,40 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

version: "3"
services:
tei-embedding-service:
image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.5
container_name: tei-embedding-server
ports:
- "6006:80"
volumes:
- "./data:/data"
shm_size: 1g
environment:
no_proxy: ${no_proxy}
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
command: --model-id ${EMBEDDING_MODEL_ID} --auto-truncate
dataprep-pinecone:
image: opea/gen-ai-comps:dataprep-pinecone-xeon-server
image: opea/dataprep-pinecone:latest
container_name: dataprep-pinecone-server
ports:
- "6000:6000"
- "6007:6007"
- "6008:6008"
- "6009:6009"
ipc: host
environment:
no_proxy: ${no_proxy}
http_proxy: ${http_proxy}
https_proxy: ${https_proxy}
PINECONE_API_KEY: ${PINECONE_API_KEY}
PINECONE_INDEX_NAME: ${PINECONE_INDEX_NAME}
TEI_EMBEDDING_ENDPOINT: ${TEI_EMBEDDING_ENDPOINT}
LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}
HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN}
restart: unless-stopped

networks:
Expand Down
244 changes: 223 additions & 21 deletions comps/dataprep/pinecone/prepare_doc_pinecone.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,105 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import json
import os
import shutil
import uuid
from pathlib import Path
from typing import List, Optional, Union

from config import EMBED_MODEL, PINECONE_API_KEY, PINECONE_INDEX_NAME
from fastapi import Body, File, Form, HTTPException, UploadFile
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings, HuggingFaceHubEmbeddings
from langchain_community.vectorstores import Pinecone
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import HTMLHeaderTextSplitter
from langsmith import traceable
from pinecone import Pinecone, ServerlessSpec

from comps import DocPath, opea_microservices, opea_telemetry, register_microservice
from comps.dataprep.utils import document_loader, get_separators
from comps.dataprep.utils import (
create_upload_folder,
document_loader,
encode_filename,
get_file_structure,
get_separators,
get_tables_result,
parse_html,
remove_folder_with_ignore,
save_content_to_local_disk,
)

# URL of the TEI embedding service. When it is unset (None), the ingestion
# code in this module falls back to a local embedding model.
tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT")
# Directory where uploaded files and fetched link contents are saved.
upload_folder = "./uploaded_files/"


@register_microservice(
name="opea_service@prepare_doc_pinecone",
endpoint="/v1/dataprep",
host="0.0.0.0",
port=6000,
input_datatype=DocPath,
output_datatype=None,
)
@opea_telemetry
def ingest_documents(doc_path: DocPath):
def check_index_existance():
    """Report whether the configured Pinecone index already exists.

    Returns:
        True when ``PINECONE_INDEX_NAME`` is among the index names listed by
        Pinecone; otherwise None (falsy), after logging that it is missing.
    """
    print(f"[ check index existence ] checking {PINECONE_INDEX_NAME}")
    client = Pinecone(api_key=PINECONE_API_KEY)

    # Collect the names of every index visible to this API key.
    known_names = []
    for index_info in client.list_indexes():
        known_names.append(index_info["name"])

    if PINECONE_INDEX_NAME in known_names:
        return True

    print("[ check index existence ] index does not exist")
    return None


def create_index(client, dimension=768, metric="cosine", cloud="aws", region="us-east-1"):
    """Create the serverless Pinecone index named ``PINECONE_INDEX_NAME``.

    Args:
        client: Pinecone client on which ``create_index`` is called.
        dimension: Vector dimension of the index. It has to match the output
            size of the embedding model in use, so override it when
            ``EMBED_MODEL`` / the TEI endpoint serves a model of another size.
        metric: Similarity metric of the index.
        cloud: Cloud provider passed to ``ServerlessSpec``.
        region: Cloud region passed to ``ServerlessSpec``.

    Returns:
        True if the index was created, False if creation raised an error
        (the error is logged, not propagated).
    """
    print(f"[ create index ] creating index {PINECONE_INDEX_NAME}")
    try:
        client.create_index(
            name=PINECONE_INDEX_NAME,
            dimension=dimension,
            metric=metric,
            spec=ServerlessSpec(cloud=cloud, region=region),
        )
    except Exception as e:
        # Best effort by design: callers decide what to do with a False result.
        print(f"[ create index ] fail to create index {PINECONE_INDEX_NAME}: {e}")
        return False
    print(f"[ create index ] index {PINECONE_INDEX_NAME} successfully created")
    return True


def drop_index(index_name):
    """Delete the Pinecone index called ``index_name``.

    Args:
        index_name: Name of the index to delete.

    Returns:
        True when the deletion call succeeded, False when it raised (the
        error is logged and swallowed).
    """
    print(f"[ drop index ] dropping index {index_name}")
    client = Pinecone(api_key=PINECONE_API_KEY)

    deleted = True
    try:
        client.delete_index(index_name)
        print(f"[ drop index ] index {index_name} deleted")
    except Exception as e:
        print(f"[ drop index ] index {index_name} delete failed: {e}")
        deleted = False
    return deleted


def ingest_data_to_pinecone(doc_path: DocPath):
"""Ingest document to Pinecone."""
doc_path = doc_path.path
print(f"Parsing document {doc_path}.")
path = doc_path.path
print(f"Parsing document {path}.")

text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1500, chunk_overlap=100, add_start_index=True, separators=get_separators()
)
content = document_loader(doc_path)
chunks = text_splitter.split_text(content)
if path.endswith(".html"):
headers_to_split_on = [
("h1", "Header 1"),
("h2", "Header 2"),
("h3", "Header 3"),
]
text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
else:
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=doc_path.chunk_size,
chunk_overlap=doc_path.chunk_overlap,
add_start_index=True,
separators=get_separators(),
)

content = document_loader(path)
chunks = text_splitter.split_text(content)
if doc_path.process_table and path.endswith(".pdf"):
table_chunks = get_tables_result(path, doc_path.table_strategy)
chunks = chunks + table_chunks
print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")

# Create vectorstore
if tei_embedding_endpoint:
# create embeddings using TEI endpoint service
Expand All @@ -43,20 +108,157 @@ def ingest_documents(doc_path: DocPath):
# create embeddings using local embedding model
embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)

pc = Pinecone(api_key=PINECONE_API_KEY)

# Checking Index existence
if not check_index_existance():
# Creating the index
create_index(pc)
print("Successfully created the index", PINECONE_INDEX_NAME)

# Batch size
batch_size = 32
num_chunks = len(chunks)
file_ids = []

for i in range(0, num_chunks, batch_size):
batch_chunks = chunks[i : i + batch_size]
batch_texts = batch_chunks

_ = Pinecone.from_texts(
vectorstore = PineconeVectorStore.from_texts(
texts=batch_texts,
embedding=embedder,
index_name=PINECONE_INDEX_NAME,
)
print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")

# store file_ids into index file-keys
pc = Pinecone(api_key=PINECONE_API_KEY)


async def ingest_link_to_pinecone(link_list: List[str]):
    """Fetch each link, save its text locally and index it in Pinecone.

    Args:
        link_list: URLs whose parsed HTML content should be ingested.

    Returns:
        True once every link in ``link_list`` has been processed.
    """
    # Create embedding obj
    if tei_embedding_endpoint:
        # create embeddings using TEI endpoint service
        embedder = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
    else:
        # create embeddings using local embedding model
        embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)

    pc = Pinecone(api_key=PINECONE_API_KEY)

    # Checking Index existence
    if not check_index_existance():
        # create_index reports failure through its return value, so only
        # announce success when it actually succeeded.
        if create_index(pc):
            print("Successfully created the index", PINECONE_INDEX_NAME)

    # Same default chunking as the file upload endpoint (1500 / 100).
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1500,
        chunk_overlap=100,
        separators=get_separators(),
    )

    # save link contents and doc_ids one by one
    for link in link_list:
        content = parse_html([link])[0][0]
        print(f"[ ingest link ] link: {link} content: {content}")
        encoded_link = encode_filename(link)
        save_path = upload_folder + encoded_link + ".txt"
        print(f"[ ingest link ] save_path: {save_path}")
        await save_content_to_local_disk(save_path, content)

        # from_texts expects a list of strings. Handing it the raw string
        # would treat every character as its own document, so split the page
        # into chunks first.
        chunks = text_splitter.split_text(content)
        if not chunks:
            print(f"[ ingest link ] no text extracted from {link}, nothing to index")
            continue

        PineconeVectorStore.from_texts(
            texts=chunks,
            embedding=embedder,
            index_name=PINECONE_INDEX_NAME,
        )

    return True


@register_microservice(name="opea_service@prepare_doc_pinecone", endpoint="/v1/dataprep", host="0.0.0.0", port=6007)
@traceable(run_type="tool")
async def ingest_documents(
    files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
    link_list: Optional[str] = Form(None),
    chunk_size: int = Form(1500),
    chunk_overlap: int = Form(100),
    process_table: bool = Form(False),
    table_strategy: str = Form("fast"),
):
    """Ingest uploaded files or a JSON list of links into Pinecone.

    Uploaded files take priority: when ``files`` is provided the request
    returns after processing them and ``link_list`` is not looked at.

    Args:
        files: One uploaded file or a list of them.
        link_list: JSON-encoded list of URLs.
        chunk_size: Forwarded to ``DocPath`` for each uploaded file.
        chunk_overlap: Forwarded to ``DocPath`` for each uploaded file.
        process_table: Forwarded to ``DocPath`` for each uploaded file.
        table_strategy: Forwarded to ``DocPath`` for each uploaded file.

    Returns:
        A dict with ``status`` 200 and a success message.

    Raises:
        HTTPException: 400 when neither input is given, when ``link_list``
            is not valid JSON, or when it does not decode to a list.
    """
    print(f"files:{files}")
    print(f"link_list:{link_list}")

    if files:
        # Normalise a single upload to a one-element list.
        pending = files if isinstance(files, list) else [files]
        for upload in pending:
            save_path = upload_folder + encode_filename(upload.filename)
            await save_content_to_local_disk(save_path, upload)
            doc = DocPath(
                path=save_path,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                process_table=process_table,
                table_strategy=table_strategy,
            )
            ingest_data_to_pinecone(doc)
            print(f"Successfully saved file {save_path}")

        return {"status": 200, "message": "Data preparation succeeded"}

    if not link_list:
        raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")

    try:
        link_list = json.loads(link_list)  # Parse JSON string to list
        if not isinstance(link_list, list):
            raise HTTPException(status_code=400, detail="link_list should be a list.")
        await ingest_link_to_pinecone(link_list)
        print(f"Successfully saved link list {link_list}")
        return {"status": 200, "message": "Data preparation succeeded"}
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")


@register_microservice(
    name="opea_service@prepare_doc_pinecone_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6008
)
@traceable(run_type="tool")
async def rag_get_file_structure():
    """Return the structure of the upload folder.

    Returns:
        Whatever ``get_file_structure`` yields for ``upload_folder``, or an
        empty list when that folder does not exist.
    """
    print("[ dataprep - get file ] start to get file structure")

    if Path(upload_folder).exists():
        return get_file_structure(upload_folder)

    print("No file uploaded, return empty list.")
    return []


@register_microservice(
    name="opea_service@prepare_doc_pinecone_del", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6009
)
@traceable(run_type="tool")
async def delete_all(file_path: str = Body(..., embed=True)):
    """Delete uploaded data according to `file_path`.

    `file_path`:
        - "all": remove every uploaded file and drop the Pinecone index.

    Returns:
        ``{"status": True}`` when everything was deleted.

    Raises:
        HTTPException: 500 when the Pinecone index could not be dropped,
            404 for any `file_path` other than "all" (single file deletion
            is not implemented).
    """
    if file_path != "all":
        raise HTTPException(status_code=404, detail="Single file deletion is not implemented yet")

    # delete all uploaded files
    print("[dataprep - del] delete all files")
    remove_folder_with_ignore(upload_folder)
    # Call drop_index unconditionally: wrapping it in `assert` would skip the
    # call entirely when Python runs with -O.
    index_dropped = drop_index(index_name=PINECONE_INDEX_NAME)
    # Recreate the upload folder even when dropping the index failed, so the
    # folder is not left missing for later uploads.
    create_upload_folder(upload_folder)
    if not index_dropped:
        raise HTTPException(status_code=500, detail=f"Failed to drop index {PINECONE_INDEX_NAME}")
    print("[dataprep - del] successfully delete all files.")
    return {"status": True}


if __name__ == "__main__":
    # Make sure the upload directory exists before any service starts.
    create_upload_folder(upload_folder)
    # Start the ingest, file-listing and delete services in that order.
    for service_name in (
        "opea_service@prepare_doc_pinecone",
        "opea_service@prepare_doc_pinecone_file",
        "opea_service@prepare_doc_pinecone_del",
    ):
        opea_microservices[service_name].start()
Loading

0 comments on commit ca545cf

Please sign in to comment.