
Commit 7678a35

Merge branch 'release/v0.6' of https://github.com/danswer-ai/danswer into fixes/user-group-fk

rkuo-danswer committed Oct 1, 2024
2 parents: d724da9 + be4aef8
Showing 20 changed files with 504 additions and 76 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/pr-python-checks.yml
@@ -3,7 +3,9 @@ name: Python Checks
 on:
   merge_group:
   pull_request:
-    branches: [ main ]
+    branches:
+      - main
+      - 'release/**'
 
 jobs:
   mypy-check:
4 changes: 3 additions & 1 deletion .github/workflows/pr-python-tests.yml
@@ -3,7 +3,9 @@ name: Python Unit Tests
 on:
   merge_group:
   pull_request:
-    branches: [ main ]
+    branches:
+      - main
+      - 'release/**'
 
 jobs:
   backend-check:
4 changes: 3 additions & 1 deletion .github/workflows/run-it.yml
@@ -6,7 +6,9 @@ concurrency:
 on:
   merge_group:
   pull_request:
-    branches: [ main ]
+    branches:
+      - main
+      - 'release/**'
 
 env:
   OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
4 changes: 3 additions & 1 deletion CONTRIBUTING_MACOS.md
@@ -28,4 +28,6 @@ MacOS will likely require you to remove some quarantine attributes on some of th
 After installing pre-commit, run the following command:
 ```bash
 sudo xattr -r -d com.apple.quarantine ~/.cache/pre-commit
-```
+```
+
+doc version 0.1
16 changes: 13 additions & 3 deletions backend/danswer/background/indexing/run_indexing.py
@@ -29,6 +29,7 @@
 from danswer.db.models import IndexModelStatus
 from danswer.document_index.factory import get_default_document_index
 from danswer.indexing.embedder import DefaultIndexingEmbedder
+from danswer.indexing.indexing_heartbeat import IndexingHeartbeat
 from danswer.indexing.indexing_pipeline import build_indexing_pipeline
 from danswer.utils.logger import IndexAttemptSingleton
 from danswer.utils.logger import setup_logger
@@ -103,15 +104,24 @@ def _run_indexing(
     )
 
     embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
-        search_settings=search_settings
+        search_settings=search_settings,
+        heartbeat=IndexingHeartbeat(
+            index_attempt_id=index_attempt.id,
+            db_session=db_session,
+            # let the world know we're still making progress after
+            # every 10 batches
+            freq=10,
+        ),
     )
 
     indexing_pipeline = build_indexing_pipeline(
         attempt_id=index_attempt.id,
         embedder=embedding_model,
         document_index=document_index,
-        ignore_time_skip=index_attempt.from_beginning
-        or (search_settings.status == IndexModelStatus.FUTURE),
+        ignore_time_skip=(
+            index_attempt.from_beginning
+            or (search_settings.status == IndexModelStatus.FUTURE)
+        ),
         db_session=db_session,
     )
 
4 changes: 4 additions & 0 deletions backend/danswer/configs/app_configs.py
@@ -247,6 +247,10 @@
     for ignored_tag in os.environ.get("JIRA_CONNECTOR_LABELS_TO_SKIP", "").split(",")
     if ignored_tag
 ]
+# Maximum size for Jira tickets in bytes (default: 100KB)
+JIRA_CONNECTOR_MAX_TICKET_SIZE = int(
+    os.environ.get("JIRA_CONNECTOR_MAX_TICKET_SIZE", 100 * 1024)
+)
 
 GONG_CONNECTOR_START_TIME = os.environ.get("GONG_CONNECTOR_START_TIME")
 
13 changes: 11 additions & 2 deletions backend/danswer/connectors/danswer_jira/connector.py
@@ -9,6 +9,7 @@
 
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
 from danswer.configs.app_configs import JIRA_CONNECTOR_LABELS_TO_SKIP
+from danswer.configs.app_configs import JIRA_CONNECTOR_MAX_TICKET_SIZE
 from danswer.configs.constants import DocumentSource
 from danswer.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
 from danswer.connectors.interfaces import GenerateDocumentsOutput
@@ -134,10 +135,18 @@ def fetch_jira_issues_batch(
             else extract_text_from_adf(jira.raw["fields"]["description"])
         )
         comments = _get_comment_strs(jira, comment_email_blacklist)
-        semantic_rep = f"{description}\n" + "\n".join(
+        ticket_content = f"{description}\n" + "\n".join(
             [f"Comment: {comment}" for comment in comments if comment]
         )
 
+        # Check ticket size
+        if len(ticket_content.encode("utf-8")) > JIRA_CONNECTOR_MAX_TICKET_SIZE:
+            logger.info(
+                f"Skipping {jira.key} because it exceeds the maximum size of "
+                f"{JIRA_CONNECTOR_MAX_TICKET_SIZE} bytes."
+            )
+            continue
+
         page_url = f"{jira_client.client_info()}/browse/{jira.key}"
 
         people = set()
@@ -180,7 +189,7 @@ def fetch_jira_issues_batch(
         doc_batch.append(
             Document(
                 id=page_url,
-                sections=[Section(link=page_url, text=semantic_rep)],
+                sections=[Section(link=page_url, text=ticket_content)],
                 source=DocumentSource.JIRA,
                 semantic_identifier=jira.fields.summary,
                 doc_updated_at=time_str_to_utc(jira.fields.updated),
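The size check above compares the UTF-8 byte length of the assembled ticket text (description plus comments) against JIRA_CONNECTOR_MAX_TICKET_SIZE, which defaults to 100 KB and can be overridden through an environment variable. Below is a minimal standalone sketch of that cutoff, not code from this commit; the names are illustrative only.

```python
import os

# Illustrative default mirroring the new config value above (100 KB).
MAX_TICKET_SIZE = int(os.environ.get("JIRA_CONNECTOR_MAX_TICKET_SIZE", 100 * 1024))


def keep_ticket(ticket_content: str) -> bool:
    """Return False for tickets whose UTF-8 encoding exceeds the byte limit."""
    # Byte length rather than character count; multi-byte characters count extra.
    return len(ticket_content.encode("utf-8")) <= MAX_TICKET_SIZE


if __name__ == "__main__":
    small = "Description\nComment: looks good"
    big = "x" * (MAX_TICKET_SIZE + 1)
    print(keep_ticket(small))  # True
    print(keep_ticket(big))    # False
```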
15 changes: 14 additions & 1 deletion backend/danswer/indexing/chunker.py
@@ -10,6 +10,7 @@
     get_metadata_keys_to_ignore,
 )
 from danswer.connectors.models import Document
+from danswer.indexing.indexing_heartbeat import Heartbeat
 from danswer.indexing.models import DocAwareChunk
 from danswer.natural_language_processing.utils import BaseTokenizer
 from danswer.utils.logger import setup_logger
@@ -123,6 +124,7 @@ def __init__(
         chunk_token_limit: int = DOC_EMBEDDING_CONTEXT_SIZE,
         chunk_overlap: int = CHUNK_OVERLAP,
         mini_chunk_size: int = MINI_CHUNK_SIZE,
+        heartbeat: Heartbeat | None = None,
     ) -> None:
         from llama_index.text_splitter import SentenceSplitter
 
@@ -131,6 +133,7 @@
         self.enable_multipass = enable_multipass
         self.enable_large_chunks = enable_large_chunks
         self.tokenizer = tokenizer
+        self.heartbeat = heartbeat
 
         self.blurb_splitter = SentenceSplitter(
             tokenizer=tokenizer.tokenize,
@@ -255,7 +258,7 @@ def _create_chunk(
         # If the chunk does not have any useable content, it will not be indexed
         return chunks
 
-    def chunk(self, document: Document) -> list[DocAwareChunk]:
+    def _handle_single_document(self, document: Document) -> list[DocAwareChunk]:
         # Specifically for reproducing an issue with gmail
         if document.source == DocumentSource.GMAIL:
             logger.debug(f"Chunking {document.semantic_identifier}")
@@ -302,3 +305,13 @@ def chunk(self, document: Document) -> list[DocAwareChunk]:
             normal_chunks.extend(large_chunks)
 
         return normal_chunks
+
+    def chunk(self, documents: list[Document]) -> list[DocAwareChunk]:
+        final_chunks: list[DocAwareChunk] = []
+        for document in documents:
+            final_chunks.extend(self._handle_single_document(document))
+
+            if self.heartbeat:
+                self.heartbeat.heartbeat()
+
+        return final_chunks
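With this change the chunker's public chunk() takes the whole document batch, delegates per-document work to _handle_single_document, and emits a heartbeat after each document. A rough, self-contained sketch of that batching pattern with stand-in types follows; none of these names come from the commit.

```python
from collections.abc import Callable


def chunk_batch(
    documents: list[str],
    handle_single_document: Callable[[str], list[str]],
    heartbeat: Callable[[], None] | None = None,
) -> list[str]:
    """Chunk every document in the batch, signalling progress after each one."""
    final_chunks: list[str] = []
    for document in documents:
        final_chunks.extend(handle_single_document(document))
        if heartbeat:
            heartbeat()  # one beat per document, like freq=1 in the pipeline
    return final_chunks


# Example: split on blank lines and print a progress marker per document.
chunks = chunk_batch(
    ["intro\n\nbody", "single section"],
    handle_single_document=lambda doc: doc.split("\n\n"),
    heartbeat=lambda: print("chunking..."),
)
print(chunks)  # ['intro', 'body', 'single section']
```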
42 changes: 11 additions & 31 deletions backend/danswer/indexing/embedder.py
@@ -1,12 +1,8 @@
 from abc import ABC
 from abc import abstractmethod
 
-from sqlalchemy.orm import Session
-
-from danswer.db.models import IndexModelStatus
 from danswer.db.models import SearchSettings
-from danswer.db.search_settings import get_current_search_settings
-from danswer.db.search_settings import get_secondary_search_settings
+from danswer.indexing.indexing_heartbeat import Heartbeat
 from danswer.indexing.models import ChunkEmbedding
 from danswer.indexing.models import DocAwareChunk
 from danswer.indexing.models import IndexChunk
@@ -24,6 +20,9 @@
 
 
 class IndexingEmbedder(ABC):
+    """Converts chunks into chunks with embeddings. Note that one chunk may have
+    multiple embeddings associated with it."""
+
     def __init__(
         self,
         model_name: str,
@@ -33,6 +32,7 @@ def __init__(
         provider_type: EmbeddingProvider | None,
         api_key: str | None,
         api_url: str | None,
+        heartbeat: Heartbeat | None,
     ):
         self.model_name = model_name
         self.normalize = normalize
@@ -54,6 +54,7 @@ def __init__(
             server_host=INDEXING_MODEL_SERVER_HOST,
             server_port=INDEXING_MODEL_SERVER_PORT,
             retrim_content=True,
+            heartbeat=heartbeat,
         )
 
     @abstractmethod
@@ -74,6 +75,7 @@ def __init__(
         provider_type: EmbeddingProvider | None = None,
        api_key: str | None = None,
        api_url: str | None = None,
+        heartbeat: Heartbeat | None = None,
     ):
         super().__init__(
             model_name,
@@ -83,6 +85,7 @@ def __init__(
             provider_type,
             api_key,
             api_url,
+            heartbeat,
         )
 
     @log_function_time()
@@ -166,7 +169,7 @@ def embed_chunks(
                 title_embed_dict[title] = title_embedding
 
             new_embedded_chunk = IndexChunk(
-                **chunk.dict(),
+                **chunk.model_dump(),
                 embeddings=ChunkEmbedding(
                     full_embedding=chunk_embeddings[0],
                     mini_chunk_embeddings=chunk_embeddings[1:],
@@ -180,7 +183,7 @@
 
     @classmethod
     def from_db_search_settings(
-        cls, search_settings: SearchSettings
+        cls, search_settings: SearchSettings, heartbeat: Heartbeat | None = None
     ) -> "DefaultIndexingEmbedder":
         return cls(
             model_name=search_settings.model_name,
@@ -190,28 +193,5 @@ def from_db_search_settings(
             provider_type=search_settings.provider_type,
             api_key=search_settings.api_key,
             api_url=search_settings.api_url,
+            heartbeat=heartbeat,
         )
-
-
-def get_embedding_model_from_search_settings(
-    db_session: Session, index_model_status: IndexModelStatus = IndexModelStatus.PRESENT
-) -> IndexingEmbedder:
-    search_settings: SearchSettings | None
-    if index_model_status == IndexModelStatus.PRESENT:
-        search_settings = get_current_search_settings(db_session)
-    elif index_model_status == IndexModelStatus.FUTURE:
-        search_settings = get_secondary_search_settings(db_session)
-        if not search_settings:
-            raise RuntimeError("No secondary index configured")
-    else:
-        raise RuntimeError("Not supporting embedding model rollbacks")
-
-    return DefaultIndexingEmbedder(
-        model_name=search_settings.model_name,
-        normalize=search_settings.normalize,
-        query_prefix=search_settings.query_prefix,
-        passage_prefix=search_settings.passage_prefix,
-        provider_type=search_settings.provider_type,
-        api_key=search_settings.api_key,
-        api_url=search_settings.api_url,
-    )
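The new docstring on IndexingEmbedder notes that one chunk can carry several embeddings: the first vector returned for a chunk is treated as the full-chunk embedding and any remaining vectors as mini-chunk embeddings (see chunk_embeddings[0] versus chunk_embeddings[1:] in the hunk above). A small sketch of that data shape, using made-up types rather than the project's models:

```python
from dataclasses import dataclass


@dataclass
class ChunkEmbeddingSketch:
    """One full-chunk vector plus zero or more mini-chunk vectors."""
    full_embedding: list[float]
    mini_chunk_embeddings: list[list[float]]


def split_embeddings(chunk_embeddings: list[list[float]]) -> ChunkEmbeddingSketch:
    # Mirrors the convention above: the first vector covers the whole chunk,
    # the rest (if multipass/mini-chunks are enabled) belong to its mini-chunks.
    return ChunkEmbeddingSketch(
        full_embedding=chunk_embeddings[0],
        mini_chunk_embeddings=chunk_embeddings[1:],
    )


print(split_embeddings([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]))
```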
41 changes: 41 additions & 0 deletions backend/danswer/indexing/indexing_heartbeat.py
@@ -0,0 +1,41 @@
+import abc
+from typing import Any
+
+from sqlalchemy import func
+from sqlalchemy.orm import Session
+
+from danswer.db.index_attempt import get_index_attempt
+from danswer.utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+class Heartbeat(abc.ABC):
+    """Useful for any long-running work that goes through a bunch of items
+    and needs to occasionally give updates on progress.
+    e.g. chunking, embedding, updating vespa, etc."""
+
+    @abc.abstractmethod
+    def heartbeat(self, metadata: Any = None) -> None:
+        raise NotImplementedError
+
+
+class IndexingHeartbeat(Heartbeat):
+    def __init__(self, index_attempt_id: int, db_session: Session, freq: int):
+        self.cnt = 0
+
+        self.index_attempt_id = index_attempt_id
+        self.db_session = db_session
+        self.freq = freq
+
+    def heartbeat(self, metadata: Any = None) -> None:
+        self.cnt += 1
+        if self.cnt % self.freq == 0:
+            index_attempt = get_index_attempt(
+                db_session=self.db_session, index_attempt_id=self.index_attempt_id
+            )
+            if index_attempt:
+                index_attempt.time_updated = func.now()
+                self.db_session.commit()
+            else:
+                logger.error("Index attempt not found, this should not happen!")
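IndexingHeartbeat bumps the index attempt's time_updated column on every freq-th call, so long-running chunking and embedding loops keep reporting liveness without a database write per item. Here is a minimal sketch of the same counter-gated pattern without the database dependency; all names below are hypothetical.

```python
import abc
import time
from typing import Any


class HeartbeatSketch(abc.ABC):
    @abc.abstractmethod
    def heartbeat(self, metadata: Any = None) -> None:
        raise NotImplementedError


class LoggingHeartbeat(HeartbeatSketch):
    """Report progress only every `freq` calls to keep the overhead low."""

    def __init__(self, freq: int):
        self.cnt = 0
        self.freq = freq

    def heartbeat(self, metadata: Any = None) -> None:
        self.cnt += 1
        if self.cnt % self.freq == 0:
            print(f"still alive after {self.cnt} items at {time.time():.0f}")


hb = LoggingHeartbeat(freq=10)
for _ in range(25):
    hb.heartbeat()  # prints twice: at items 10 and 20
```

In the commit itself, the chunker uses freq=1 (one status update per document) while the embedder uses freq=10 (one per ten batches), trading update granularity against commit overhead.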
20 changes: 10 additions & 10 deletions backend/danswer/indexing/indexing_pipeline.py
@@ -31,6 +31,7 @@
 from danswer.document_index.interfaces import DocumentMetadata
 from danswer.indexing.chunker import Chunker
 from danswer.indexing.embedder import IndexingEmbedder
+from danswer.indexing.indexing_heartbeat import IndexingHeartbeat
 from danswer.indexing.models import DocAwareChunk
 from danswer.indexing.models import DocMetadataAwareIndexChunk
 from danswer.utils.logger import setup_logger
@@ -283,18 +284,10 @@ def index_doc_batch(
         return 0, 0
 
     logger.debug("Starting chunking")
-    chunks: list[DocAwareChunk] = []
-    for document in ctx.updatable_docs:
-        chunks.extend(chunker.chunk(document=document))
+    chunks: list[DocAwareChunk] = chunker.chunk(ctx.updatable_docs)
 
     logger.debug("Starting embedding")
-    chunks_with_embeddings = (
-        embedder.embed_chunks(
-            chunks=chunks,
-        )
-        if chunks
-        else []
-    )
+    chunks_with_embeddings = embedder.embed_chunks(chunks) if chunks else []
 
     updatable_ids = [doc.id for doc in ctx.updatable_docs]
 
@@ -406,6 +399,13 @@ def build_indexing_pipeline(
         tokenizer=embedder.embedding_model.tokenizer,
         enable_multipass=multipass,
         enable_large_chunks=enable_large_chunks,
+        # after every doc, update status in case there are a bunch of
+        # really long docs
+        heartbeat=IndexingHeartbeat(
+            index_attempt_id=attempt_id, db_session=db_session, freq=1
+        )
+        if attempt_id
+        else None,
     )
 
     return partial(
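build_indexing_pipeline ends by returning partial(...) (visible at the bottom of the hunk), that is, a callable with the chunker, embedder, and other dependencies pre-bound so callers only supply document batches. A generic sketch of that pattern, with hypothetical names:

```python
from functools import partial


def index_doc_batch(documents: list[str], *, chunker, embedder) -> int:
    """Pretend pipeline step: chunk then embed, return the number of chunks."""
    chunks = chunker(documents)
    embedder(chunks)
    return len(chunks)


# Pre-bind the heavy dependencies once; callers just pass document batches.
pipeline = partial(
    index_doc_batch,
    chunker=lambda docs: [piece for d in docs for piece in d.split()],
    embedder=lambda chunks: [[float(len(c))] for c in chunks],
)

print(pipeline(["hello world", "danswer indexing"]))  # 4
```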
@@ -16,6 +16,7 @@
 )
 from danswer.configs.model_configs import DOC_EMBEDDING_CONTEXT_SIZE
 from danswer.db.models import SearchSettings
+from danswer.indexing.indexing_heartbeat import Heartbeat
 from danswer.natural_language_processing.utils import get_tokenizer
 from danswer.natural_language_processing.utils import tokenizer_trim_content
 from danswer.utils.logger import setup_logger
@@ -95,6 +96,7 @@ def __init__(
         api_url: str | None,
         provider_type: EmbeddingProvider | None,
         retrim_content: bool = False,
+        heartbeat: Heartbeat | None = None,
     ) -> None:
         self.api_key = api_key
         self.provider_type = provider_type
@@ -107,6 +109,7 @@
         self.tokenizer = get_tokenizer(
             model_name=model_name, provider_type=provider_type
         )
+        self.heartbeat = heartbeat
 
         model_server_url = build_model_server_url(server_host, server_port)
         self.embed_server_endpoint = f"{model_server_url}/encoder/bi-encoder-embed"
@@ -166,6 +169,9 @@ def _batch_encode_texts(
 
             response = self._make_model_server_request(embed_request)
             embeddings.extend(response.embeddings)
+
+            if self.heartbeat:
+                self.heartbeat.heartbeat()
         return embeddings
 
     def encode(