
Pinecone update to OPEA #420

Merged · 10 commits · Aug 12, 2024
6 changes: 3 additions & 3 deletions comps/dataprep/pinecone/config.py
@@ -4,13 +4,13 @@
 import os
 
 # Embedding model
-EMBED_MODEL = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
+EMBED_MODEL = os.getenv("EMBED_MODEL", "BAAI/bge-base-en-v1.5")
 
 # Pinecone configuration
 PINECONE_API_KEY = os.getenv("PINECONE_API_KEY", "xxx_xxx")
-PINECONE_INDEX_NAME = int(os.getenv("PINECONE_INDEX_NAME", "langchain-test"))
+PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME", "langchain-test")
 
 # LLM/Embedding endpoints
 TGI_LLM_ENDPOINT = os.getenv("TGI_LLM_ENDPOINT", "http://localhost:8080")
 TGI_LLM_ENDPOINT_NO_RAG = os.getenv("TGI_LLM_ENDPOINT_NO_RAG", "http://localhost:8081")
-TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_ENDPOINT")
+TEI_EMBEDDING_ENDPOINT = os.getenv("TEI_EMBEDDING_ENDPOINT")
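
Two of the three changes here are renames (the default embedding model, and reading TEI_EMBEDDING_ENDPOINT instead of TEI_ENDPOINT, matching the variable the compose file below exports). The third is a bug fix: Pinecone index names are strings, so the stray int() cast could only raise. A minimal repro of what the old line did:

import os

os.environ.setdefault("PINECONE_INDEX_NAME", "langchain-test")
# old behavior: int() on a non-numeric index name fails at import time
int(os.getenv("PINECONE_INDEX_NAME", "langchain-test"))
# ValueError: invalid literal for int() with base 10: 'langchain-test'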
17 changes: 16 additions & 1 deletion comps/dataprep/pinecone/docker/Dockerfile
@@ -6,11 +6,16 @@ FROM python:3.11-slim
 
 ENV LANG=C.UTF-8
 
+ARG ARCH="cpu"
+
 RUN apt-get update -y && apt-get install -y --no-install-recommends --fix-missing \
     build-essential \
     libgl1-mesa-glx \
     libjemalloc-dev \
-    vim
+    default-jre \
+    vim \
+    libcairo2
+
 
 RUN useradd -m -s /bin/bash user && \
     mkdir -p /home/user && \
@@ -23,8 +28,18 @@ COPY comps /home/user/comps
 RUN pip install --no-cache-dir --upgrade pip && \
     pip install --no-cache-dir -r /home/user/comps/dataprep/pinecone/requirements.txt
 
+RUN pip install --no-cache-dir --upgrade pip setuptools && \
+    if [ ${ARCH} = "cpu" ]; then pip install torch torchvision --index-url https://download.pytorch.org/whl/cpu; fi && \
+    pip install --no-cache-dir -r /home/user/comps/dataprep/pinecone/requirements.txt
+
 ENV PYTHONPATH=$PYTHONPATH:/home/user
 
+USER root
+
+RUN mkdir -p /home/user/comps/dataprep/pinecone/uploaded_files && chown -R user /home/user/comps/dataprep/pinecone/uploaded_files
+
+USER user
+
 WORKDIR /home/user/comps/dataprep/pinecone
 
 ENTRYPOINT ["python", "prepare_doc_pinecone.py"]
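
With the default ARCH="cpu", the conditional pip line pulls CPU-only PyTorch wheels from the PyTorch index. A quick way to confirm inside a running container (a sketch; assumes torch is importable in the image):

import torch

# CPU wheels carry a "+cpu" local version tag, e.g. "2.3.0+cpu"
print(torch.__version__)
# no CUDA runtime in the CPU-only image
print(torch.cuda.is_available())  # expected: False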
@@ -1,19 +1,40 @@
-# Copyright (C) 2024 Intel Corporation
-# SPDX-License-Identifier: Apache-2.0
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
 
 version: "3"
 services:
+  tei-embedding-service:
+    image: ghcr.io/huggingface/text-embeddings-inference:cpu-1.5
+    container_name: tei-embedding-server
+    ports:
+      - "6006:80"
+    volumes:
+      - "./data:/data"
+    shm_size: 1g
+    environment:
+      no_proxy: ${no_proxy}
+      http_proxy: ${http_proxy}
+      https_proxy: ${https_proxy}
+    command: --model-id ${EMBEDDING_MODEL_ID} --auto-truncate
   dataprep-pinecone:
-    image: opea/gen-ai-comps:dataprep-pinecone-xeon-server
+    image: opea/dataprep-pinecone:latest
     container_name: dataprep-pinecone-server
     ports:
-      - "6000:6000"
+      - "6007:6007"
+      - "6008:6008"
+      - "6009:6009"
+    ipc: host
     environment:
       no_proxy: ${no_proxy}
       http_proxy: ${http_proxy}
       https_proxy: ${https_proxy}
       PINECONE_API_KEY: ${PINECONE_API_KEY}
+      PINECONE_INDEX_NAME: ${PINECONE_INDEX_NAME}
+      TEI_EMBEDDING_ENDPOINT: ${TEI_EMBEDDING_ENDPOINT}
       LANGCHAIN_API_KEY: ${LANGCHAIN_API_KEY}
+      HUGGINGFACEHUB_API_TOKEN: ${HUGGINGFACEHUB_API_TOKEN}
     restart: unless-stopped
 
 networks:
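
The compose file reads several variables from the host environment. A minimal pre-flight check before docker compose up (a sketch; the names are taken from the file above, and LANGCHAIN_API_KEY / HUGGINGFACEHUB_API_TOKEN are only needed if you use those services):

import os

required = [
    "EMBEDDING_MODEL_ID",      # model served by tei-embedding-service, e.g. BAAI/bge-base-en-v1.5
    "PINECONE_API_KEY",
    "PINECONE_INDEX_NAME",
    "TEI_EMBEDDING_ENDPOINT",  # e.g. http://<host-ip>:6006
]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise SystemExit("Set these variables first: " + ", ".join(missing))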
244 changes: 223 additions & 21 deletions comps/dataprep/pinecone/prepare_doc_pinecone.py
@@ -1,40 +1,105 @@
 # Copyright (C) 2024 Intel Corporation
 # SPDX-License-Identifier: Apache-2.0
 
+import json
 import os
+import shutil
+import uuid
+from pathlib import Path
+from typing import List, Optional, Union
 
 from config import EMBED_MODEL, PINECONE_API_KEY, PINECONE_INDEX_NAME
+from fastapi import Body, File, Form, HTTPException, UploadFile
 from langchain.text_splitter import RecursiveCharacterTextSplitter
 from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings, HuggingFaceHubEmbeddings
-from langchain_community.vectorstores import Pinecone
+from langchain_pinecone import PineconeVectorStore
+from langchain_text_splitters import HTMLHeaderTextSplitter
+from langsmith import traceable
+from pinecone import Pinecone, ServerlessSpec
 
 from comps import DocPath, opea_microservices, opea_telemetry, register_microservice
-from comps.dataprep.utils import document_loader, get_separators
+from comps.dataprep.utils import (
+    create_upload_folder,
+    document_loader,
+    encode_filename,
+    get_file_structure,
+    get_separators,
+    get_tables_result,
+    parse_html,
+    remove_folder_with_ignore,
+    save_content_to_local_disk,
+)
 
-tei_embedding_endpoint = os.getenv("TEI_ENDPOINT")
+tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT")
+upload_folder = "./uploaded_files/"
 
 
-@register_microservice(
-    name="opea_service@prepare_doc_pinecone",
-    endpoint="/v1/dataprep",
-    host="0.0.0.0",
-    port=6000,
-    input_datatype=DocPath,
-    output_datatype=None,
-)
-@opea_telemetry
-def ingest_documents(doc_path: DocPath):
+def check_index_existance():
+    print(f"[ check index existence ] checking {PINECONE_INDEX_NAME}")
+    pc = Pinecone(api_key=PINECONE_API_KEY)
+    existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
+    if PINECONE_INDEX_NAME not in existing_indexes:
+        print("[ check index existence ] index does not exist")
+        return None
+    else:
+        return True
+
+
+def create_index(client):
+    print(f"[ create index ] creating index {PINECONE_INDEX_NAME}")
+    try:
+        client.create_index(
+            name=PINECONE_INDEX_NAME,
+            dimension=768,
+            metric="cosine",
+            spec=ServerlessSpec(cloud="aws", region="us-east-1"),
+        )
+        print(f"[ create index ] index {PINECONE_INDEX_NAME} successfully created")
+    except Exception as e:
+        print(f"[ create index ] fail to create index {PINECONE_INDEX_NAME}: {e}")
+        return False
+    return True
+
+
+def drop_index(index_name):
+    print(f"[ drop index ] dropping index {index_name}")
+    pc = Pinecone(api_key=PINECONE_API_KEY)
+    try:
+        pc.delete_index(index_name)
+        print(f"[ drop index ] index {index_name} deleted")
+    except Exception as e:
+        print(f"[ drop index ] index {index_name} delete failed: {e}")
+        return False
+    return True
+
+
+def ingest_data_to_pinecone(doc_path: DocPath):
     """Ingest document to Pinecone."""
-    doc_path = doc_path.path
-    print(f"Parsing document {doc_path}.")
+    path = doc_path.path
+    print(f"Parsing document {path}.")
 
-    text_splitter = RecursiveCharacterTextSplitter(
-        chunk_size=1500, chunk_overlap=100, add_start_index=True, separators=get_separators()
-    )
-    content = document_loader(doc_path)
-    chunks = text_splitter.split_text(content)
+    if path.endswith(".html"):
+        headers_to_split_on = [
+            ("h1", "Header 1"),
+            ("h2", "Header 2"),
+            ("h3", "Header 3"),
+        ]
+        text_splitter = HTMLHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
+    else:
+        text_splitter = RecursiveCharacterTextSplitter(
+            chunk_size=doc_path.chunk_size,
+            chunk_overlap=doc_path.chunk_overlap,
+            add_start_index=True,
+            separators=get_separators(),
+        )
 
+    content = document_loader(path)
+    chunks = text_splitter.split_text(content)
+    if doc_path.process_table and path.endswith(".pdf"):
+        table_chunks = get_tables_result(path, doc_path.table_strategy)
+        chunks = chunks + table_chunks
+    print("Done preprocessing. Created ", len(chunks), " chunks of the original pdf")
 
     # Create vectorstore
     if tei_embedding_endpoint:
         # create embeddings using TEI endpoint service
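
The new create_index helper above hard-codes dimension=768, which matches BAAI/bge-base-en-v1.5, the new default in config.py. A sketch to verify that pairing with the same LangChain wrapper the service uses:

from langchain_community.embeddings import HuggingFaceBgeEmbeddings

embedder = HuggingFaceBgeEmbeddings(model_name="BAAI/bge-base-en-v1.5")
vector = embedder.embed_query("dimension sanity check")
# must equal the dimension passed to client.create_index, or upserts will fail
assert len(vector) == 768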
@@ -43,20 +108,157 @@ def ingest_documents(doc_path: DocPath):
         # create embeddings using local embedding model
         embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
 
+    pc = Pinecone(api_key=PINECONE_API_KEY)
+
+    # Checking Index existence
+    if not check_index_existance():
+        # Creating the index
+        create_index(pc)
+        print("Successfully created the index", PINECONE_INDEX_NAME)
+
     # Batch size
     batch_size = 32
     num_chunks = len(chunks)
+    file_ids = []
 
     for i in range(0, num_chunks, batch_size):
         batch_chunks = chunks[i : i + batch_size]
         batch_texts = batch_chunks
 
-        _ = Pinecone.from_texts(
+        vectorstore = PineconeVectorStore.from_texts(
             texts=batch_texts,
             embedding=embedder,
             index_name=PINECONE_INDEX_NAME,
         )
         print(f"Processed batch {i//batch_size + 1}/{(num_chunks-1)//batch_size + 1}")
 
+    # store file_ids into index file-keys
+    pc = Pinecone(api_key=PINECONE_API_KEY)
+
+
+async def ingest_link_to_pinecone(link_list: List[str]):
+    # Create embedding obj
+    if tei_embedding_endpoint:
+        # create embeddings using TEI endpoint service
+        embedder = HuggingFaceHubEmbeddings(model=tei_embedding_endpoint)
+    else:
+        # create embeddings using local embedding model
+        embedder = HuggingFaceBgeEmbeddings(model_name=EMBED_MODEL)
+
+    pc = Pinecone(api_key=PINECONE_API_KEY)
+
+    # Checking Index existence
+    if not check_index_existance():
+        # Creating the index
+        create_index(pc)
+        print("Successfully created the index", PINECONE_INDEX_NAME)
+
+    # save link contents and doc_ids one by one
+    for link in link_list:
+        content = parse_html([link])[0][0]
+        print(f"[ ingest link ] link: {link} content: {content}")
+        encoded_link = encode_filename(link)
+        save_path = upload_folder + encoded_link + ".txt"
+        print(f"[ ingest link ] save_path: {save_path}")
+        await save_content_to_local_disk(save_path, content)
+
+    vectorstore = PineconeVectorStore.from_texts(
+        texts=content,
+        embedding=embedder,
+        index_name=PINECONE_INDEX_NAME,
+    )
+
+    return True
+
+
+@register_microservice(name="opea_service@prepare_doc_pinecone", endpoint="/v1/dataprep", host="0.0.0.0", port=6007)
+@traceable(run_type="tool")
+async def ingest_documents(
+    files: Optional[Union[UploadFile, List[UploadFile]]] = File(None),
+    link_list: Optional[str] = Form(None),
+    chunk_size: int = Form(1500),
+    chunk_overlap: int = Form(100),
+    process_table: bool = Form(False),
+    table_strategy: str = Form("fast"),
+):
+    print(f"files:{files}")
+    print(f"link_list:{link_list}")
+
+    if files:
+        if not isinstance(files, list):
+            files = [files]
+        uploaded_files = []
+        for file in files:
+            encode_file = encode_filename(file.filename)
+            save_path = upload_folder + encode_file
+            await save_content_to_local_disk(save_path, file)
+            ingest_data_to_pinecone(
+                DocPath(
+                    path=save_path,
+                    chunk_size=chunk_size,
+                    chunk_overlap=chunk_overlap,
+                    process_table=process_table,
+                    table_strategy=table_strategy,
+                )
+            )
+            uploaded_files.append(save_path)
+            print(f"Successfully saved file {save_path}")
+
+        return {"status": 200, "message": "Data preparation succeeded"}
+
+    if link_list:
+        try:
+            link_list = json.loads(link_list)  # Parse JSON string to list
+            if not isinstance(link_list, list):
+                raise HTTPException(status_code=400, detail="link_list should be a list.")
+            await ingest_link_to_pinecone(link_list)
+            print(f"Successfully saved link list {link_list}")
+            return {"status": 200, "message": "Data preparation succeeded"}
+        except json.JSONDecodeError:
+            raise HTTPException(status_code=400, detail="Invalid JSON format for link_list.")
+
+    raise HTTPException(status_code=400, detail="Must provide either a file or a string list.")
+
+
+@register_microservice(
+    name="opea_service@prepare_doc_pinecone_file", endpoint="/v1/dataprep/get_file", host="0.0.0.0", port=6008
+)
+@traceable(run_type="tool")
+async def rag_get_file_structure():
+    print("[ dataprep - get file ] start to get file structure")
+
+    if not Path(upload_folder).exists():
+        print("No file uploaded, return empty list.")
+        return []
+
+    file_content = get_file_structure(upload_folder)
+    return file_content
+
+
+@register_microservice(
+    name="opea_service@prepare_doc_pinecone_del", endpoint="/v1/dataprep/delete_file", host="0.0.0.0", port=6009
+)
+@traceable(run_type="tool")
+async def delete_all(file_path: str = Body(..., embed=True)):
+    """Delete file according to `file_path`.
+    `file_path`:
+        - "all": delete all files uploaded
+    """
+    # delete all uploaded files
+    if file_path == "all":
+        print("[dataprep - del] delete all files")
+        remove_folder_with_ignore(upload_folder)
+        assert drop_index(index_name=PINECONE_INDEX_NAME)
+        print("[dataprep - del] successfully delete all files.")
+        create_upload_folder(upload_folder)
+        return {"status": True}
+    else:
+        raise HTTPException(status_code=404, detail="Single file deletion is not implemented yet")
+
+
 if __name__ == "__main__":
+    create_upload_folder(upload_folder)
     opea_microservices["opea_service@prepare_doc_pinecone"].start()
+    opea_microservices["opea_service@prepare_doc_pinecone_file"].start()
+    opea_microservices["opea_service@prepare_doc_pinecone_del"].start()
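
After this change the dataprep component listens on three ports, as wired in the compose file: 6007 (ingest), 6008 (file listing), 6009 (deletion). A client sketch against a local deployment (assumptions: POST for all three endpoints, a sample.pdf on disk, and the port mappings above):

import json

import requests

# ingest a local file; form fields mirror the ingest_documents signature
with open("sample.pdf", "rb") as f:
    resp = requests.post(
        "http://localhost:6007/v1/dataprep",
        files={"files": f},
        data={"chunk_size": 1500, "chunk_overlap": 100},
    )
print(resp.json())  # {"status": 200, "message": "Data preparation succeeded"}

# ingest web links; link_list must be a JSON-encoded list of URLs
resp = requests.post(
    "http://localhost:6007/v1/dataprep",
    data={"link_list": json.dumps(["https://opea.dev"])},
)
print(resp.json())

# list uploaded files
print(requests.post("http://localhost:6008/v1/dataprep/get_file").json())

# delete everything: clears uploaded_files and drops the Pinecone index
print(requests.post("http://localhost:6009/v1/dataprep/delete_file", json={"file_path": "all"}).json())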