Skip to content

Commit

Permalink
Merge branch 'fea-sherlock' into fea-sherlock-webscraper
Browse files Browse the repository at this point in the history
  • Loading branch information
cwharris authored Nov 8, 2023
2 parents 41f85c4 + 895d915 commit d2c9845
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 72 deletions.
2 changes: 1 addition & 1 deletion docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,4 @@ dependencies:
- databricks-connect
- pytest-kafka==0.6.0
- pymilvus==2.3.2
- milvus
- milvus==2.3.2
12 changes: 12 additions & 0 deletions morpheus/service/vdb/milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,18 @@ def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict:

return resource.describe(**kwargs)

def release_resource(self, name: str) -> None:
"""
Release a loaded collection from the memory.
Parameters
----------
name : str
Name of the collection to release.
"""

self._client.release_collection(collection_name=name)

def close(self) -> None:
"""
Close the connection to the Milvus vector database.
Expand Down
12 changes: 12 additions & 0 deletions morpheus/service/vdb/vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,18 @@ def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict:

pass

@abstractmethod
def release_resource(self, name: str) -> None:
"""
Release a loaded resource from the memory.
Parameters
----------
name : str
Name of the resource to release.
"""
pass

@abstractmethod
def close(self) -> None:
"""
Expand Down
80 changes: 9 additions & 71 deletions tests/test_milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import json
import random

Expand Down Expand Up @@ -322,8 +321,7 @@ def test_delete_by_keys(milvus_service: MilvusVectorDBService,

# Delete data by keys from the collection.
keys_to_delete = [5, 6]
response = milvus_service.delete_by_keys(collection_name, keys_to_delete)
assert response == keys_to_delete
milvus_service.delete_by_keys(collection_name, keys_to_delete)

response = milvus_service.query(collection_name, query="id >= 0")
assert len(response) == len(milvus_data) - 2
Expand Down Expand Up @@ -366,82 +364,22 @@ def test_delete(milvus_service: MilvusVectorDBService, idx_part_collection_confi
milvus_service.drop(collection_name)


@pytest.mark.milvus
def test_single_instance_with_collection_lock(milvus_service: MilvusVectorDBService,
idx_part_collection_config: dict,
milvus_data: list[dict]):

collection_name = "test_insert_and_search_order_with_collection_lock"
def test_release_collection(milvus_service: MilvusVectorDBService,
idx_part_collection_config: dict,
milvus_data: list[dict]):
collection_name = "test_release_collection"

# Make sure to drop any existing collection from previous runs.
milvus_service.drop(collection_name)

# Create a collection.
milvus_service.create(collection_name, **idx_part_collection_config)

filter_query = "age == 26 or age == 27"
execution_order = []

def insert_data():
result = milvus_service.insert(collection_name, milvus_data)
assert result['insert_count'] == len(milvus_data)
execution_order.append("Insert Executed")

def query_data():
result = milvus_service.query(collection_name, query=filter_query)
execution_order.append("Search Executed")
assert isinstance(result, list)

def count_entities():
milvus_service.count(collection_name)
execution_order.append("Count Collection Entities Executed")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(insert_data)
executor.submit(query_data)
executor.submit(count_entities)

# Assert the execution order
assert execution_order == ["Count Collection Entities Executed", "Insert Executed", "Search Executed"]


@pytest.mark.milvus
def test_multi_instance_with_collection_lock(milvus_service: MilvusVectorDBService,
idx_part_collection_config: dict,
milvus_data: list[dict],
milvus_server_uri: str):

milvus_service_2 = MilvusVectorDBService(uri=milvus_server_uri)

collection_name = "test_insert_and_search_order_with_collection_lock"
filter_query = "age == 26 or age == 27"

# Make sure to drop any existing collection from previous runs.
milvus_service.drop(collection_name)

execution_order = []

def create_collection():
milvus_service.create(collection_name, **idx_part_collection_config)
execution_order.append("Create Executed")

def insert_data():
result = milvus_service_2.insert(collection_name, milvus_data)
assert result['insert_count'] == len(milvus_data)
execution_order.append("Insert Executed")

def query_data():
result = milvus_service.query(collection_name, query=filter_query)
execution_order.append("Query Executed")
assert isinstance(result, list)

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(create_collection)
executor.submit(insert_data)
executor.submit(query_data)
# Insert data into the collection.
milvus_service.insert(collection_name, milvus_data)

# Assert the execution order
assert execution_order == ["Create Executed", "Insert Executed", "Query Executed"]
# Release resource from the memory.
milvus_service.release_resource(name=collection_name)


def test_get_collection_lock():
Expand Down

0 comments on commit d2c9845

Please sign in to comment.