diff --git a/docker/conda/environments/cuda11.8_dev.yml b/docker/conda/environments/cuda11.8_dev.yml index 8d88715d6a..8235cdc6c1 100644 --- a/docker/conda/environments/cuda11.8_dev.yml +++ b/docker/conda/environments/cuda11.8_dev.yml @@ -123,4 +123,4 @@ dependencies: - databricks-connect - pytest-kafka==0.6.0 - pymilvus==2.3.2 - - milvus + - milvus==2.3.2 diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index ea1bd3b4ff..614208d0ed 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -1064,6 +1064,18 @@ def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict: return resource.describe(**kwargs) + def release_resource(self, name: str) -> None: + """ + Release a loaded collection from the memory. + + Parameters + ---------- + name : str + Name of the collection to release. + """ + + self._client.release_collection(collection_name=name) + def close(self) -> None: """ Close the connection to the Milvus vector database. diff --git a/morpheus/service/vdb/vector_db_service.py b/morpheus/service/vdb/vector_db_service.py index 3ecc6a65aa..4519437f03 100644 --- a/morpheus/service/vdb/vector_db_service.py +++ b/morpheus/service/vdb/vector_db_service.py @@ -430,6 +430,18 @@ def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict: pass + @abstractmethod + def release_resource(self, name: str) -> None: + """ + Release a loaded resource from the memory. + + Parameters + ---------- + name : str + Name of the resource to release. + """ + pass + @abstractmethod def close(self) -> None: """ diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index fe3d47cef2..a0ae9d0a89 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import concurrent.futures import json import random @@ -322,8 +321,7 @@ def test_delete_by_keys(milvus_service: MilvusVectorDBService, # Delete data by keys from the collection. keys_to_delete = [5, 6] - response = milvus_service.delete_by_keys(collection_name, keys_to_delete) - assert response == keys_to_delete + milvus_service.delete_by_keys(collection_name, keys_to_delete) response = milvus_service.query(collection_name, query="id >= 0") assert len(response) == len(milvus_data) - 2 @@ -366,12 +364,10 @@ def test_delete(milvus_service: MilvusVectorDBService, idx_part_collection_confi milvus_service.drop(collection_name) -@pytest.mark.milvus -def test_single_instance_with_collection_lock(milvus_service: MilvusVectorDBService, - idx_part_collection_config: dict, - milvus_data: list[dict]): - - collection_name = "test_insert_and_search_order_with_collection_lock" +def test_release_collection(milvus_service: MilvusVectorDBService, + idx_part_collection_config: dict, + milvus_data: list[dict]): + collection_name = "test_release_collection" # Make sure to drop any existing collection from previous runs. milvus_service.drop(collection_name) @@ -379,69 +375,11 @@ def test_single_instance_with_collection_lock(milvus_service: MilvusVectorDBServ # Create a collection. milvus_service.create(collection_name, **idx_part_collection_config) - filter_query = "age == 26 or age == 27" - execution_order = [] - - def insert_data(): - result = milvus_service.insert(collection_name, milvus_data) - assert result['insert_count'] == len(milvus_data) - execution_order.append("Insert Executed") - - def query_data(): - result = milvus_service.query(collection_name, query=filter_query) - execution_order.append("Search Executed") - assert isinstance(result, list) - - def count_entities(): - milvus_service.count(collection_name) - execution_order.append("Count Collection Entities Executed") - - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - executor.submit(insert_data) - executor.submit(query_data) - executor.submit(count_entities) - - # Assert the execution order - assert execution_order == ["Count Collection Entities Executed", "Insert Executed", "Search Executed"] - - -@pytest.mark.milvus -def test_multi_instance_with_collection_lock(milvus_service: MilvusVectorDBService, - idx_part_collection_config: dict, - milvus_data: list[dict], - milvus_server_uri: str): - - milvus_service_2 = MilvusVectorDBService(uri=milvus_server_uri) - - collection_name = "test_insert_and_search_order_with_collection_lock" - filter_query = "age == 26 or age == 27" - - # Make sure to drop any existing collection from previous runs. - milvus_service.drop(collection_name) - - execution_order = [] - - def create_collection(): - milvus_service.create(collection_name, **idx_part_collection_config) - execution_order.append("Create Executed") - - def insert_data(): - result = milvus_service_2.insert(collection_name, milvus_data) - assert result['insert_count'] == len(milvus_data) - execution_order.append("Insert Executed") - - def query_data(): - result = milvus_service.query(collection_name, query=filter_query) - execution_order.append("Query Executed") - assert isinstance(result, list) - - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - executor.submit(create_collection) - executor.submit(insert_data) - executor.submit(query_data) + # Insert data into the collection. + milvus_service.insert(collection_name, milvus_data) - # Assert the execution order - assert execution_order == ["Create Executed", "Insert Executed", "Query Executed"] + # Release resource from the memory. + milvus_service.release_resource(name=collection_name) def test_get_collection_lock():