diff --git a/examples/llm/common/content_extractor_module.py b/examples/llm/common/content_extractor_module.py
index dcda9aa45c..fd27d02661 100644
--- a/examples/llm/common/content_extractor_module.py
+++ b/examples/llm/common/content_extractor_module.py
@@ -180,6 +180,7 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,
         for chunk in split_text:
             processed_data.append({
                 'title': file_meta.file_name,
+                'link': 'none',
                 'source': f"{file_meta.file_type}:{file_meta.file_path}",
                 'summary': 'none',
                 'content': chunk
@@ -272,6 +273,14 @@ def file_content_extractor(builder: mrc.Builder):
         "txt": TextConverter()
     }
 
+    # Per-file-type chunking parameters; fall back to the module-level defaults
+    # when a file type has no overrides in `converters_meta`.
+    chunk_params = {
+        file_type: {
+            "chunk_size": converters_meta.get(file_type, {}).get("chunk_size", chunk_size),
+            "chunk_overlap": converters_meta.get(file_type, {}).get("chunk_overlap", chunk_overlap)
+        }
+        for file_type in converters.keys()
+    }
+
     def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:
         data = []
         with ThreadPoolExecutor(max_workers=num_threads) as executor:
@@ -292,7 +301,10 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:
             for file_meta, future in zip(files_meta, futures):
                 docs = future.result()
                 if docs:
-                    result = process_content(docs, file_meta, chunk_size, chunk_overlap)
+                    file_type_chunk_params = chunk_params[file_meta.file_type]
+                    result = process_content(docs, file_meta,
+                                             file_type_chunk_params["chunk_size"],
+                                             file_type_chunk_params["chunk_overlap"])
                     if result:
                         data.extend(result)
 
diff --git a/examples/llm/common/utils.py b/examples/llm/common/utils.py
index 393d435799..8af50c22b3 100644
--- a/examples/llm/common/utils.py
+++ b/examples/llm/common/utils.py
@@ -14,6 +14,7 @@
 import logging
 
 import pymilvus
 from langchain.embeddings import HuggingFaceEmbeddings
 
 from morpheus.llm.services.nemo_llm_service import NeMoLLMService
+from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP
@@ -45,51 +46,17 @@ def build_llm_service(model_name: str, llm_service: str, tokens_to_generate: int
     return llm_service.get_client(model_name, **model_kwargs)
 
 
-def build_milvus_config(embedding_size: int):
-    milvus_resource_kwargs = {
-        "index_conf": {
-            "field_name": "embedding",
-            "metric_type": "L2",
-            "index_type": "HNSW",
-            "params": {
-                "M": 8,
-                "efConstruction": 64,
-            },
-        },
-        "schema_conf": {
-            "enable_dynamic_field": True,
-            "schema_fields": [
-                pymilvus.FieldSchema(name="id",
-                                     dtype=pymilvus.DataType.INT64,
-                                     description="Primary key for the collection",
-                                     is_primary=True,
-                                     auto_id=True).to_dict(),
-                pymilvus.FieldSchema(name="title",
-                                     dtype=pymilvus.DataType.VARCHAR,
-                                     description="Title or heading of the data entry",
-                                     max_length=65_535).to_dict(),
-                pymilvus.FieldSchema(name="source",
-                                     dtype=pymilvus.DataType.VARCHAR,
-                                     description="Source or origin of the data entry",
-                                     max_length=65_535).to_dict(),
-                pymilvus.FieldSchema(name="summary",
-                                     dtype=pymilvus.DataType.VARCHAR,
-                                     description="Brief summary or abstract of the data content",
-                                     max_length=65_535).to_dict(),
-                pymilvus.FieldSchema(name="content",
-                                     dtype=pymilvus.DataType.VARCHAR,
-                                     description="Main content or body of the data entry",
-                                     max_length=65_535).to_dict(),
-                pymilvus.FieldSchema(name="embedding",
-                                     dtype=pymilvus.DataType.FLOAT_VECTOR,
-                                     description="Embedding vectors representing the data entry",
-                                     dim=embedding_size).to_dict(),
-            ],
-            "description": "Collection schema for diverse data sources"
-        }
-    }
-
-    return milvus_resource_kwargs
+def build_milvus_config(resource_schema_config: dict):
+    """Converts the string dtype names in a resource schema config into pymilvus FieldSchema dicts."""
+    schema_fields = []
+    for field_data in resource_schema_config["schema_conf"]["schema_fields"]:
+        field_data["dtype"] = DATA_TYPE_MAP.get(field_data["dtype"])
+        field_schema = pymilvus.FieldSchema(**field_data)
+        schema_fields.append(field_schema.to_dict())
+
+    resource_schema_config["schema_conf"]["schema_fields"] = schema_fields
+
+    return resource_schema_config
 
 
 def build_milvus_service(embedding_size: int, uri: str = "http://localhost:19530"):
diff --git a/examples/llm/vdb_upload/common.py b/examples/llm/vdb_upload/common.py
index 43a692d5fd..b0f39974c1 100644
--- a/examples/llm/vdb_upload/common.py
+++ b/examples/llm/vdb_upload/common.py
@@ -202,7 +202,7 @@ def process_vdb_sources(pipe: Pipeline, config: Config, vdb_source_config: typin
     return vdb_sources
 
 
-def build_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]:
+def build_default_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]:
     """
     Builds the configuration for Milvus.
diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py
index b9dc08a7bb..cab3906d48 100644
--- a/examples/llm/vdb_upload/pipeline.py
+++ b/examples/llm/vdb_upload/pipeline.py
@@ -72,7 +72,6 @@ def pipeline(pipeline_config: Config, source_config: typing.List, vdb_config: ty
     monitor_2 = pipe.add_stage(
         MonitorStage(pipeline_config, description="Inference rate", unit="events", delayed_start=True))
 
-    # TODO(Bhargav): Convert WriteToVectorDBStage to module + retain backwards compatibility.
     vector_db = pipe.add_stage(WriteToVectorDBStage(pipeline_config, **vdb_config))
 
     monitor_3 = pipe.add_stage(
diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py
index 78395ec718..801c8688f0 100644
--- a/examples/llm/vdb_upload/run.py
+++ b/examples/llm/vdb_upload/run.py
@@ -19,8 +19,9 @@
 import yaml
 
 from morpheus.config import Config, PipelineModes
 
-from ..common.utils import build_milvus_config
+from .common import build_default_milvus_config
+from ..common.utils import build_milvus_config
 from ..common.utils import build_rss_urls
 
 logger = logging.getLogger(__name__)
@@ -223,7 +224,7 @@ def build_cli_configs(source_type, enable_cache, embedding_size, isolate_embeddi
     cli_vdb_conf = {
         'embedding_size': embedding_size,
         'recreate': True,
-        'resource_kwargs': build_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
+        'resource_kwargs': build_default_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
         'resource_name': vector_db_resource_name,
         'service': vector_db_service,
         'uri': vector_db_uri,
@@ -313,7 +314,12 @@ def build_final_config(vdb_conf_path, cli_source_conf, cli_embeddings_conf, cli_
     pipeline_conf = merge_configs(vdb_pipeline_config.get('pipeline', {}), cli_pipeline_conf)
     source_conf = vdb_pipeline_config.get('sources', []) + list(cli_source_conf.values())
     tokenizer_conf = merge_configs(vdb_pipeline_config.get('tokenizer', {}), cli_tokenizer_conf)
-    vdb_conf = merge_configs(vdb_pipeline_config.get('vdb', {}), cli_vdb_conf)
+    vdb_conf = vdb_pipeline_config.get('vdb', {})
+    resource_schema = vdb_conf.pop("resource_schema", None)
+
+    if resource_schema:
+        vdb_conf["resource_kwargs"] = build_milvus_config(resource_schema)
+    vdb_conf = merge_configs(vdb_conf, cli_vdb_conf)
 
     # TODO: class labels depends on this, so it should be a pipeline level parameter, not a vdb level parameter
     pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size', 384)
diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml
index 5a5397771d..5bdd27bd63 100644
--- a/examples/llm/vdb_upload/vdb_config.yaml
+++ b/examples/llm/vdb_upload/vdb_config.yaml
@@ -123,8 +123,46 @@ vdb_pipeline:
     model_name: "bert-base-uncased-hash"
 
   vdb:
-    embedding_size: 384 # Size of the embeddings to store in the vector database
+    embedding_size: 384
     recreate: True # Whether to recreate the resource if it already exists
     resource_name: "VDB2" # Identifier for the resource in the vector database
     service: "milvus" # Specify the type of vector database
     uri: "http://localhost:19530" # URI for connecting to the Vector Database server
+    resource_schema:
+      index_conf:
+        field_name: embedding
+        metric_type: L2
+        index_type: HNSW
+        params:
+          M: 8
+          efConstruction: 64
+
+      schema_conf:
+        enable_dynamic_field: true
+        schema_fields:
+          - name: id
+            dtype: INT64
+            description: Primary key for the collection
+            is_primary: true
+            auto_id: true
+          - name: title
+            dtype: VARCHAR
+            description: Title or heading of the data entry
+            max_length: 65_535
+          - name: source
+            dtype: VARCHAR
+            description: Source or origin of the data entry
+            max_length: 65_535
+          - name: summary
+            dtype: VARCHAR
+            description: Brief summary or abstract of the data content
+            max_length: 65_535
+          - name: content
+            dtype: VARCHAR
+            description: Main content or body of the data entry
+            max_length: 65_535
+          - name: embedding
+            dtype: FLOAT_VECTOR
+            description: Embedding vectors representing the data entry
+            dim: 384 # Size of the embeddings to store in the vector database
+        description: Collection schema for diverse data sources
diff --git a/morpheus/modules/output/__init__.py b/morpheus/modules/output/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/morpheus/modules/output/write_to_vector_db.py b/morpheus/modules/output/write_to_vector_db.py
new file mode 100644
index 0000000000..af303364d9
--- /dev/null
+++ b/morpheus/modules/output/write_to_vector_db.py
@@ -0,0 +1,224 @@
+# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
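+
+# NOTE: Design sketch of this module's behavior (summarizing the code below):
+# incoming DataFrames are buffered per resource (collection) and a batch is
+# flushed to the vector database service once `batch_size` rows have
+# accumulated, or once `write_time_interval` seconds have elapsed since the
+# last insert.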
+
+import logging
+import pickle
+import time
+from dataclasses import dataclass
+
+import mrc
+from mrc.core import operators as ops
+from pydantic import BaseModel
+from pydantic import Field
+from pydantic import ValidationError
+from pydantic import validator
+
+import cudf
+
+from morpheus.messages import ControlMessage
+from morpheus.messages import MultiMessage
+from morpheus.messages import MultiResponseMessage
+from morpheus.service.vdb.utils import VectorDBServiceFactory
+from morpheus.service.vdb.vector_db_service import VectorDBService
+from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
+from morpheus.utils.module_ids import WRITE_TO_VECTOR_DB
+from morpheus.utils.module_utils import ModuleInterface
+from morpheus.utils.module_utils import register_module
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class AccumulationStats:
+    msg_count: int
+    last_insert_time: float
+    data: list[cudf.DataFrame]
+
+
+class WriteToVDBParamContract(BaseModel):
+    embedding_column_name: str = "embedding"
+    recreate: bool = False
+    service: str = Field(default=None)
+    is_service_serialized: bool = False
+    resource_name: str = Field(default=None)
+    resource_kwargs: dict = Field(default_factory=dict)
+    service_kwargs: dict = Field(default_factory=dict)
+    batch_size: int = 1024
+    write_time_interval: float = 3.0
+
+    @validator('service', pre=True, always=True)
+    def validate_service(cls, v):
+        if not v:
+            raise ValueError("Service must be a service name or a serialized instance of VectorDBService")
+        return v
+
+    @validator('resource_name', pre=True, always=True)
+    def validate_resource_name(cls, v):
+        if not v:
+            raise ValueError("Resource name must not be None or empty.")
+        return v
+
+
+@register_module(WRITE_TO_VECTOR_DB, MORPHEUS_MODULE_NAMESPACE)
+def _write_to_vector_db(builder: mrc.Builder):
+    """
+    Writes incoming messages to a vector database, accumulating them into batches before insertion.
+
+    Parameters
+    ----------
+    builder : mrc.Builder
+        The Morpheus builder instance to attach this module to.
+
+    Notes
+    -----
+    The `module_config` should contain:
+    - 'embedding_column_name': str, the name of the column containing embeddings (default is "embedding").
+    - 'recreate': bool, whether to recreate the resource if it already exists (default is False).
+    - 'service': str, the name of the service or a serialized instance of VectorDBService.
+    - 'is_service_serialized': bool, whether the provided service is serialized (default is False).
+    - 'resource_name': str, the name of the collection resource (must not be None or empty).
+    - 'resource_kwargs': dict, additional keyword arguments for resource creation.
+    - 'service_kwargs': dict, additional keyword arguments for VectorDBService creation.
+    - 'batch_size': int, the number of rows to accumulate before writing a batch to the VDB (default is 1024).
+    - 'write_time_interval': float, the maximum time (in seconds) between writes; accumulated rows are flushed
+      once this interval elapses, even if 'batch_size' has not been reached (default is 3.0).
+
+    Raises
+    ------
+    ValueError
+        If 'resource_name' is None or empty.
+        If 'service' is not provided or is not a valid service name or a serialized instance of VectorDBService.
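+
+    A minimal example of the expected module configuration (the resource name
+    shown is illustrative):
+
+    .. code-block:: python
+
+        {
+            "service": "milvus",
+            "resource_name": "my_collection",
+            "batch_size": 1024,
+            "write_time_interval": 3.0
+        }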
+ """ + + module_config = builder.get_current_module_config() + + try: + write_to_vdb_config = WriteToVDBParamContract(**module_config) + except ValidationError as e: + # Format the error message for better readability + error_messages = '; '.join([f"{error['loc'][0]}: {error['msg']}" for error in e.errors()]) + log_error_message = f"Invalid configuration for write_to_vector_db: {error_messages}" + logger.error(log_error_message) + raise ValueError(log_error_message) + + embedding_column_name = write_to_vdb_config.embedding_column_name + recreate = write_to_vdb_config.recreate + service = write_to_vdb_config.service + is_service_serialized = write_to_vdb_config.is_service_serialized + resource_name = write_to_vdb_config.resource_name + resource_kwargs = write_to_vdb_config.resource_kwargs + service_kwargs = write_to_vdb_config.service_kwargs + batch_size = write_to_vdb_config.batch_size + write_time_interval = write_to_vdb_config.write_time_interval + + # Check if service is serialized and convert if needed + service: VectorDBService = ( + pickle.loads(bytes(service, "latin1")) + if is_service_serialized + else VectorDBServiceFactory.create_instance(service_name=service, **service_kwargs) + ) + + has_object = service.has_store_object(name=resource_name) + + if (recreate and has_object): + # Delete the existing resource + service.drop(name=resource_name) + has_object = False + + # Ensure that the resource exists + if (not has_object): + service.create(name=resource_name, **resource_kwargs) + + accumulator_dict = {resource_name: AccumulationStats(msg_count=0, last_insert_time=-1, data=[])} + + def on_completed(): + final_df_references = [] + + # Pushing remaining messages + for key, accum_stats in accumulator_dict.items(): + if accum_stats.data: + merged_df = cudf.concat(accum_stats.data) + service.insert_dataframe(name=key, df=merged_df) + final_df_references.append(accum_stats.data) + # Close vector database service connection + service.close() + + return final_df_references + + def extract_df(msg): + df = None + resrc_name = None + + if isinstance(msg, ControlMessage): + df = msg.payload().df + resrc_name = msg.get_metadata("resource_name") + elif isinstance(msg, MultiResponseMessage): + df = msg.get_meta() + if df is not None and not df.empty: + embeddings = msg.get_probs_tensor() + df[embedding_column_name] = embeddings.tolist() + elif isinstance(msg, MultiMessage): + df = msg.get_meta() + else: + raise RuntimeError(f"Unexpected message type '{type(msg)}' was encountered.") + + return df, resrc_name + + def on_data(msg): + try: + df, resrc_name = extract_df(msg) + + if df is not None and not df.empty: + final_df_references = [] + df_size = len(df) + current_time = time.time() + + # Use default resource name + if not resrc_name: + resrc_name = resource_name + if not service.has_store_object(resrc_name): + logger.error("Resource not exists in the vector database: %s", resource_name) + return final_df_references + + if resrc_name in accumulator_dict: + accumlator: AccumulationStats = accumulator_dict[resrc_name] + accumlator.msg_count += df_size + accumlator.data.append(df) + else: + accumulator_dict[resrc_name] = AccumulationStats(msg_count=df_size, last_insert_time=-1, data=[df]) + + for key, accum_stats in accumulator_dict.items(): + if accum_stats.msg_count >= batch_size or (accum_stats.last_insert_time != -1 and (current_time - accum_stats.last_insert_time) >= write_time_interval): + if accum_stats.data: + merged_df = cudf.concat(accum_stats.data) + service.insert_dataframe(name=key, 
df=merged_df, **resource_kwargs) + final_df_references.append(merged_df) + # Reset accumlator stats + accum_stats.data.clear() + accum_stats.last_insert_time = current_time + accum_stats.msg_count = 0 + + return final_df_references + + except Exception as exc: + logger.error("Unable to insert into collection: %s due to %s", resrc_name, exc) + + node = builder.make_node(WRITE_TO_VECTOR_DB, + ops.map(on_data), + ops.on_completed(on_completed)) + + builder.register_module_input("input", node) + builder.register_module_output("output", node) + + +WriteToVectorDB = ModuleInterface(WRITE_TO_VECTOR_DB, MORPHEUS_MODULE_NAMESPACE) diff --git a/morpheus/service/vdb/milvus_client.py b/morpheus/service/vdb/milvus_client.py index 7f2436eb46..3908919ea0 100644 --- a/morpheus/service/vdb/milvus_client.py +++ b/morpheus/service/vdb/milvus_client.py @@ -18,6 +18,24 @@ from pymilvus import MilvusClient as PyMilvusClient from pymilvus.orm.mutation import MutationResult +DATA_TYPE_MAP = { + "BOOL": 1, + "INT8": 2, + "INT16": 3, + "INT32": 4, + "INT64": 5, + "FLOAT": 10, + "DOUBLE": 11, + "STRING": 20, + "VARCHAR": 21, + "ARRAY": 22, + "JSON": 23, + "BINARY_VECTOR": 100, + "FLOAT_VECTOR": 101, + "FLOAT16_VECTOR": 102, + "BFLOAT16_VECTOR": 103, + "UNKNOWN": 999 +} def handle_exceptions(func_name: str, error_message: str) -> typing.Callable: """ diff --git a/morpheus/stages/output/write_to_vector_db_stage.py b/morpheus/stages/output/write_to_vector_db_stage.py index be4da973b1..e9b0d453e5 100644 --- a/morpheus/stages/output/write_to_vector_db_stage.py +++ b/morpheus/stages/output/write_to_vector_db_stage.py @@ -13,10 +13,12 @@ # limitations under the License. import logging +import pickle import typing +from morpheus.modules.output.write_to_vector_db import WriteToVectorDB import mrc -from mrc.core import operators as ops +from morpheus.utils.module_utils import ModuleDefinition from morpheus.config import Config from morpheus.messages import ControlMessage @@ -24,15 +26,11 @@ from morpheus.messages.multi_message import MultiMessage from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage -from morpheus.service.vdb.utils import VectorDBServiceFactory from morpheus.service.vdb.vector_db_service import VectorDBService logger = logging.getLogger(__name__) -# TODO(Bhargav): Add accumulator functionality and related config options -# TODO(Bhargav): Add support for dynamic 'collection' target check for CMs, such that if 'collection' is set we use it -# instead of the default collection name. class WriteToVectorDBStage(PassThruTypeMixin, SinglePortStage): """ Writes messages to a Vector Database. @@ -52,6 +50,11 @@ class WriteToVectorDBStage(PassThruTypeMixin, SinglePortStage): Specifies whether to recreate the resource if it already exists, by default False. resource_kwargs : dict, optional Additional keyword arguments to pass when performing vector database writes on a given resource. + batch_size : int + Accumulates messages until reaching the specified batch size for writing to VDB. + write_time_interval : float + Specifies the time interval (in seconds) for writing messages, or writing messages + when the accumulated batch size is reached. **service_kwargs : dict Additional keyword arguments to pass when creating a VectorDBService instance. 
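+
+    Examples
+    --------
+    A minimal sketch of adding this stage to a pipeline (the service and
+    resource names below are illustrative):
+
+    >>> pipe.add_stage(
+    ...     WriteToVectorDBStage(config,
+    ...                          service="milvus",
+    ...                          resource_name="my_collection",
+    ...                          batch_size=1024,
+    ...                          write_time_interval=3.0))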
@@ -68,38 +71,36 @@ def __init__(self,
                  embedding_column_name: str = "embedding",
                  recreate: bool = False,
                  resource_kwargs: dict = None,
+                 batch_size: int = 1024,
+                 write_time_interval: float = 3.0,
                  **service_kwargs):
         super().__init__(config)
 
-        self._resource_name = resource_name
-        self._embedding_column_name = embedding_column_name
-        self._recreate = recreate
-        self._resource_kwargs = resource_kwargs if resource_kwargs is not None else {}
-
-        if isinstance(service, str):
-            # If service is a string, assume it's the service name
-            self._service: VectorDBService = VectorDBServiceFactory.create_instance(service_name=service,
-                                                                                    **service_kwargs)
-        elif isinstance(service, VectorDBService):
-            # If service is an instance of VectorDBService, use it directly
-            self._service: VectorDBService = service
-        else:
-            raise ValueError("service must be a string (service name) or an instance of VectorDBService")
-
-        has_object = self._service.has_store_object(name=self._resource_name)
-
-        if (self._recreate and has_object):
-            # Delete the existing resource
-            self._service.drop(name=self._resource_name)
-            has_object = False
-
-        # Ensure that the resource exists
-        if (not has_object):
-            self._service.create(name=self._resource_name, **self._resource_kwargs)
-
-        # Get the service for just the resource we are interested in
-        self._resource_service = self._service.load_resource(name=self._resource_name)
+        resource_kwargs = resource_kwargs if resource_kwargs is not None else {}
+        is_service_serialized = False
+        if isinstance(service, VectorDBService):
+            # Pickle the service instance so it can be passed through the module config
+            service = str(pickle.dumps(service), encoding="latin1")
+            is_service_serialized = True
+
+        module_config = {
+            "service": service,
+            "is_service_serialized": is_service_serialized,
+            "recreate": recreate,
+            "resource_name": resource_name,
+            "embedding_column_name": embedding_column_name,
+            "resource_kwargs": resource_kwargs,
+            "service_kwargs": service_kwargs,
+            "batch_size": batch_size,
+            "write_time_interval": write_time_interval
+        }
+
+        module_name = f"write_to_vector_db__{resource_name}"
+
+        if logger.isEnabledFor(logging.DEBUG):
+            logger.debug("Module will be loaded with name: %s", module_name)
+
+        self._module_definition: ModuleDefinition = WriteToVectorDB.get_definition(module_name, module_config)
 
     @property
     def name(self) -> str:
@@ -120,57 +121,15 @@ def accepted_types(self) -> typing.Tuple:
     def supports_cpp_node(self):
         """Indicates whether this stage supports a C++ node."""
         return False
-
-    def on_completed(self):
-        # Close vector database service connection
-        self._service.close()
-
+
     def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
-        def extract_df(msg):
-            df = None
-
-            if isinstance(msg, ControlMessage):
-                df = msg.payload().df
-                # For control message, check if we have a collection tag
-            elif isinstance(msg, MultiResponseMessage):
-                df = msg.get_meta()
-                if df is not None and not df.empty:
-                    embeddings = msg.get_probs_tensor()
-                    df[self._embedding_column_name] = embeddings.tolist()
-            elif isinstance(msg, MultiMessage):
-                df = msg.get_meta()
-            else:
-                raise RuntimeError(f"Unexpected message type '{type(msg)}' was encountered.")
-
-            return df  # Return df, collection_tag or df, None
-
-        def on_data(msg):
-            try:
-                df = extract_df(msg)
-                # df, collection_name = extract_df(msg)
-                # Call accumulator function, progress if we have enough data or our timeout has elapsed
-                # Need a different accumulator for each collection_name
-
-                if df is not None and not df.empty:
-                    result = self._service.insert_dataframe(name=self._resource_name, df=df, **self._resource_kwargs)
-
-                    if isinstance(msg, ControlMessage):
-                        msg.set_metadata("insert_response", result)
-
-                    return msg
-
-            except Exception as exc:
-                logger.error("Unable to insert into collection: %s due to %s", self._resource_name, exc)
-
-                return None
+        module = self._module_definition.load(builder)
 
-        to_vector_db = builder.make_node(self.unique_name,
-                                         ops.map(on_data),
-                                         ops.filter(lambda x: x is not None),
-                                         ops.on_completed(self.on_completed))
+        # The port names must match the input/output port names registered by
+        # the write_to_vector_db module.
+        mod_in_node = module.input_port("input")
+        mod_out_node = module.output_port("output")
 
-        builder.make_edge(input_node, to_vector_db)
+        builder.make_edge(input_node, mod_in_node)
 
-        # Return input unchanged to allow passthrough
-        return to_vector_db
+        return mod_out_node
diff --git a/morpheus/utils/module_ids.py b/morpheus/utils/module_ids.py
index 2626859871..0076fd1ca6 100644
--- a/morpheus/utils/module_ids.py
+++ b/morpheus/utils/module_ids.py
@@ -27,3 +27,4 @@
 FILTER_CM_FAILED = "FilterCmFailed"
 PAYLOAD_BATCHER = "PayloadBatcher"
 WRITE_TO_ELASTICSEARCH = "WriteToElasticsearch"
+WRITE_TO_VECTOR_DB = "WriteToVectorDB"