Merge pull request #151 from KaranrajM/r2r-integration
Integrated r2r with indexer and retriever
shreypandey authored Aug 20, 2024
2 parents c39b90a + dbfe0e3 commit d5346de
Showing 19 changed files with 6,325 additions and 1,387 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/trigger-tests.yml
@@ -46,4 +46,15 @@ jobs:
- name: Test flow with pytest
working-directory: ./flow
run: poetry run pytest

- name: Install dependencies indexer
working-directory: ./indexer
run: poetry install --with test
- name: Test indexer with pytest
working-directory: ./indexer
run: poetry run pytest
- name: Install dependencies retriever
working-directory: ./retriever
run: poetry install --with test
- name: Test retriever with pytest
working-directory: ./retriever
run: poetry run pytest
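
These two jobs mirror the existing flow steps above: each new service installs its test dependencies with Poetry and runs pytest from its own directory, so a CI failure can be reproduced locally with the same pair of commands.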
10 changes: 10 additions & 0 deletions .pre-commit-config.yaml
@@ -31,4 +31,14 @@ repos:
name: Run Tests in flow
entry: bash -c 'cd flow && poetry run pytest'
language: system
types: [python]
- id: run-tests-indexer
name: Run Tests in indexer
entry: bash -c 'cd indexer && poetry run pytest'
language: system
types: [python]
- id: run-tests-retriever
name: Run Tests in retriever
entry: bash -c 'cd retriever && poetry run pytest'
language: system
types: [python]
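
As with the existing flow hook, each new hook changes into its service directory and runs that service's pytest suite whenever Python files are committed, so indexer and retriever tests now gate commits as well.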
10 changes: 8 additions & 2 deletions api/app/extensions.py
@@ -1,7 +1,7 @@
import os
import logging
from lib.kafka_utils import KafkaProducer, KafkaException
from lib.data_models import Flow, Channel
from lib.data_models import Flow, Channel, Indexer

logger = logging.getLogger("jb-manager-api")

@@ -11,19 +11,25 @@
flow_topic = os.getenv("KAFKA_FLOW_TOPIC")
if not flow_topic:
raise ValueError("KAFKA_FLOW_TOPIC is not set in the environment")
indexer_topic = os.getenv("KAFKA_INDEXER_TOPIC")
if not indexer_topic:
raise ValueError("KAFKA_INDEXER_TOPIC is not set in the environment")
logger.info("Channel Topic: %s", channel_topic)
logger.info("Flow Topic: %s", flow_topic)
logger.info("Indexer Topic: %s", indexer_topic)

# Connect Kafka Producer automatically using env variables
# and SASL, if applicable
producer = KafkaProducer.from_env_vars()


def produce_message(message: Flow | Channel):
def produce_message(message: Flow | Channel | Indexer):
if isinstance(message, Flow):
topic = flow_topic
elif isinstance(message, Channel):
topic = channel_topic
elif isinstance(message, Indexer):
topic = indexer_topic
else:
raise ValueError("Invalid message type")
try:
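As a sketch of how the new dispatch branch is exercised (field names follow the /index-data endpoint below; the "r2r" value is an assumed IndexType member, not taken from this diff):

from lib.data_models import Indexer

# Hypothetical payload; routed to indexer_topic by the isinstance
# dispatch in produce_message above.
message = Indexer(
    type="r2r",                  # assumed IndexType value
    collection_name="my_docs",
    files=["faq.pdf"],
)
produce_message(message)
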
43 changes: 37 additions & 6 deletions api/app/routers/v1/__init__.py
@@ -1,24 +1,29 @@
import uuid
import logging
import uuid
from typing import List

from fastapi import APIRouter, HTTPException, Request
from ...crud import get_chat_history, get_bot_list, get_bot_chat_sessions
from fastapi.datastructures import UploadFile
from lib.data_models.indexer import Indexer, IndexType
from lib.file_storage import StorageHandler

from ...crud import get_bot_chat_sessions, get_bot_list, get_chat_history
from ...extensions import produce_message
from ...handlers.v1 import handle_webhook
from ...handlers.v1.bot_handlers import (
handle_activate_bot,
handle_deactivate_bot,
handle_delete_bot,
handle_update_bot,
handle_install_bot,
handle_update_bot,
)
from ...jb_schema import JBBotCode, JBBotActivate
from ...extensions import produce_message
from ...jb_schema import JBBotActivate, JBBotCode

logger = logging.getLogger("jb-manager-api")
router = APIRouter(
prefix="/v1",
tags=["v1"],
)

KEYS = {"JBMANAGER_KEY": str(uuid.uuid4())}


@@ -144,3 +149,29 @@ async def plugin_webhook(request: Request):
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return 200


@router.post("/index-data")
async def index_data(
indexer_type: IndexType,
collection_name: str,
files: List[UploadFile],
):
storage = StorageHandler.get_async_instance()
files_list = []
for file in files:
await storage.write_file(
file_path=file.filename,
file_content=file.file.read(),
mime_type=file.content_type,
)
files_list.append(file.filename)
indexer_input = Indexer(
type=indexer_type.value,
collection_name=collection_name,
files=files_list,
)
# write to indexer
produce_message(indexer_input)

return {"message": f"Indexing started for the files in {collection_name}"}
46 changes: 32 additions & 14 deletions docker-compose.yml
@@ -14,11 +14,19 @@ services:
- KAFKA_PRODUCER_PASSWORD=${KAFKA_PRODUCER_PASSWORD}
- KAFKA_CHANNEL_TOPIC=${KAFKA_CHANNEL_TOPIC}
- KAFKA_FLOW_TOPIC=${KAFKA_FLOW_TOPIC}
- KAFKA_INDEXER_TOPIC=${KAFKA_INDEXER_TOPIC}
- ENCRYPTION_KEY=${ENCRYPTION_KEY}
- WA_API_HOST=${WA_API_HOST}
- STORAGE_TYPE=${STORAGE_TYPE}
- AZURE_STORAGE_ACCOUNT_URL=${AZURE_STORAGE_ACCOUNT_URL}
- AZURE_STORAGE_ACCOUNT_KEY=${AZURE_STORAGE_ACCOUNT_KEY}
- AZURE_STORAGE_CONTAINER=${AZURE_STORAGE_CONTAINER}
- PUBLIC_URL_PREFIX=${PUBLIC_URL_PREFIX}
depends_on:
- kafka
- postgres
volumes:
- ./media:/mnt/jb_files
kafka:
image: docker.io/bitnami/kafka:3.6
ports:
@@ -103,26 +111,39 @@ services:
- ./media:/mnt/jb_files
indexer:
environment:
- KAFKA_BROKER=${KAFKA_BROKER}
- KAFKA_USE_SASL=${KAFKA_USE_SASL}
- KAFKA_CONSUMER_TOPIC=${KAFKA_INDEXER_TOPIC}
- KAFKA_PRODUCER_FLOW_TOPIC=${KAFKA_FLOW_TOPIC}
- POSTGRES_DATABASE_NAME=${POSTGRES_DATABASE_NAME}
- POSTGRES_DATABASE_USERNAME=${POSTGRES_DATABASE_USERNAME}
- POSTGRES_DATABASE_PASSWORD=${POSTGRES_DATABASE_PASSWORD}
- POSTGRES_DATABASE_HOST=${POSTGRES_DATABASE_HOST}
- POSTGRES_DATABASE_PORT=${POSTGRES_DATABASE_PORT}
- OPENAI_API_TYPE=${OPENAI_API_TYPE}
- KAFKA_BROKER=${KAFKA_BROKER}
- KAFKA_USE_SASL=${KAFKA_USE_SASL}
- KAFKA_CONSUMER_TOPIC=${KAFKA_INDEXER_TOPIC}
- KAFKA_PRODUCER_FLOW_TOPIC=${KAFKA_FLOW_TOPIC}
- AZURE_DEPLOYMENT_NAME=${AZURE_DEPLOYMENT_NAME}
- AZURE_EMBEDDING_MODEL_NAME=${AZURE_EMBEDDING_MODEL_NAME}
- AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
- AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
- AZURE_OPENAI_API_VERSION=${AZURE_OPENAI_API_VERSION}
- OPENAI_API_TYPE=${OPENAI_API_TYPE}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- DOCUMENT_LOCAL_STORAGE_PATH=./data
- STORAGE_TYPE=${STORAGE_TYPE}
- AZURE_STORAGE_ACCOUNT_URL=${AZURE_STORAGE_ACCOUNT_URL}
- AZURE_STORAGE_ACCOUNT_KEY=${AZURE_STORAGE_ACCOUNT_KEY}
- AZURE_STORAGE_CONTAINER=${AZURE_STORAGE_CONTAINER}
- PUBLIC_URL_PREFIX=${PUBLIC_URL_PREFIX}
depends_on:
- kafka
- postgres
volumes:
- ./media:/mnt/jb_files
retriever:
environment:
- POSTGRES_DATABASE_NAME=${POSTGRES_DATABASE_NAME}
- POSTGRES_DATABASE_USERNAME=${POSTGRES_DATABASE_USERNAME}
- POSTGRES_DATABASE_PASSWORD=${POSTGRES_DATABASE_PASSWORD}
- POSTGRES_DATABASE_HOST=${POSTGRES_DATABASE_HOST}
- POSTGRES_DATABASE_PORT=${POSTGRES_DATABASE_PORT}
- KAFKA_BROKER=${KAFKA_BROKER}
- KAFKA_USE_SASL=${KAFKA_USE_SASL}
- KAFKA_RETRIEVER_TOPIC=${KAFKA_RETRIEVER_TOPIC}
@@ -131,16 +152,13 @@
- KAFKA_PRODUCER_PASSWORD=${KAFKA_PRODUCER_PASSWORD}
- KAFKA_CONSUMER_USERNAME=${KAFKA_CONSUMER_USERNAME}
- KAFKA_CONSUMER_PASSWORD=${KAFKA_CONSUMER_PASSWORD}
- POSTGRES_DATABASE_NAME=${POSTGRES_DATABASE_NAME}
- POSTGRES_DATABASE_USERNAME=${POSTGRES_DATABASE_USERNAME}
- POSTGRES_DATABASE_PASSWORD=${POSTGRES_DATABASE_PASSWORD}
- POSTGRES_DATABASE_HOST=${POSTGRES_DATABASE_HOST}
- POSTGRES_DATABASE_PORT=${POSTGRES_DATABASE_PORT}
- AZURE_DEPLOYMENT_NAME=${AZURE_DEPLOYMENT_NAME}
- AZURE_EMBEDDING_MODEL_NAME=${AZURE_EMBEDDING_MODEL_NAME}
- AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
- AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
- AZURE_OPENAI_API_VERSION=${AZURE_OPENAI_API_VERSION}
- OPENAI_API_TYPE=${OPENAI_API_TYPE}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- AZURE_OPENAI_ENDPOINT=${AZURE_OPENAI_ENDPOINT}
- AZURE_OPENAI_API_KEY=${AZURE_OPENAI_API_KEY}
- AZURE_DEPLOYMENT_NAME=${AZURE_DEPLOYMENT_NAME}
depends_on:
- kafka
- postgres
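The Azure OpenAI variables shared by indexer and retriever suggest an embeddings client along these lines; a minimal sketch with the openai v1 SDK, offered as an assumption since the services' client code is not part of this diff:

import os

from openai import AzureOpenAI

client = AzureOpenAI(
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version=os.environ["AZURE_OPENAI_API_VERSION"],
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
)
# On Azure the model argument names a deployment; whether the services
# pass AZURE_EMBEDDING_MODEL_NAME or AZURE_DEPLOYMENT_NAME here is an
# assumption, not something this diff shows.
result = client.embeddings.create(
    model=os.environ["AZURE_EMBEDDING_MODEL_NAME"],
    input=["sample text to embed"],
)
print(len(result.data[0].embedding))
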
1 change: 1 addition & 0 deletions flow/src/handlers/bot_input.py
@@ -96,6 +96,7 @@ def handle_bot_output(fsm_output: FSMOutput, turn_id: str):
logger.error("RAG query not found in fsm output")
return
flow_output = RAG(
type=rag_query.type,
source="flow",
turn_id=turn_id,
collection_name=rag_query.collection_name,
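This one-line change threads the new type field from the FSM's RAGQuery through to the RAG message, presumably so the retriever can choose between the default pipeline and the new r2r one; the test update below exercises both sides of the field.
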
3 changes: 2 additions & 1 deletion flow/tests/test_bot_input.py
@@ -15,7 +15,6 @@
FSMOutput,
FSMIntent,
MessageType,
RAGResponse,
Message,
TextMessage,
InteractiveReplyMessage,
@@ -357,13 +356,15 @@
FSMOutput(
intent=FSMIntent.RAG_CALL,
rag_query=RAGQuery(
type="default",
collection_name="test_collection",
query="test_query",
top_chunk_k_value=5,
),
),
RAG(
source="flow",
type="default",
turn_id="test_turn_id",
collection_name="test_collection",
query="test_query",
