Skip to content

Commit

Permalink
Added milvus lite to pipeline tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bsuryadevara committed Oct 9, 2023
1 parent cde18b2 commit c9316c0
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 147 deletions.
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,4 @@ dependencies:
# Add additional dev dependencies here
- pytest-kafka==0.6.0
- pymilvus==2.3.1
- milvus
27 changes: 27 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import pytest
import requests
from milvus import default_server

from _utils.kafka import _init_pytest_kafka
from _utils.kafka import kafka_bootstrap_servers_fixture # noqa: F401 pylint:disable=unused-import
Expand Down Expand Up @@ -937,3 +938,29 @@ def filter_probs_df(dataset, use_cpp: bool):
that as well, while excluding the combination of C++ execution and Pandas dataframes.
"""
yield dataset["filter_probs.csv"]


@pytest.fixture(scope="module")
def milvus_server_uri():
"""
Pytest fixture to start and stop a Milvus server and provide its URI for testing.
This fixture starts a Milvus server, retrieves its URI (Uniform Resource Identifier), and provides
the URI as a yield value to the tests using this fixture. After all tests in the module are
completed, the Milvus server is stopped.
"""
logger = logging.getLogger(f"morpheus.{__name__}")
try:
default_server.start()
host = "127.0.0.1"
port = default_server.listen_port
uri = f"http://{host}:{port}"

yield uri
except Exception as exec_inf:
logger.error("Error in starting Milvus server: %s", exec_inf)
finally:
try:
default_server.stop()
except Exception as exec_inf:
logger.error("Error in stopping Milvus server: %s", exec_inf)
3 changes: 2 additions & 1 deletion tests/test_milvus_vector_db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def mock_milvus_client() -> MilvusClient:
yield mock_client.return_value


# pylint:disable=unused-argument
@pytest.fixture(scope="function", name="milvus_service_fixture")
def milvus_service(mock_milvus_client_fixture) -> MilvusVectorDBService:
service = MilvusVectorDBService(uri="http://localhost:19530")
Expand Down Expand Up @@ -76,7 +77,7 @@ def test_create(milvus_service_fixture: MilvusVectorDBService,
has_collection: bool):
filepath = path.join(TEST_DIRS.tests_data_dir, "service", "milvus_test_collection_conf.json")
collection_config = {}
with open(filepath, "r") as file:
with open(filepath, "r", encoding="utf-8") as file:
collection_config = json.load(file)

mock_milvus_client_fixture.has_collection.return_value = has_collection
Expand Down
189 changes: 47 additions & 142 deletions tests/test_milvus_write_to_vector_db_stage_pipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,195 +14,100 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import random

import pytest

import cudf

from _utils import TEST_DIRS
from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.modules import to_control_message # noqa: F401 # pylint: disable=unused-import
from morpheus.pipeline import LinearPipeline
from morpheus.service.milvus_vector_db_service import MilvusVectorDBService
from morpheus.stages.general.linear_modules_stage import LinearModulesStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.output.write_to_vector_db import WriteToVectorDBStage
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from morpheus.utils.module_ids import TO_CONTROL_MESSAGE


@pytest.fixture(scope="function", name="milvus_vdb_serivce_fixture")
def milvus_vdb_serivce():
vdb_service = MilvusVectorDBService(uri="http://localhost:19530")
return vdb_service


def create_milvus_collection(vdb_service: MilvusVectorDBService):
collection_config = {
"collection_conf": {
"shards": 2,
"auto_id": False,
"consistency_level": "Strong",
"description": "Test collection",
"schema_conf": {
"enable_dynamic_field": True,
"schema_fields": [
{
"name": "id",
"dtype": "int64",
"description": "Primary key for the collection",
"is_primary": True,
},
{
"name": "embedding",
"dtype": "float_vector",
"description": "Embedding vectors",
"is_primary": False,
"dim": 10,
},
{
"name": "age",
"dtype": "int64",
"description ": "Age",
"is_primary": False,
},
],
"description": "Test collection schema",
},
}
}
vdb_service.create(name="test", overwrite=True, **collection_config)


def create_milvus_collection_idx_part(vdb_service: MilvusVectorDBService):
collection_config = {
"collection_conf": {
"shards": 2,
"auto_id": False,
"consistency_level": "Strong",
"description": "Test collection with partition and index",
"index_conf": {
"field_name": "embedding", "metric_type": "L2"
},
"partition_conf": {
"timeout": 1, "partitions": [{
"name": "age_partition", "description": "Partition by age"
}]
},
"schema_conf": {
"enable_dynamic_field": True,
"schema_fields": [
{
"name": "id",
"dtype": "int64",
"description": "Primary key for the collection",
"is_primary": True,
},
{
"name": "embedding",
"dtype": "float_vector",
"description": "Embedding vectors",
"is_primary": False,
"dim": 10,
},
{
"name": "age",
"dtype": "int64",
"description ": "Age",
"is_primary": False,
},
],
"description": "Test collection schema",
},
}
}
vdb_service.create(name="test_idx_part", overwrite=True, **collection_config)

@pytest.fixture(scope="function", name="milvus_service_fixture")
def milvus_service(milvus_server_uri: str):
service = MilvusVectorDBService(uri=milvus_server_uri)
yield service

@pytest.mark.use_cpp
def test_write_to_vector_db_stage_with_instance_pipe(milvus_vdb_serivce_fixture,
config: Config,
pipeline_batch_size: int = 256):
config.pipeline_batch_size = pipeline_batch_size

rows_count = 5
dimensions = 10
collection_name = "test"

create_milvus_collection(milvus_vdb_serivce_fixture)
def get_test_df(num_input_rows):

df = cudf.DataFrame({
"id": [i for i in range(rows_count)],
"age": [random.randint(20, 40) for i in range(rows_count)],
"embedding": [[random.random() for _ in range(dimensions)] for _ in range(rows_count)]
"id": list(range(num_input_rows)),
"age": [random.randint(20, 40) for i in range(num_input_rows)],
"embedding": [[random.random() for _ in range(10)] for _ in range(num_input_rows)]
})

to_cm_module_config = {
"module_id": TO_CONTROL_MESSAGE, "module_name": "to_control_message", "namespace": MORPHEUS_MODULE_NAMESPACE
}
vdb_service = MilvusVectorDBService(uri="http://localhost:19530")
return df

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [df]))
pipe.add_stage(
LinearModulesStage(config,
to_cm_module_config,
input_port_name="input",
output_port_name="output",
output_type=ControlMessage))
pipe.add_stage(WriteToVectorDBStage(config, resource_name=collection_name, service=vdb_service))

pipe.run()
def create_milvus_collection(conf_file: str, service: MilvusVectorDBService) -> str:

actual_count = milvus_vdb_serivce_fixture.count(name=collection_name)
milvus_vdb_serivce_fixture.close()
conf_filepath = os.path.join(TEST_DIRS.tests_data_dir, "service", conf_file)

assert actual_count == 5
with open(conf_filepath, 'r', encoding="utf-8") as json_file:
collection_config = json.load(json_file)

collection_name = collection_config.pop("name")

@pytest.mark.use_cpp
def test_write_to_vector_db_stage_with_name_pipe(milvus_vdb_serivce_fixture,
config: Config,
pipeline_batch_size: int = 256):
config.pipeline_batch_size = pipeline_batch_size
service.create(name=collection_name, overwrite=True, **collection_config)

create_milvus_collection_idx_part(milvus_vdb_serivce_fixture)
return collection_name

rows_count = 5
dimensions = 10
collection_name = "test_idx_part"

resource_kwargs = {"collection_conf": {"partition_name": "age_partition"}}
@pytest.mark.slow
@pytest.mark.use_cpp
@pytest.mark.parametrize("use_instance, num_input_rows, expected_num_output_rows", [(True, 5, 5), (False, 5, 5)])
def test_write_to_vector_db_stage_pipe(milvus_service_fixture: MilvusVectorDBService,
milvus_server_uri: str,
use_instance: bool,
config: Config,
num_input_rows: int,
expected_num_output_rows: int):

df = cudf.DataFrame({
"id": [i for i in range(rows_count)],
"age": [random.randint(20, 40) for i in range(rows_count)],
"embedding": [[random.random() for _ in range(dimensions)] for _ in range(rows_count)]
})
collection_name = create_milvus_collection("milvus_test_collection_conf.json", milvus_service_fixture)
df = get_test_df(num_input_rows)

to_cm_module_config = {
"module_id": TO_CONTROL_MESSAGE, "module_name": "to_control_message", "namespace": MORPHEUS_MODULE_NAMESPACE
}

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [df], repeat=2))
pipe.set_source(InMemorySourceStage(config, [df]))
pipe.add_stage(
LinearModulesStage(config,
to_cm_module_config,
input_port_name="input",
output_port_name="output",
output_type=ControlMessage))
pipe.add_stage(
WriteToVectorDBStage(config,
resource_name=collection_name,
service="milvus",
uri="http://localhost:19530",
resource_kwargs=resource_kwargs))

if use_instance:
write_to_vdb_stage = WriteToVectorDBStage(config, resource_name=collection_name, service=milvus_service_fixture)
else:
write_to_vdb_stage = WriteToVectorDBStage(config,
resource_name=collection_name,
service="milvus",
uri=milvus_server_uri)

pipe.add_stage(write_to_vdb_stage)
sink_stage = pipe.add_stage(InMemorySinkStage(config))
pipe.run()

actual_count = milvus_vdb_serivce_fixture.count(name=collection_name)
milvus_vdb_serivce_fixture.close()
messages = sink_stage.get_messages()

assert actual_count == 10
assert len(messages) == 1
assert isinstance(messages[0], ControlMessage)
assert messages[0].has_metadata("insert_response")
assert len(messages[0].payload().df) == expected_num_output_rows
4 changes: 2 additions & 2 deletions tests/tests_data/service/milvus_test_collection_conf.json
Git LFS file not shown
Git LFS file not shown

0 comments on commit c9316c0

Please sign in to comment.