Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add hybrid search retriever using Qdrant in-memory vector store #1176

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Add `create_project` bool to `StudioClient.__init__()` to enable users to automatically create their Studio projects
- Add progressbar to the `Runner` to be able to track the `Run`
- Add `StudioClient.submit_benchmark_lineages` function and include it in `StudioClient.submit_benchmark_execution`
- Add `HybridQdrantInMemoryRetriever` enabling hybrid search for in-memory Qdrant collections

#### DocumentIndexClient
- Add method `DocumentIndexClient.chunks()` for retrieving all text chunks of a document.
Expand Down
552 changes: 473 additions & 79 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ argilla = "^2.4.0"
dict-hash = "^1.3.4"
sqlalchemy = "^2.0.35"
psycopg2-binary = "^2.9.9"
fastembed = "^0.4.2"

[tool.poetry.group.dev.dependencies]
# lint & format
Expand Down
3 changes: 3 additions & 0 deletions src/intelligence_layer/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
from .retrievers.document_index_retriever import (
DocumentIndexRetriever as DocumentIndexRetriever,
)
from .retrievers.hybrid_qdrant_in_memory_retriever import (
HybridQdrantInMemoryRetriever as HybridQdrantInMemoryRetriever,
)
from .retrievers.qdrant_in_memory_retriever import (
QdrantInMemoryRetriever as QdrantInMemoryRetriever,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from qdrant_client import QdrantClient
from qdrant_client.http.models import Distance, PointStruct, VectorParams, models
from qdrant_client.hybrid.fusion import reciprocal_rank_fusion

from intelligence_layer.connectors.limited_concurrency_client import (
AlephAlphaClientProtocol,
LimitedConcurrencyClient,
)
from intelligence_layer.connectors.retrievers.base_retriever import (
Document,
DocumentChunk,
SearchResult,
)
from intelligence_layer.connectors.retrievers.qdrant_in_memory_retriever import (
QdrantInMemoryRetriever,
RetrieverType,
)


class HybridQdrantInMemoryRetriever(QdrantInMemoryRetriever):
"""Search through documents stored in memory using hybrid (keyword + semantic) search.

This retriever uses a [Qdrant](https://github.com/qdrant/qdrant)-in-Memory vector store instance to store documents and their asymmetric embeddings.
When run, the given query is embedded using both a dense and sparse embedding model and scored against the documents in the collection to find the most relevant documents.
Finally, the retrievals are fused using the Reciprocal Rank Fusion algorithm.

Args:
documents: The sequence of documents to be made searchable.
k: The (top) number of documents to be returned by search.
client: Aleph Alpha client instance for running model related API calls. Defaults to `LimitedConcurrencyClient.from_env()`.
threshold: The minimum value of the fusion rank score (combined cosine similarity and keyword similarity). Defaults to 0.0.
retriever_type: The type of retriever to be instantiated. Should be `ASYMMETRIC` for most query-document retrieveal use cases, `SYMMETRIC` is optimized
for similar document retrieval. Defaults to `ASYMMETRIC`.
distance_metric: The distance metric to be used for vector comparison. Defaults to `Distance.COSINE`.
sparse_model_name: The name of the sparse embedding model from `fastemebed` to be used. Defaults to `"Qdrant/bm25"`.
max_workers: The maximum number of workers to use for concurrent processing. Defaults to 10.

Example:
>>> from intelligence_layer.connectors import LimitedConcurrencyClient, Document, HybridQdrantInMemoryRetriever
>>> client = LimitedConcurrencyClient.from_env()
>>> documents = [Document(text=t) for t in ["I do not like rain.", "Summer is warm.", "We are so back."]]
>>> retriever = HybridQdrantInMemoryRetriever(documents, 5, client=client)
>>> query = "Do you like summer?"
>>> documents = retriever.get_relevant_documents_with_scores(query)
"""

def __init__(
self,
documents: Sequence[Document],
k: int,
client: AlephAlphaClientProtocol | None = None,
threshold: float = 0.0,
retriever_type: RetrieverType = RetrieverType.ASYMMETRIC,
distance_metric: Distance = Distance.COSINE,
sparse_model_name: str = "Qdrant/bm25",
max_workers: int = 10,
) -> None:
self._client = client or LimitedConcurrencyClient.from_env()
self._search_client = QdrantClient(":memory:")
self._collection_name = "in_memory_collection"
self._k = k
self._threshold = threshold
self._query_representation, self._document_representation = retriever_type.value
self._distance_metric = distance_metric
self._max_workers = max_workers

self._search_client.set_sparse_model(sparse_model_name)
self._sparse_vector_field_name = "text-sparse"
self._dense_vector_field_name = "text-dense"

if self._search_client.collection_exists(collection_name=self._collection_name):
self._search_client.delete_collection(
collection_name=self._collection_name,
)
NiklasKoehneckeAA marked this conversation as resolved.
Show resolved Hide resolved

self._search_client.create_collection(
collection_name=self._collection_name,
vectors_config={
self._dense_vector_field_name: VectorParams(
size=128, distance=self._distance_metric
)
},
sparse_vectors_config={
self._sparse_vector_field_name: self._search_client.get_fastembed_sparse_vector_params()[ # type: ignore[index]
str(self._search_client.get_sparse_vector_field_name())
]
},
)

self._add_texts_to_memory(documents)

def _sparse_embed_query(self, query: str) -> models.SparseVector:
if self._search_client.sparse_embedding_model_name is None:
raise ValueError("Sparse embedding model is not set!")
sparse_embedder = self._search_client._get_or_init_sparse_model(
model_name=self._search_client.sparse_embedding_model_name
)
NiklasKoehneckeAA marked this conversation as resolved.
Show resolved Hide resolved

sparse_vector = next(sparse_embedder.query_embed(query=query))
NiklasKoehneckeAA marked this conversation as resolved.
Show resolved Hide resolved
sparse_query_vector = models.SparseVector(
indices=sparse_vector.indices.tolist(),
values=sparse_vector.values.tolist(),
)

return sparse_query_vector

def get_filtered_documents_with_scores(
self, query: str, filter: Optional[models.Filter]
) -> Sequence[SearchResult[int]]:
"""Retrieves documents that match the given query and filter conditions, using hybrid search.

This method performs a hybrid search by embedding the query into dense and sparse vectors.
It then executes search requests for both vector types and combines the results using the
Reciprocal Rank Fusion algorithm.

Args:
query: The text query to search for.
filter: An optional filter to apply to the search results.

Returns:
All documents that correspond to the query and pass the filter,
sorted by their reciprocal rank fusion score.
"""
dense_query_vector = self._embed(query, self._query_representation)
sparse_query_vector = self._sparse_embed_query(query)

dense_request = models.SearchRequest(
vector=models.NamedVector(
name=self._dense_vector_field_name,
vector=dense_query_vector,
),
limit=self._k,
filter=filter,
with_payload=True,
)
sparse_request = models.SearchRequest(
vector=models.NamedSparseVector(
name=self._sparse_vector_field_name,
vector=sparse_query_vector,
),
limit=self._k,
filter=filter,
with_payload=True,
)

dense_request_response, sparse_request_response = (
self._search_client.search_batch(
collection_name=self._collection_name,
requests=[dense_request, sparse_request],
)
)
search_result = reciprocal_rank_fusion(
[dense_request_response, sparse_request_response], limit=self._k
)

return [
self._point_to_search_result(point)
for point in search_result
if point.score >= self._threshold
]

def get_relevant_documents_with_scores(
self, query: str
) -> Sequence[SearchResult[int]]:
"""Search for relevant documents given a query using hybrid search (dense + sparse retrieval).

This method performs a hybrid search by embedding the query into dense and sparse vectors.
It then executes search requests for both vector types and combines the results using the
Reciprocal Rank Fusion algorithm.

Args:
query: The text to be searched with.

Returns:
All documents that correspond to the query,
sorted by their reciprocal rank fusion score.
"""
return self.get_filtered_documents_with_scores(query, filter=None)

def _add_texts_to_memory(self, documents: Sequence[Document]) -> None:
with ThreadPoolExecutor(
max_workers=min(len(documents), self._max_workers)
) as executor:
dense_embeddings = list(
executor.map(
lambda doc: self._embed(doc.text, self._document_representation),
documents,
)
)
if self._search_client.sparse_embedding_model_name is None:
raise ValueError("Sparse embedding model is not set!")
sparse_embeddings = list(
self._search_client._sparse_embed_documents(
documents=[document.text for document in documents],
embedding_model_name=self._search_client.sparse_embedding_model_name,
)
)
self._search_client.upsert(
collection_name=self._collection_name,
wait=True,
points=[
PointStruct(
id=idx,
vector={
self._dense_vector_field_name: dense_vector,
self._sparse_vector_field_name: sparse_vector,
},
asajatovic marked this conversation as resolved.
Show resolved Hide resolved
payload=DocumentChunk(
text=document.text,
start=0,
end=len(document.text) - 1,
metadata=document.metadata,
).model_dump(),
)
for idx, (dense_vector, sparse_vector, document) in enumerate(
zip(dense_embeddings, sparse_embeddings, documents, strict=True)
)
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from collections.abc import Sequence

from pytest import fixture

from intelligence_layer.connectors import AlephAlphaClientProtocol, RetrieverType
from intelligence_layer.connectors.retrievers.base_retriever import Document
from intelligence_layer.connectors.retrievers.hybrid_qdrant_in_memory_retriever import (
HybridQdrantInMemoryRetriever,
)
from tests.conftest import to_document


@fixture
def in_memory_retriever_documents() -> Sequence[Document]:
return [
Document(text="Summer is warm but I like it"),
Document(text="I do not like rain"),
Document(text="We are so back"),
Document(text="Summer rain is rejuvenating"),
]


@fixture
def hybrid_asymmetric_in_memory_retriever(
client: AlephAlphaClientProtocol,
in_memory_retriever_documents: Sequence[Document],
) -> HybridQdrantInMemoryRetriever:
return HybridQdrantInMemoryRetriever(
in_memory_retriever_documents,
client=client,
k=2,
retriever_type=RetrieverType.ASYMMETRIC,
)


@fixture
def hybrid_symmetric_in_memory_retriever(
client: AlephAlphaClientProtocol,
in_memory_retriever_documents: Sequence[Document],
) -> HybridQdrantInMemoryRetriever:
return HybridQdrantInMemoryRetriever(
in_memory_retriever_documents,
client=client,
k=2,
retriever_type=RetrieverType.SYMMETRIC,
)


def test_asymmetric_in_memory_retriever(
hybrid_asymmetric_in_memory_retriever: HybridQdrantInMemoryRetriever,
in_memory_retriever_documents: Sequence[Document],
) -> None:
query = "Do you like hot weather?"
documents = (
hybrid_asymmetric_in_memory_retriever.get_relevant_documents_with_scores(query)
)
assert in_memory_retriever_documents[0] == to_document(documents[0].document_chunk)
assert len(documents) <= 2


def test_symmetric_in_memory_retriever(
hybrid_symmetric_in_memory_retriever: HybridQdrantInMemoryRetriever,
in_memory_retriever_documents: Sequence[Document],
) -> None:
query = "I hate drizzle"
documents = hybrid_symmetric_in_memory_retriever.get_relevant_documents_with_scores(
query
)
assert in_memory_retriever_documents[1] == to_document(documents[0].document_chunk)
assert len(documents) <= 2


def test_hybrid_in_memory_retriever(
hybrid_asymmetric_in_memory_retriever: HybridQdrantInMemoryRetriever,
in_memory_retriever_documents: Sequence[Document],
) -> None:
query = "Summer rain"
documents = (
hybrid_asymmetric_in_memory_retriever.get_relevant_documents_with_scores(query)
)
assert in_memory_retriever_documents[3] == to_document(documents[0].document_chunk)
assert len(documents) <= 2