Python: Azure Cosmos DB NoSQL Vector Store & Collection implementation #9296

Merged
Commits (35)
6eefa06
Init: Azure Cosmos DB NoSQL Vector Collection Impl
TaoChenOSU Oct 14, 2024
4855c3c
Add serialize and deserialize implementations
TaoChenOSU Oct 15, 2024
e1dcc8f
Cosmos DB NoSQL store
TaoChenOSU Oct 15, 2024
8501dfb
WIP: Integration tests
TaoChenOSU Oct 16, 2024
3398ad8
DONE: Integration test
TaoChenOSU Oct 16, 2024
a5990c3
Merge branch 'main' into local-branch-cosmos-db-no-sql-vector-impl
TaoChenOSU Oct 23, 2024
d69bd94
Merge branch 'main' into taochen/python-azure-cosmos-db-nosql-vector-…
TaoChenOSU Oct 24, 2024
d215aa2
Add integration tests
TaoChenOSU Oct 25, 2024
be33681
Unit tests; Next: more integration tests for Cosmos DB NoSQL specific…
TaoChenOSU Oct 26, 2024
048c9e8
Complete integration tests; next: workflow file update
TaoChenOSU Oct 28, 2024
1e4529e
Workflow
TaoChenOSU Oct 28, 2024
2dac177
Merge branch 'main' into taochen/python-azure-cosmos-db-nosql-vector-…
TaoChenOSU Oct 28, 2024
1f3b6de
Sample
TaoChenOSU Oct 28, 2024
a0896be
Fix unit test
TaoChenOSU Oct 28, 2024
49039f2
fix unit test 2
TaoChenOSU Oct 28, 2024
16ce4a7
Address comments
TaoChenOSU Oct 29, 2024
bee5b8a
Restructure cosmos db module
TaoChenOSU Oct 29, 2024
d4f4488
Handle cosmos client lifetime
TaoChenOSU Oct 30, 2024
d4e591f
Fix unit test 3
TaoChenOSU Oct 31, 2024
a8693cb
Merge branch 'main' into taochen/python-azure-cosmos-db-nosql-vector-…
TaoChenOSU Nov 8, 2024
d68f0b8
Fix distance function
TaoChenOSU Nov 8, 2024
96875cf
Fix distance function 2
TaoChenOSU Nov 8, 2024
eca01ee
Fix unit tests
TaoChenOSU Nov 8, 2024
6f421d1
Fix integration tests
TaoChenOSU Nov 8, 2024
f4df5b9
Address comments 1
TaoChenOSU Nov 8, 2024
530b0ca
Address comments 2
TaoChenOSU Nov 9, 2024
93bd96f
Address comments 3
TaoChenOSU Nov 9, 2024
c1c8eaa
Various fixes
TaoChenOSU Nov 9, 2024
7300774
Address comments 4
TaoChenOSU Nov 11, 2024
2c31c22
Add database name to settings
TaoChenOSU Nov 11, 2024
a42db22
Merge branch 'main' into taochen/python-azure-cosmos-db-nosql-vector-…
eavanvalkenburg Nov 12, 2024
99d0d31
Fix unit tests
TaoChenOSU Nov 12, 2024
b9965c3
Fix integration test secrets and variables
TaoChenOSU Nov 12, 2024
8c273e7
Integration test timeout
TaoChenOSU Nov 12, 2024
45af835
Fix database name
TaoChenOSU Nov 12, 2024
16 changes: 16 additions & 0 deletions .github/workflows/python-integration-tests.yml
@@ -119,6 +119,12 @@ jobs:
run: docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest
- name: Setup Weaviate docker deployment
run: docker run -d -p 8080:8080 -p 50051:50051 cr.weaviate.io/semitechnologies/weaviate:1.26.6
- name: Start Azure Cosmos DB emulator
if: matrix.os == 'windows-latest'
run: |
Write-Host "Launching Cosmos DB Emulator"
Import-Module "$env:ProgramFiles\Azure Cosmos DB Emulator\PSModules\Microsoft.Azure.CosmosDB.Emulator"
Start-CosmosDbEmulator
- name: Azure CLI Login
if: github.event_name != 'pull_request'
uses: azure/login@v2
@@ -170,6 +176,8 @@ jobs:
VERTEX_AI_GEMINI_MODEL_ID: ${{ vars.VERTEX_AI_GEMINI_MODEL_ID }}
VERTEX_AI_EMBEDDING_MODEL_ID: ${{ vars.VERTEX_AI_EMBEDDING_MODEL_ID }}
REDIS_CONNECTION_STRING: ${{ vars.REDIS_CONNECTION_STRING }}
COSMOS_DB_NOSQL_URL: ${{ vars.COSMOS_DB_NOSQL_URL }}
COSMOS_DB_NOSQL_KEY: ${{ secrets.COSMOS_DB_NOSQL_KEY }}
run: |
uv run pytest -n logical --dist loadfile --dist worksteal ./tests/integration ./tests/samples -v --junitxml=pytest.xml
- name: Surface failing tests
@@ -259,6 +267,12 @@ jobs:
run: docker run -d --name redis-stack-server -p 6379:6379 redis/redis-stack-server:latest
- name: Setup Weaviate docker deployment
run: docker run -d -p 8080:8080 -p 50051:50051 cr.weaviate.io/semitechnologies/weaviate:1.26.6
- name: Start Azure Cosmos DB emulator
if: matrix.os == 'windows-latest'
run: |
Write-Host "Launching Cosmos DB Emulator"
Import-Module "$env:ProgramFiles\Azure Cosmos DB Emulator\PSModules\Microsoft.Azure.CosmosDB.Emulator"
Start-CosmosDbEmulator
- name: Azure CLI Login
if: github.event_name != 'pull_request'
uses: azure/login@v2
@@ -310,6 +324,8 @@ jobs:
VERTEX_AI_GEMINI_MODEL_ID: ${{ vars.VERTEX_AI_GEMINI_MODEL_ID }}
VERTEX_AI_EMBEDDING_MODEL_ID: ${{ vars.VERTEX_AI_EMBEDDING_MODEL_ID }}
REDIS_CONNECTION_STRING: ${{ vars.REDIS_CONNECTION_STRING }}
COSMOS_DB_NOSQL_URL: ${{ vars.COSMOS_DB_NOSQL_URL }}
COSMOS_DB_NOSQL_KEY: ${{ secrets.COSMOS_DB_NOSQL_KEY }}
run: |
uv run pytest -n logical --dist loadfile --dist worksteal ./tests/integration ./tests/samples -v --junitxml=pytest.xml
- name: Surface failing tests
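The two new variables are how the integration tests reach the emulator (or a live account on non-pull-request runs). As a rough illustration only, a small standalone script that reads the same variables and checks connectivity with the async Cosmos SDK could look like the sketch below; the script name, and the assumption that the emulator's certificate is already trusted locally, are mine and not part of this PR.

```python
# check_cosmos_connection.py: hypothetical smoke test, not part of this PR.
import asyncio
import os

from azure.cosmos.aio import CosmosClient


async def main() -> None:
    # Same variables as added to the workflow above.
    url = os.environ["COSMOS_DB_NOSQL_URL"]
    key = os.environ["COSMOS_DB_NOSQL_KEY"]

    # Use the client as an async context manager, mirroring how the connector
    # creates a fresh client per operation (see the base class later in this diff).
    async with CosmosClient(url, credential=key) as client:
        async for database in client.list_databases():
            print(database["id"])


if __name__ == "__main__":
    asyncio.run(main())
```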
2 changes: 1 addition & 1 deletion python/.coveragerc
@@ -4,7 +4,7 @@ omit =
semantic_kernel/connectors/memory/astradb/*
semantic_kernel/connectors/memory/azure_cognitive_search/*
semantic_kernel/connectors/memory/azure_cosmosdb/*
semantic_kernel/connectors/memory/azure_cosmosdb_no_sql/*
semantic_kernel/connectors/memory/azure_cosmosdb_no_sql/azure_cosmosdb_no_sql_memory_store.py
semantic_kernel/connectors/memory/chroma/*
semantic_kernel/connectors/memory/milvus/*
semantic_kernel/connectors/memory/mongodb_atlas/*
1 change: 1 addition & 0 deletions python/.cspell.json
@@ -45,6 +45,7 @@
"mongocluster",
"ndarray",
"nopep",
"NOSQL",
"ollama",
"onyourdatatest",
"OPENAI",
169 changes: 95 additions & 74 deletions python/samples/concepts/memory/new_memory.py
@@ -12,6 +12,9 @@
from semantic_kernel.connectors.ai.open_ai import OpenAIEmbeddingPromptExecutionSettings, OpenAITextEmbedding
from semantic_kernel.connectors.ai.open_ai.services.azure_text_embedding import AzureTextEmbedding
from semantic_kernel.connectors.memory.azure_ai_search import AzureAISearchCollection
from semantic_kernel.connectors.memory.azure_cosmosdb_no_sql.azure_cosmos_db_no_sql_collection import (
AzureCosmosDBNoSQLCollection,
)
from semantic_kernel.connectors.memory.postgres.postgres_collection import PostgresCollection
from semantic_kernel.connectors.memory.qdrant import QdrantCollection
from semantic_kernel.connectors.memory.redis import RedisHashsetCollection, RedisJsonCollection
@@ -25,55 +28,64 @@
VectorStoreRecordVectorField,
vectorstoremodel,
)


@vectorstoremodel
@dataclass
class MyDataModelArray:
vector: Annotated[
np.ndarray | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind="hnsw",
dimensions=1536,
distance_function="cosine",
property_type="float",
serialize_function=np.ndarray.tolist,
deserialize_function=np.array,
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"


@vectorstoremodel
@dataclass
class MyDataModelList:
vector: Annotated[
list[float] | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind="hnsw",
dimensions=1536,
distance_function="cosine",
property_type="float",
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"
from semantic_kernel.data.const import DistanceFunction, IndexKind


def get_data_model_array(index_kind: IndexKind, distance_function: DistanceFunction) -> type:
@vectorstoremodel
@dataclass
class DataModelArray:
vector: Annotated[
np.ndarray | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind=index_kind,
dimensions=1536,
distance_function=distance_function,
property_type="float",
serialize_function=np.ndarray.tolist,
deserialize_function=np.array,
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"

return DataModelArray


def get_data_model_list(index_kind: IndexKind, distance_function: DistanceFunction) -> type:
@vectorstoremodel
@dataclass
class DataModelList:
vector: Annotated[
list[float] | None,
VectorStoreRecordVectorField(
embedding_settings={"embedding": OpenAIEmbeddingPromptExecutionSettings(dimensions=1536)},
index_kind=index_kind,
dimensions=1536,
distance_function=distance_function,
property_type="float",
),
] = None
other: str | None = None
id: Annotated[str, VectorStoreRecordKeyField()] = field(default_factory=lambda: str(uuid4()))
content: Annotated[
str, VectorStoreRecordDataField(has_embedding=True, embedding_property_name="vector", property_type="str")
] = "content1"

return DataModelList


collection_name = "test"
MyDataModel = MyDataModelArray
# Depending on the vector database, the index kind and distance function may need to be adjusted,
# since not all combinations are supported by all databases.
DataModel = get_data_model_array(IndexKind.HNSW, DistanceFunction.COSINE)

# A list of VectorStoreRecordCollection that can be used.
# Available stores are:
# Available collections are:
# - ai_search: Azure AI Search
# - postgres: PostgreSQL
# - redis_json: Redis JSON
@@ -83,63 +95,72 @@ class MyDataModelList:
# - weaviate: Weaviate
# Please either configure the weaviate settings via environment variables or provide them through the constructor.
# Note that embed mode is not supported on Windows: https://github.com/weaviate/weaviate/issues/3315
#
# This is represented as a mapping from the store name to a
# function which returns the store.
# Using a function allows for lazy initialization of the store,
# so that settings for unused stores do not cause validation errors.
stores: dict[str, Callable[[], VectorStoreRecordCollection]] = {
"ai_search": lambda: AzureAISearchCollection[MyDataModel](
data_model_type=MyDataModel,
# - azure_cosmos_nosql: Azure Cosmos NoSQL
# https://learn.microsoft.com/en-us/azure/cosmos-db/how-to-develop-emulator?tabs=windows%2Cpython&pivots=api-nosql
# Please see the link above to learn how to set up the Azure Cosmos NoSQL emulator on your machine.
# For this sample to work with Azure Cosmos NoSQL, please adjust the index_kind of the data model to QUANTIZED_FLAT.
# This is represented as a mapping from the collection name to a
# function which returns the collection.
# Using a function allows for lazy initialization of the collection,
# so that settings for unused collections do not cause validation errors.
collections: dict[str, Callable[[], VectorStoreRecordCollection]] = {
"ai_search": lambda: AzureAISearchCollection[DataModel](
data_model_type=DataModel,
),
"postgres": lambda: PostgresCollection[str, MyDataModel](
data_model_type=MyDataModel,
"postgres": lambda: PostgresCollection[str, DataModel](
data_model_type=DataModel,
collection_name=collection_name,
),
"redis_json": lambda: RedisJsonCollection[MyDataModel](
data_model_type=MyDataModel,
"redis_json": lambda: RedisJsonCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
prefix_collection_name_to_key_names=True,
),
"redis_hashset": lambda: RedisHashsetCollection[MyDataModel](
data_model_type=MyDataModel,
"redis_hashset": lambda: RedisHashsetCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
prefix_collection_name_to_key_names=True,
),
"qdrant": lambda: QdrantCollection[MyDataModel](
data_model_type=MyDataModel, collection_name=collection_name, prefer_grpc=True, named_vectors=False
"qdrant": lambda: QdrantCollection[DataModel](
data_model_type=DataModel, collection_name=collection_name, prefer_grpc=True, named_vectors=False
),
"volatile": lambda: VolatileCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
),
"volatile": lambda: VolatileCollection[MyDataModel](
data_model_type=MyDataModel,
"weaviate": lambda: WeaviateCollection[DataModel](
data_model_type=DataModel,
collection_name=collection_name,
),
"weaviate": lambda: WeaviateCollection[MyDataModel](
data_model_type=MyDataModel,
"azure_cosmos_nosql": lambda: AzureCosmosDBNoSQLCollection(
data_model_type=DataModel,
database_name="sample_database",
collection_name=collection_name,
create_database=True,
),
}


async def main(store: str, use_azure_openai: bool, embedding_model: str):
async def main(collection: str, use_azure_openai: bool, embedding_model: str):
kernel = Kernel()
service_id = "embedding"
if use_azure_openai:
kernel.add_service(AzureTextEmbedding(service_id=service_id, deployment_name=embedding_model))
else:
kernel.add_service(OpenAITextEmbedding(service_id=service_id, ai_model_id=embedding_model))
async with stores[store]() as record_store:
await record_store.create_collection_if_not_exists()
async with collections[collection]() as record_collection:
await record_collection.create_collection_if_not_exists()

record1 = MyDataModel(content="My text", id="e6103c03-487f-4d7d-9c23-4723651c17f4")
record2 = MyDataModel(content="My other text", id="09caec77-f7e1-466a-bcec-f1d51c5b15be")
record1 = DataModel(content="My text", id="e6103c03-487f-4d7d-9c23-4723651c17f4")
record2 = DataModel(content="My other text", id="09caec77-f7e1-466a-bcec-f1d51c5b15be")

records = await VectorStoreRecordUtils(kernel).add_vector_to_records(
[record1, record2], data_model_type=MyDataModel
[record1, record2], data_model_type=DataModel
)
keys = await record_store.upsert_batch(records)
keys = await record_collection.upsert_batch(records)
print(f"upserted {keys=}")

results = await record_store.get_batch([record1.id, record2.id])
results = await record_collection.get_batch([record1.id, record2.id])
if results:
for result in results:
print(f"found {result.id=}")
@@ -156,7 +177,7 @@ async def main(store: str, use_azure_openai: bool, embedding_model: str):
argparse.ArgumentParser()

parser = argparse.ArgumentParser()
parser.add_argument("--store", default="volatile", choices=stores.keys(), help="What store to use.")
parser.add_argument("--collection", default="volatile", choices=collections.keys(), help="What collection to use.")
# Option of whether to use OpenAI or Azure OpenAI.
parser.add_argument("--use-azure-openai", action="store_true", help="Use Azure OpenAI instead of OpenAI.")
# Model
@@ -165,4 +186,4 @@ async def main(store: str, use_azure_openai: bool, embedding_model: str):
)
args = parser.parse_args()

asyncio.run(main(store=args.store, use_azure_openai=args.use_azure_openai, embedding_model=args.model))
asyncio.run(main(collection=args.collection, use_azure_openai=args.use_azure_openai, embedding_model=args.model))
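For readers who want the new connector outside the sample's collection mapping, a standalone sketch of the "azure_cosmos_nosql" entry added above follows. It assumes the sample's get_data_model_array helper is in scope and uses the QUANTIZED_FLAT index kind that the sample's comment calls out for Cosmos DB NoSQL.

```python
# Sketch only: standalone version of the "azure_cosmos_nosql" entry in the sample above.
import asyncio

from semantic_kernel.connectors.memory.azure_cosmosdb_no_sql.azure_cosmos_db_no_sql_collection import (
    AzureCosmosDBNoSQLCollection,
)
from semantic_kernel.data.const import DistanceFunction, IndexKind

# get_data_model_array comes from the sample above and is assumed to be in scope.
# Cosmos DB NoSQL needs QUANTIZED_FLAT here, per the comment in the sample.
DataModel = get_data_model_array(IndexKind.QUANTIZED_FLAT, DistanceFunction.COSINE)


async def demo() -> None:
    # create_database=True lets the connector create "sample_database" on first use.
    async with AzureCosmosDBNoSQLCollection(
        data_model_type=DataModel,
        database_name="sample_database",
        collection_name="test",
        create_database=True,
    ) as collection:
        await collection.create_collection_if_not_exists()


if __name__ == "__main__":
    asyncio.run(demo())
```

The sample itself exercises the same path when invoked with `python new_memory.py --collection azure_cosmos_nosql`.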
@@ -0,0 +1,67 @@
# Copyright (c) Microsoft. All rights reserved.

from azure.cosmos.aio import ContainerProxy, CosmosClient, DatabaseProxy
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from azure.identity import DefaultAzureCredential

from semantic_kernel.connectors.memory.azure_cosmosdb_no_sql.azure_cosmos_db_no_sql_settings import (
AzureCosmosDBNoSQLSettings,
)
from semantic_kernel.exceptions.memory_connector_exceptions import MemoryConnectorResourceNotFound
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.utils.experimental_decorator import experimental_class


@experimental_class
class AzureCosmosDBNoSQLBase(KernelBaseModel):
"""An Azure Cosmos DB NoSQL collection stores documents in a Azure Cosmos DB NoSQL account."""

cosmos_db_nosql_settings: AzureCosmosDBNoSQLSettings
database_name: str
# If create_database is True, the database will be created
# if it does not exist when an operation requires a database.
create_database: bool

def _get_cosmos_client(self) -> CosmosClient:
"""Gets the Cosmos client.

We cannot cache the Cosmos client because it is only good for one context.
https://github.com/Azure/azure-sdk-for-python/issues/25640
"""
if not self.cosmos_db_nosql_settings.key:
return CosmosClient(str(self.cosmos_db_nosql_settings.url), credential=DefaultAzureCredential())

return CosmosClient(
str(self.cosmos_db_nosql_settings.url),
credential=self.cosmos_db_nosql_settings.key.get_secret_value(),
)

async def _does_database_exist(self, cosmos_client: CosmosClient) -> bool:
"""Checks if the database exists."""
try:
await cosmos_client.get_database_client(self.database_name).read()
return True
except CosmosResourceNotFoundError:
return False
except Exception as e:
raise MemoryConnectorResourceNotFound(f"Failed to check if database '{self.database_name}' exists.") from e

async def _get_database_proxy(self, cosmos_client: CosmosClient) -> DatabaseProxy:
"""Gets the database proxy."""
try:
if await self._does_database_exist(cosmos_client):
return cosmos_client.get_database_client(self.database_name)

if self.create_database:
return await cosmos_client.create_database(self.database_name)
raise MemoryConnectorResourceNotFound(f"Database '{self.database_name}' does not exist.")
except Exception as e:
raise MemoryConnectorResourceNotFound(f"Failed to get database proxy for '{id}'.") from e

async def _get_container_proxy(self, container_name: str, cosmos_client: CosmosClient) -> ContainerProxy:
"""Gets the container proxy."""
try:
database_proxy = await self._get_database_proxy(cosmos_client)
return database_proxy.get_container_client(container_name)
except Exception as e:
raise MemoryConnectorResourceNotFound(f"Failed to get container proxy for '{container_name}'.") from e