From ab7eaf4a1c3a74cf23767197eb3de945510bf33e Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 23 Apr 2024 14:23:04 -0700 Subject: [PATCH 01/27] First pass at a truncate strings method --- morpheus/io/utils.py | 59 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/morpheus/io/utils.py b/morpheus/io/utils.py index 7c4cfce260..7f1aab0835 100644 --- a/morpheus/io/utils.py +++ b/morpheus/io/utils.py @@ -14,6 +14,10 @@ # limitations under the License. """IO utilities.""" +import pandas as pd + +import cudf + from morpheus.utils.type_aliases import DataFrameType @@ -31,3 +35,58 @@ def filter_null_data(x: DataFrameType): return x return x[~x['data'].isna()] + + +def _cudf_needs_truncate(df: cudf.DataFrame, max_bytes: int) -> bool: + """ + Optimization, cudf contains a byte_count() method that pandas lacks. + """ + for col in df.columns: + series: cudf.Series = df[col] + if series.dtype == 'object': + if series.str.byte_count().max() > max_bytes: + return True + + return False + + +def truncate_string_cols_by_bytes(df: DataFrameType, max_bytes: int) -> DataFrameType: + """ + Truncates all string columns in a dataframe to a maximum number of bytes. + + If truncation is not needed, the original dataframe is returned. If `df` is a cudf.DataFrame, and truncating is + needed this function will convert to a pandas DataFrame to perform the truncation. + + Parameters + ---------- + df : DataFrameType + The dataframe to truncate. + max_bytes : int + The maximum number of bytes to truncate the strings to. + + Returns + ------- + DataFrameType + The truncated dataframe, if needed. + """ + + if isinstance(df, cudf.DataFrame): + # cudf specific optimization + if not _cudf_needs_truncate(df, max_bytes): + return df + + # If truncating is needed we need to convert to pandas to use the str.encode() method + df = df.to_pandas() + + for col in df.columns: + series: pd.Series = df[col] + if series.dtype == 'object': + encoded_series = series.str.encode(encoding='utf-8', errors='strict') + if encoded_series.str.len().max() > max_bytes: + sliced_series = encoded_series.str.slice(0, max_bytes) + + # There is a possibility that slicing by max_len will slice a multi-byte character in half setting + # errors='ignore' will cause the resulting string to be truncated after the last full character + df[col] = sliced_series.str.decode(encoding='utf-8', errors='ignore') + + return df From 2f14478a22ed0cc7ff34bc095129a49793c34bb6 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 23 Apr 2024 16:09:29 -0700 Subject: [PATCH 02/27] Tests for new truncate_string_cols_by_bytes method and the _cudf_needs_truncate helper --- tests/io/test_io_utils.py | 77 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100755 tests/io/test_io_utils.py diff --git a/tests/io/test_io_utils.py b/tests/io/test_io_utils.py new file mode 100755 index 0000000000..fce622f402 --- /dev/null +++ b/tests/io/test_io_utils.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections.abc import Callable + +import pandas as pd +import pytest + +import cudf + +from _utils.dataset_manager import DatasetManager +from morpheus.io import utils as io_utils +from morpheus.utils.type_aliases import DataFrameType + +MULTI_BYTE_STRINGS = ["ñäμɛ", "Moρφευσ", "río"] + + +def _mk_df(df_class: Callable[..., DataFrameType], data: list[str]) -> DataFrameType: + """ + Create a dataframe with a 'data' column containing the given data, and some other columns with different data types + """ + float_col = [] + int_col = [] + short_str_col = [] + for i in range(len(data)): + float_col.append(i) + int_col.append(i) + short_str_col.append(f"{i}"[0:3]) + + return df_class({"data": data, "float_col": float_col, "int_col": int_col, "short_str_col": short_str_col}) + + +@pytest.mark.parametrize("data, max_bytes, expected", + [(MULTI_BYTE_STRINGS[:], 8, True), (MULTI_BYTE_STRINGS[:], 12, False), + (MULTI_BYTE_STRINGS[:], 20, False), (["." * 20], 19, True), (["." * 20], 20, False), + (["." * 20], 21, False)]) +def test_cudf_needs_truncate(data: list[str], max_bytes: int, expected: bool): + df = _mk_df(cudf.DataFrame, data) + assert io_utils._cudf_needs_truncate(df, max_bytes) is expected + + +@pytest.mark.parametrize( + "data, max_bytes, expected_data", + [(MULTI_BYTE_STRINGS[:], 4, ["ñä", "Moρ", "río"]), (MULTI_BYTE_STRINGS[:], 5, ["ñä", "Moρ", "río"]), + (MULTI_BYTE_STRINGS[:], 8, ["ñäμɛ", "Moρφε", "río"]), (MULTI_BYTE_STRINGS[:], 9, ["ñäμɛ", "Moρφε", "río"]), + (MULTI_BYTE_STRINGS[:], 12, MULTI_BYTE_STRINGS[:]), (MULTI_BYTE_STRINGS[:], 20, MULTI_BYTE_STRINGS[:]), + (["." * 20], 19, ["." * 19]), (["." * 20], 20, ["." * 20]), (["." * 20], 21, ["." * 20])]) +def test_truncate_string_cols_by_bytes(dataset: DatasetManager, + data: list[str], + max_bytes: int, + expected_data: list[str]): + input_df = _mk_df(dataset.df_class, data) + + if data == expected_data: + expected_df_class = dataset.df_class + else: + expected_df_class = pd.DataFrame + + expected_df = _mk_df(expected_df_class, expected_data) + + result_df = io_utils.truncate_string_cols_by_bytes(input_df, max_bytes) + + assert isinstance(result_df, expected_df_class) + dataset.assert_df_equal(result_df, expected_df) From 83ad8ad99851f4080c115e47b5ec2738d402bfdb Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 23 Apr 2024 16:32:39 -0700 Subject: [PATCH 03/27] Log a warning when we truncate a column --- morpheus/io/utils.py | 9 ++++++++- tests/io/test_io_utils.py | 6 ++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/morpheus/io/utils.py b/morpheus/io/utils.py index 7f1aab0835..410e0a4a8e 100644 --- a/morpheus/io/utils.py +++ b/morpheus/io/utils.py @@ -14,12 +14,16 @@ # limitations under the License. """IO utilities.""" +import logging + import pandas as pd import cudf from morpheus.utils.type_aliases import DataFrameType +logger = logging.getLogger(__name__) + def filter_null_data(x: DataFrameType): """ @@ -50,7 +54,7 @@ def _cudf_needs_truncate(df: cudf.DataFrame, max_bytes: int) -> bool: return False -def truncate_string_cols_by_bytes(df: DataFrameType, max_bytes: int) -> DataFrameType: +def truncate_string_cols_by_bytes(df: DataFrameType, max_bytes: int, warn_on_truncate: bool = True) -> DataFrameType: """ Truncates all string columns in a dataframe to a maximum number of bytes. @@ -83,6 +87,9 @@ def truncate_string_cols_by_bytes(df: DataFrameType, max_bytes: int) -> DataFram if series.dtype == 'object': encoded_series = series.str.encode(encoding='utf-8', errors='strict') if encoded_series.str.len().max() > max_bytes: + if warn_on_truncate: + logger.warning("Truncating column '%s' to %d bytes", col, max_bytes) + sliced_series = encoded_series.str.slice(0, max_bytes) # There is a possibility that slicing by max_len will slice a multi-byte character in half setting diff --git a/tests/io/test_io_utils.py b/tests/io/test_io_utils.py index fce622f402..06fac72816 100755 --- a/tests/io/test_io_utils.py +++ b/tests/io/test_io_utils.py @@ -52,6 +52,7 @@ def test_cudf_needs_truncate(data: list[str], max_bytes: int, expected: bool): assert io_utils._cudf_needs_truncate(df, max_bytes) is expected +@pytest.mark.parametrize("warn_on_truncate", [True, False]) @pytest.mark.parametrize( "data, max_bytes, expected_data", [(MULTI_BYTE_STRINGS[:], 4, ["ñä", "Moρ", "río"]), (MULTI_BYTE_STRINGS[:], 5, ["ñä", "Moρ", "río"]), @@ -61,7 +62,8 @@ def test_cudf_needs_truncate(data: list[str], max_bytes: int, expected: bool): def test_truncate_string_cols_by_bytes(dataset: DatasetManager, data: list[str], max_bytes: int, - expected_data: list[str]): + expected_data: list[str], + warn_on_truncate: bool): input_df = _mk_df(dataset.df_class, data) if data == expected_data: @@ -71,7 +73,7 @@ def test_truncate_string_cols_by_bytes(dataset: DatasetManager, expected_df = _mk_df(expected_df_class, expected_data) - result_df = io_utils.truncate_string_cols_by_bytes(input_df, max_bytes) + result_df = io_utils.truncate_string_cols_by_bytes(input_df, max_bytes, warn_on_truncate=warn_on_truncate) assert isinstance(result_df, expected_df_class) dataset.assert_df_equal(result_df, expected_df) From f91d9ac454339f73b913ee848b3695018d5f7544 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 09:03:17 -0700 Subject: [PATCH 04/27] Refactor such that each column has it's own max length --- morpheus/io/utils.py | 24 ++++++++++++---------- tests/io/test_io_utils.py | 42 ++++++++++++++++++++++++++++++--------- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/morpheus/io/utils.py b/morpheus/io/utils.py index 410e0a4a8e..f5793bd32b 100644 --- a/morpheus/io/utils.py +++ b/morpheus/io/utils.py @@ -41,20 +41,24 @@ def filter_null_data(x: DataFrameType): return x[~x['data'].isna()] -def _cudf_needs_truncate(df: cudf.DataFrame, max_bytes: int) -> bool: +def _cudf_needs_truncate(df: cudf.DataFrame, column_max_bytes: dict[str, int]) -> bool: """ Optimization, cudf contains a byte_count() method that pandas lacks. """ - for col in df.columns: + for (col, max_bytes) in column_max_bytes.items(): series: cudf.Series = df[col] - if series.dtype == 'object': - if series.str.byte_count().max() > max_bytes: - return True + + assert series.dtype == 'object' + + if series.str.byte_count().max() > max_bytes: + return True return False -def truncate_string_cols_by_bytes(df: DataFrameType, max_bytes: int, warn_on_truncate: bool = True) -> DataFrameType: +def truncate_string_cols_by_bytes(df: DataFrameType, + column_max_bytes: dict[str, int], + warn_on_truncate: bool = True) -> DataFrameType: """ Truncates all string columns in a dataframe to a maximum number of bytes. @@ -65,8 +69,8 @@ def truncate_string_cols_by_bytes(df: DataFrameType, max_bytes: int, warn_on_tru ---------- df : DataFrameType The dataframe to truncate. - max_bytes : int - The maximum number of bytes to truncate the strings to. + column_max_bytes: dict[str, int] + A mapping of string column names to the maximum number of bytes for each column. Returns ------- @@ -76,13 +80,13 @@ def truncate_string_cols_by_bytes(df: DataFrameType, max_bytes: int, warn_on_tru if isinstance(df, cudf.DataFrame): # cudf specific optimization - if not _cudf_needs_truncate(df, max_bytes): + if not _cudf_needs_truncate(df, column_max_bytes): return df # If truncating is needed we need to convert to pandas to use the str.encode() method df = df.to_pandas() - for col in df.columns: + for (col, max_bytes) in column_max_bytes.items(): series: pd.Series = df[col] if series.dtype == 'object': encoded_series = series.str.encode(encoding='utf-8', errors='strict') diff --git a/tests/io/test_io_utils.py b/tests/io/test_io_utils.py index 06fac72816..c1b3bf4003 100755 --- a/tests/io/test_io_utils.py +++ b/tests/io/test_io_utils.py @@ -44,21 +44,45 @@ def _mk_df(df_class: Callable[..., DataFrameType], data: list[str]) -> DataFrame @pytest.mark.parametrize("data, max_bytes, expected", - [(MULTI_BYTE_STRINGS[:], 8, True), (MULTI_BYTE_STRINGS[:], 12, False), - (MULTI_BYTE_STRINGS[:], 20, False), (["." * 20], 19, True), (["." * 20], 20, False), - (["." * 20], 21, False)]) + [(MULTI_BYTE_STRINGS[:], { + "data": 8 + }, True), (MULTI_BYTE_STRINGS[:], { + "data": 12 + }, False), (MULTI_BYTE_STRINGS[:], { + "data": 20 + }, False), (["." * 20], { + "data": 19 + }, True), (["." * 20], { + "data": 20 + }, False), (["." * 20], { + "data": 21 + }, False)]) def test_cudf_needs_truncate(data: list[str], max_bytes: int, expected: bool): df = _mk_df(cudf.DataFrame, data) assert io_utils._cudf_needs_truncate(df, max_bytes) is expected @pytest.mark.parametrize("warn_on_truncate", [True, False]) -@pytest.mark.parametrize( - "data, max_bytes, expected_data", - [(MULTI_BYTE_STRINGS[:], 4, ["ñä", "Moρ", "río"]), (MULTI_BYTE_STRINGS[:], 5, ["ñä", "Moρ", "río"]), - (MULTI_BYTE_STRINGS[:], 8, ["ñäμɛ", "Moρφε", "río"]), (MULTI_BYTE_STRINGS[:], 9, ["ñäμɛ", "Moρφε", "río"]), - (MULTI_BYTE_STRINGS[:], 12, MULTI_BYTE_STRINGS[:]), (MULTI_BYTE_STRINGS[:], 20, MULTI_BYTE_STRINGS[:]), - (["." * 20], 19, ["." * 19]), (["." * 20], 20, ["." * 20]), (["." * 20], 21, ["." * 20])]) +@pytest.mark.parametrize("data, max_bytes, expected_data", + [(MULTI_BYTE_STRINGS[:], { + "data": 4 + }, ["ñä", "Moρ", "río"]), (MULTI_BYTE_STRINGS[:], { + "data": 5 + }, ["ñä", "Moρ", "río"]), (MULTI_BYTE_STRINGS[:], { + "data": 8 + }, ["ñäμɛ", "Moρφε", "río"]), (MULTI_BYTE_STRINGS[:], { + "data": 9 + }, ["ñäμɛ", "Moρφε", "río"]), (MULTI_BYTE_STRINGS[:], { + "data": 12 + }, MULTI_BYTE_STRINGS[:]), (MULTI_BYTE_STRINGS[:], { + "data": 20 + }, MULTI_BYTE_STRINGS[:]), (["." * 20], { + "data": 19 + }, ["." * 19]), (["." * 20], { + "data": 20 + }, ["." * 20]), (["." * 20], { + "data": 21 + }, ["." * 20])]) def test_truncate_string_cols_by_bytes(dataset: DatasetManager, data: list[str], max_bytes: int, From 10a68f92567c965bdaa1098203447132f4e66df9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 09:29:36 -0700 Subject: [PATCH 05/27] Expand tests --- tests/io/test_io_utils.py | 116 ++++++++++++++++++++++++-------------- 1 file changed, 74 insertions(+), 42 deletions(-) diff --git a/tests/io/test_io_utils.py b/tests/io/test_io_utils.py index c1b3bf4003..0ec1c2c6ac 100755 --- a/tests/io/test_io_utils.py +++ b/tests/io/test_io_utils.py @@ -25,68 +25,100 @@ from morpheus.io import utils as io_utils from morpheus.utils.type_aliases import DataFrameType -MULTI_BYTE_STRINGS = ["ñäμɛ", "Moρφευσ", "río"] +MULTI_BYTE_STRINGS = ["ñäμɛ", "Moρφευσ", "taç"] -def _mk_df(df_class: Callable[..., DataFrameType], data: list[str]) -> DataFrameType: +def _mk_df(df_class: Callable[..., DataFrameType], data: dict[str, list[str]]) -> DataFrameType: """ Create a dataframe with a 'data' column containing the given data, and some other columns with different data types """ + num_rows = len(data[list(data.keys())[0]]) + float_col = [] int_col = [] short_str_col = [] - for i in range(len(data)): + for i in range(num_rows): float_col.append(i) int_col.append(i) short_str_col.append(f"{i}"[0:3]) - return df_class({"data": data, "float_col": float_col, "int_col": int_col, "short_str_col": short_str_col}) - - -@pytest.mark.parametrize("data, max_bytes, expected", - [(MULTI_BYTE_STRINGS[:], { - "data": 8 - }, True), (MULTI_BYTE_STRINGS[:], { - "data": 12 - }, False), (MULTI_BYTE_STRINGS[:], { - "data": 20 - }, False), (["." * 20], { - "data": 19 - }, True), (["." * 20], { - "data": 20 - }, False), (["." * 20], { - "data": 21 - }, False)]) + df_data = data.copy() + df_data.update({"float_col": float_col, "int_col": int_col, "short_str_col": short_str_col}) + + return df_class(df_data) + + +@pytest.mark.parametrize( + "data, max_bytes, expected", + [({ + "data": MULTI_BYTE_STRINGS[:] + }, { + "data": 8 + }, True), ({ + "data": MULTI_BYTE_STRINGS[:], "ignored_col": ["a" * 20, "b" * 20, "c" * 20] + }, { + "data": 12 + }, False), ({ + "data": MULTI_BYTE_STRINGS[:] + }, { + "data": 20 + }, False), ({ + "data": ["." * 20] + }, { + "data": 19 + }, True), ({ + "data": ["." * 20] + }, { + "data": 20 + }, False), ({ + "data": ["." * 20] + }, { + "data": 21 + }, False)]) def test_cudf_needs_truncate(data: list[str], max_bytes: int, expected: bool): df = _mk_df(cudf.DataFrame, data) assert io_utils._cudf_needs_truncate(df, max_bytes) is expected @pytest.mark.parametrize("warn_on_truncate", [True, False]) -@pytest.mark.parametrize("data, max_bytes, expected_data", - [(MULTI_BYTE_STRINGS[:], { - "data": 4 - }, ["ñä", "Moρ", "río"]), (MULTI_BYTE_STRINGS[:], { - "data": 5 - }, ["ñä", "Moρ", "río"]), (MULTI_BYTE_STRINGS[:], { - "data": 8 - }, ["ñäμɛ", "Moρφε", "río"]), (MULTI_BYTE_STRINGS[:], { - "data": 9 - }, ["ñäμɛ", "Moρφε", "río"]), (MULTI_BYTE_STRINGS[:], { - "data": 12 - }, MULTI_BYTE_STRINGS[:]), (MULTI_BYTE_STRINGS[:], { - "data": 20 - }, MULTI_BYTE_STRINGS[:]), (["." * 20], { - "data": 19 - }, ["." * 19]), (["." * 20], { - "data": 20 - }, ["." * 20]), (["." * 20], { - "data": 21 - }, ["." * 20])]) +@pytest.mark.parametrize( + "data, max_bytes, expected_data", + [({ + "multibyte_strings": MULTI_BYTE_STRINGS[:], "ascii_strings": ["a" * 20, "b" * 21, "c" * 19] + }, { + "multibyte_strings": 4, "ascii_strings": 20 + }, { + "multibyte_strings": ["ñä", "Moρ", "taç"], "ascii_strings": ["a" * 20, "b" * 20, "c" * 19] + }), + ({ + "data": MULTI_BYTE_STRINGS[:], "ignored_col": ["a" * 20, "b" * 20, "c" * 20] + }, { + "data": 5 + }, { + "data": ["ñä", "Moρ", "taç"], "ignored_col": ["a" * 20, "b" * 20, "c" * 20] + }), ({ + "data": MULTI_BYTE_STRINGS[:] + }, { + "data": 8 + }, { + "data": ["ñäμɛ", "Moρφε", "taç"] + }), ({ + "data": MULTI_BYTE_STRINGS[:] + }, { + "data": 9 + }, { + "data": ["ñäμɛ", "Moρφε", "taç"] + }), ({ + "data": MULTI_BYTE_STRINGS[:] + }, { + "data": 12 + }, { + "data": MULTI_BYTE_STRINGS[:] + })]) def test_truncate_string_cols_by_bytes(dataset: DatasetManager, - data: list[str], + data: dict[str, list[str]], max_bytes: int, - expected_data: list[str], + expected_data: dict[str, list[str]], warn_on_truncate: bool): input_df = _mk_df(dataset.df_class, data) From c80fc27d5184c5b96cf5cac7c1192ac0d7f991ef Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 09:52:20 -0700 Subject: [PATCH 06/27] Add in truncating of long string fields. In insert_dataframe, perform pandas conversion as late as possible to perform as many operations in cudf as possible, also only pay the cost of converting the columns we need to pandas. --- .../service/vdb/milvus_vector_db_service.py | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 37cd82d1ba..265aad1c49 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -24,6 +24,7 @@ import cudf +from morpheus.io.utils import truncate_string_cols_by_bytes from morpheus.service.vdb.vector_db_service import VectorDBResourceService from morpheus.service.vdb.vector_db_service import VectorDBService @@ -222,9 +223,11 @@ class MilvusVectorDBResourceService(VectorDBResourceService): Name of the resource. client : MilvusClient An instance of the MilvusClient for interaction with the Milvus Vector Database. + truncate_long_strings : bool, optional + When true, truncate strings values that are longer than the max length of the field """ - def __init__(self, name: str, client: "MilvusClient") -> None: + def __init__(self, name: str, client: "MilvusClient", truncate_long_strings: bool = False) -> None: if IMPORT_EXCEPTION is not None: raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION @@ -239,13 +242,22 @@ def __init__(self, name: str, client: "MilvusClient") -> None: self._vector_field = None self._fillna_fields_dict = {} + # Mapping of field name to max length for string fields + self._fields_max_length: dict[str, int] = {} + for field in self._fields: if field.dtype == pymilvus.DataType.FLOAT_VECTOR: self._vector_field = field.name else: + if field.dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): + max_length = field.params.get('max_length') + if max_length is not None: + self._fields_max_length[field.name] = max_length if not field.auto_id: self._fillna_fields_dict[field.name] = field.dtype + self._truncate_long_strings = truncate_long_strings + self._collection.load() def _set_up_collection(self): @@ -291,10 +303,6 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa dict Returns response content as a dictionary. """ - - if isinstance(df, cudf.DataFrame): - df = df.to_pandas() - # Ensure that there are no None values in the DataFrame entries. for field_name, dtype in self._fillna_fields_dict.items(): if dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): @@ -311,11 +319,18 @@ def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwa else: logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) + if self._truncate_long_strings: + df = truncate_string_cols_by_bytes(df, self._fields_max_length, warn_on_truncate=True) + # From the schema, this is the list of columns we need, excluding any auto_id columns column_names = [field.name for field in self._fields if not field.auto_id] + collection_df = df[column_names] + if isinstance(collection_df, cudf.DataFrame): + collection_df = collection_df.to_pandas() + # Note: dataframe columns has to be in the order of collection schema fields.s - result = self._collection.insert(data=df[column_names], **kwargs) + result = self._collection.insert(data=collection_df, **kwargs) self._collection.flush() return self._insert_result_to_dict(result=result) From 4b771e8b8cb8a0744ec3fe6d957b4743be86a5b6 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 09:56:02 -0700 Subject: [PATCH 07/27] Use DataFrameType alias --- morpheus/service/vdb/milvus_vector_db_service.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 265aad1c49..1f274b4c65 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -27,6 +27,7 @@ from morpheus.io.utils import truncate_string_cols_by_bytes from morpheus.service.vdb.vector_db_service import VectorDBResourceService from morpheus.service.vdb.vector_db_service import VectorDBService +from morpheus.utils.type_aliases import DataFrameType logger = logging.getLogger(__name__) @@ -287,13 +288,13 @@ def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) return self._insert_result_to_dict(result=result) - def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict: + def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict: """ Insert a dataframe entires into the vector database. Parameters ---------- - df : typing.Union[cudf.DataFrame, pd.DataFrame] + df : DataFrameType Dataframe to be inserted into the collection. **kwargs : dict[str, typing.Any] Extra keyword arguments specific to the vector database implementation. @@ -703,7 +704,7 @@ def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing. for part in partition_conf["partitions"]: self._client.create_partition(collection_name=name, partition_name=part["name"], timeout=timeout) - def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) -> list[dict]: + def _build_schema_conf(self, df: DataFrameType) -> list[dict]: fields = [] # Always add a primary key @@ -741,7 +742,7 @@ def _build_schema_conf(self, df: typing.Union[cudf.DataFrame, pd.DataFrame]) -> def create_from_dataframe(self, name: str, - df: typing.Union[cudf.DataFrame, pd.DataFrame], + df: DataFrameType, overwrite: bool = False, **kwargs: dict[str, typing.Any]) -> None: """ @@ -812,10 +813,7 @@ def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str, return resource.insert(data, **kwargs) @with_collection_lock - def insert_dataframe(self, - name: str, - df: typing.Union[cudf.DataFrame, pd.DataFrame], - **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: + def insert_dataframe(self, name: str, df: DataFrameType, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: """ Converts dataframe to rows and insert to a collection in the Milvus vector database. @@ -823,7 +821,7 @@ def insert_dataframe(self, ---------- name : str Name of the collection to be inserted. - df : typing.Union[cudf.DataFrame, pd.DataFrame] + df : DataFrameType Dataframe to be inserted in the collection. **kwargs : dict[str, typing.Any] Additional keyword arguments containing collection configuration. From 3b6408ed8cd23ad5cc392ee8c3c71f1cbc933646 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 09:56:56 -0700 Subject: [PATCH 08/27] Cleanup --- morpheus/service/vdb/milvus_vector_db_service.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 1f274b4c65..2802632141 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -20,8 +20,6 @@ import typing from functools import wraps -import pandas as pd - import cudf from morpheus.io.utils import truncate_string_cols_by_bytes @@ -752,7 +750,7 @@ def create_from_dataframe(self, ---------- name : str Name of the collection. - df : Union[cudf.DataFrame, pd.DataFrame] + df : DataFrameType The dataframe to create the collection from. overwrite : bool, optional Whether to overwrite the collection if it already exists. Default is False. From 9c5435c9b029f01058bf09273682e7f6aef57730 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 10:02:32 -0700 Subject: [PATCH 09/27] Exclude string type from max_length checking --- morpheus/service/vdb/milvus_vector_db_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 2802632141..b9797ce8f0 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -248,7 +248,9 @@ def __init__(self, name: str, client: "MilvusClient", truncate_long_strings: boo if field.dtype == pymilvus.DataType.FLOAT_VECTOR: self._vector_field = field.name else: - if field.dtype in (pymilvus.DataType.VARCHAR, pymilvus.DataType.STRING): + # Intentionally excluding pymilvus.DataType.STRING, in our current version it isn't supported, and in + # some database systems string types don't have a max length. + if field.dtype == pymilvus.DataType.VARCHAR: max_length = field.params.get('max_length') if max_length is not None: self._fields_max_length[field.name] = max_length From dbe34dcc0e4eaa05d55eefd0994b241db8a334cf Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 11:37:46 -0700 Subject: [PATCH 10/27] Ensure truncate_long_strings parameter is set in configs and passed along to MilvusVectorDBResourceService --- examples/llm/vdb_upload/vdb_utils.py | 3 ++- morpheus/service/vdb/milvus_vector_db_service.py | 8 ++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index d3aed615d7..7740acbc7c 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -315,14 +315,15 @@ def build_cli_configs(source_type, cli_vdb_conf = { # Vector db upload has some significant transaction overhead, batch size here should be as large as possible 'batch_size': 16384, - 'resource_name': vector_db_resource_name, 'embedding_size': embedding_size, 'recreate': True, + 'resource_name': vector_db_resource_name, 'resource_schemas': { vector_db_resource_name: build_defualt_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None, }, 'service': vector_db_service, + 'truncate_long_strings': True, 'uri': vector_db_uri, } diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index b9797ce8f0..1a1fd74be5 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -605,13 +605,17 @@ def __init__(self, password: str = "", db_name: str = "", token: str = "", + truncate_long_strings: bool = False, **kwargs: dict[str, typing.Any]): + self._truncate_long_strings = truncate_long_strings self._client = MilvusClient(uri=uri, user=user, password=password, db_name=db_name, token=token, **kwargs) def load_resource(self, name: str, **kwargs: dict[str, typing.Any]) -> MilvusVectorDBResourceService: - - return MilvusVectorDBResourceService(name=name, client=self._client, **kwargs) + return MilvusVectorDBResourceService(name=name, + client=self._client, + truncate_long_strings=self._truncate_long_strings, + **kwargs) def has_store_object(self, name: str) -> bool: """ From 9acea9e273a446ca726fee923f9dffc882f97a1c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 12:07:26 -0700 Subject: [PATCH 11/27] Add docstring for truncate_long_strings --- morpheus/service/vdb/milvus_vector_db_service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 1a1fd74be5..d69c5eec00 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -591,6 +591,8 @@ class MilvusVectorDBService(VectorDBService): The port number for connecting to the Milvus server. alias : str, optional Alias for the Milvus connection, by default "default". + truncate_long_strings : bool, optional + When true, truncate strings values that are longer than the max length of the field **kwargs : dict Additional keyword arguments specific to the Milvus connection configuration. """ From 078cd195ee38879a6ac26e138ef98456834532cb Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 24 Apr 2024 12:40:51 -0700 Subject: [PATCH 12/27] Add docstring for warn_on_truncate --- morpheus/io/utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/morpheus/io/utils.py b/morpheus/io/utils.py index f5793bd32b..dcea3beab6 100644 --- a/morpheus/io/utils.py +++ b/morpheus/io/utils.py @@ -71,6 +71,8 @@ def truncate_string_cols_by_bytes(df: DataFrameType, The dataframe to truncate. column_max_bytes: dict[str, int] A mapping of string column names to the maximum number of bytes for each column. + warn_on_truncate: bool, default True + Whether to log a warning when truncating a column. Returns ------- From 2b7231313ca702bbdd2a968a15f311218b0156d7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 08:36:02 -0700 Subject: [PATCH 13/27] Add type-alias for Series type --- morpheus/utils/type_aliases.py | 1 + 1 file changed, 1 insertion(+) diff --git a/morpheus/utils/type_aliases.py b/morpheus/utils/type_aliases.py index f944c3f9cb..cd394664e6 100644 --- a/morpheus/utils/type_aliases.py +++ b/morpheus/utils/type_aliases.py @@ -20,3 +20,4 @@ import cudf DataFrameType = typing.Union[pd.DataFrame, cudf.DataFrame] +SeriesType = typing.Union[pd.Series, cudf.Series] From f26ce68cfc6dafadc28bac76e26f21dbde27c5fd Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 08:36:49 -0700 Subject: [PATCH 14/27] Refactor cudf_string_cols_exceed_max_bytes and truncate_string_cols_by_bytes per PR feedback [no ci] --- morpheus/io/utils.py | 78 +++++++++++++++++++++++++-------------- tests/io/test_io_utils.py | 24 ++++++------ 2 files changed, 63 insertions(+), 39 deletions(-) diff --git a/morpheus/io/utils.py b/morpheus/io/utils.py index dcea3beab6..d8b286a8e8 100644 --- a/morpheus/io/utils.py +++ b/morpheus/io/utils.py @@ -21,6 +21,7 @@ import cudf from morpheus.utils.type_aliases import DataFrameType +from morpheus.utils.type_aliases import SeriesType logger = logging.getLogger(__name__) @@ -41,10 +42,29 @@ def filter_null_data(x: DataFrameType): return x[~x['data'].isna()] -def _cudf_needs_truncate(df: cudf.DataFrame, column_max_bytes: dict[str, int]) -> bool: +def cudf_string_cols_exceed_max_bytes(df: cudf.DataFrame, column_max_bytes: dict[str, int]) -> bool: """ - Optimization, cudf contains a byte_count() method that pandas lacks. + Checks a cudf DataFrame for string columns that exceed a maximum number of bytes and thus need to be truncated by + calling `truncate_string_cols_by_bytes`. + + This method utilizes a cudf method `Series.str.byte_count()` method that pandas lacks, which can avoid a costly + call to truncate_string_cols_by_bytes. + + Parameters + ---------- + df : DataFrameType + The dataframe to check. + column_max_bytes: dict[str, int] + A mapping of string column names to the maximum number of bytes for each column. + + Returns + ------- + bool + True if truncation is needed, False otherwise. """ + if not isinstance(df, cudf.DataFrame): + raise ValueError("Expected cudf DataFrame") + for (col, max_bytes) in column_max_bytes.items(): series: cudf.Series = df[col] @@ -58,12 +78,10 @@ def _cudf_needs_truncate(df: cudf.DataFrame, column_max_bytes: dict[str, int]) - def truncate_string_cols_by_bytes(df: DataFrameType, column_max_bytes: dict[str, int], - warn_on_truncate: bool = True) -> DataFrameType: + warn_on_truncate: bool = True) -> bool: """ - Truncates all string columns in a dataframe to a maximum number of bytes. - - If truncation is not needed, the original dataframe is returned. If `df` is a cudf.DataFrame, and truncating is - needed this function will convert to a pandas DataFrame to perform the truncation. + Truncates all string columns in a dataframe to a maximum number of bytes. This operation is performed in-place on + the dataframe. Parameters ---------- @@ -76,30 +94,36 @@ def truncate_string_cols_by_bytes(df: DataFrameType, Returns ------- - DataFrameType - The truncated dataframe, if needed. + bool + True if truncation was performed, False otherwise. """ - if isinstance(df, cudf.DataFrame): - # cudf specific optimization - if not _cudf_needs_truncate(df, column_max_bytes): - return df - - # If truncating is needed we need to convert to pandas to use the str.encode() method - df = df.to_pandas() + performed_truncation = False + is_cudf = isinstance(df, cudf.DataFrame) for (col, max_bytes) in column_max_bytes.items(): - series: pd.Series = df[col] - if series.dtype == 'object': - encoded_series = series.str.encode(encoding='utf-8', errors='strict') - if encoded_series.str.len().max() > max_bytes: - if warn_on_truncate: - logger.warning("Truncating column '%s' to %d bytes", col, max_bytes) + series: SeriesType = df[col] + + if is_cudf: + series: pd.Series = series.to_pandas() + + assert series.dtype == 'object', f"Expected string column '{col}'" + + encoded_series = series.str.encode(encoding='utf-8', errors='strict') + if encoded_series.str.len().max() > max_bytes: + performed_truncation = True + if warn_on_truncate: + logger.warning("Truncating column '%s' to %d bytes", col, max_bytes) + + truncated_series = encoded_series.str.slice(0, max_bytes) - sliced_series = encoded_series.str.slice(0, max_bytes) + # There is a possibility that slicing by max_len will slice a multi-byte character in half setting + # errors='ignore' will cause the resulting string to be truncated after the last full character + decoded_series = truncated_series.str.decode(encoding='utf-8', errors='ignore') - # There is a possibility that slicing by max_len will slice a multi-byte character in half setting - # errors='ignore' will cause the resulting string to be truncated after the last full character - df[col] = sliced_series.str.decode(encoding='utf-8', errors='ignore') + if is_cudf: + df[col] = cudf.Series.from_pandas(decoded_series) + else: + df[col] = decoded_series - return df + return performed_truncation diff --git a/tests/io/test_io_utils.py b/tests/io/test_io_utils.py index 0ec1c2c6ac..8246a7f680 100755 --- a/tests/io/test_io_utils.py +++ b/tests/io/test_io_utils.py @@ -25,7 +25,7 @@ from morpheus.io import utils as io_utils from morpheus.utils.type_aliases import DataFrameType -MULTI_BYTE_STRINGS = ["ñäμɛ", "Moρφευσ", "taç"] +MULTI_BYTE_STRINGS = ["ñäμɛ", "Moρφέας", "taç"] def _mk_df(df_class: Callable[..., DataFrameType], data: dict[str, list[str]]) -> DataFrameType: @@ -77,7 +77,7 @@ def _mk_df(df_class: Callable[..., DataFrameType], data: dict[str, list[str]]) - }, False)]) def test_cudf_needs_truncate(data: list[str], max_bytes: int, expected: bool): df = _mk_df(cudf.DataFrame, data) - assert io_utils._cudf_needs_truncate(df, max_bytes) is expected + assert io_utils.cudf_string_cols_exceed_max_bytes(df, max_bytes) is expected @pytest.mark.parametrize("warn_on_truncate", [True, False]) @@ -101,13 +101,13 @@ def test_cudf_needs_truncate(data: list[str], max_bytes: int, expected: bool): }, { "data": 8 }, { - "data": ["ñäμɛ", "Moρφε", "taç"] + "data": ["ñäμɛ", "Moρφέ", "taç"] }), ({ "data": MULTI_BYTE_STRINGS[:] }, { "data": 9 }, { - "data": ["ñäμɛ", "Moρφε", "taç"] + "data": ["ñäμɛ", "Moρφέ", "taç"] }), ({ "data": MULTI_BYTE_STRINGS[:] }, { @@ -120,16 +120,16 @@ def test_truncate_string_cols_by_bytes(dataset: DatasetManager, max_bytes: int, expected_data: dict[str, list[str]], warn_on_truncate: bool): - input_df = _mk_df(dataset.df_class, data) + df = _mk_df(dataset.df_class, data) - if data == expected_data: - expected_df_class = dataset.df_class - else: - expected_df_class = pd.DataFrame + expect_truncation = (data != expected_data) + expected_df_class = dataset.df_class expected_df = _mk_df(expected_df_class, expected_data) - result_df = io_utils.truncate_string_cols_by_bytes(input_df, max_bytes, warn_on_truncate=warn_on_truncate) + performed_truncation = io_utils.truncate_string_cols_by_bytes(df, max_bytes, warn_on_truncate=warn_on_truncate) - assert isinstance(result_df, expected_df_class) - dataset.assert_df_equal(result_df, expected_df) + assert performed_truncation is expect_truncation + assert isinstance(df, expected_df_class) + + dataset.assert_df_equal(df, expected_df) From 4a6de1bb50cb087810e465d804674719fe7f20b7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 08:55:18 -0700 Subject: [PATCH 15/27] Refactor to call cudf_string_cols_exceed_max_bytes prior to converting to pandas, and call truncate_string_cols_by_bytes after converting to pandas [no ci] --- morpheus/service/vdb/milvus_vector_db_service.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index d69c5eec00..3d54daab18 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -22,6 +22,7 @@ import cudf +from morpheus.io.utils import cudf_string_cols_exceed_max_bytes from morpheus.io.utils import truncate_string_cols_by_bytes from morpheus.service.vdb.vector_db_service import VectorDBResourceService from morpheus.service.vdb.vector_db_service import VectorDBService @@ -320,8 +321,11 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - else: logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) - if self._truncate_long_strings: - df = truncate_string_cols_by_bytes(df, self._fields_max_length, warn_on_truncate=True) + needs_truncate = self._truncate_long_strings + if isinstance(df, cudf.DataFrame): + # Cudf specific optimization, we can avoid a costly call to truncate_string_cols_by_bytes if all of the + # string columns are already below the max length + needs_truncate = cudf_string_cols_exceed_max_bytes(df, self._fields_max_length) # From the schema, this is the list of columns we need, excluding any auto_id columns column_names = [field.name for field in self._fields if not field.auto_id] @@ -330,6 +334,9 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - if isinstance(collection_df, cudf.DataFrame): collection_df = collection_df.to_pandas() + if needs_truncate: + truncate_string_cols_by_bytes(collection_df, self._fields_max_length, warn_on_truncate=True) + # Note: dataframe columns has to be in the order of collection schema fields.s result = self._collection.insert(data=collection_df, **kwargs) self._collection.flush() From 5f3804f140d71f5372ecd65ddc94396172673166 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 10:48:50 -0700 Subject: [PATCH 16/27] Test WIP [no ci] --- .../service/vdb/milvus_vector_db_service.py | 7 +- tests/conftest.py | 6 + tests/test_milvus_vector_db_service.py | 116 ++++++++++++++++++ .../milvus_string_collection_conf.json | 3 + 4 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 tests/tests_data/service/milvus_string_collection_conf.json diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index 3d54daab18..bada656d95 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -33,6 +33,11 @@ IMPORT_EXCEPTION = None IMPORT_ERROR_MESSAGE = "MilvusVectorDBResourceService requires the milvus and pymilvus packages to be installed." +# Milvus has a max string length in bytes of 65,535. Multi-byte characters like "ñ" will have a string length of 1, the +# byte length encoded as UTF-8 will be 2 +# https://milvus.io/docs/limitations.md#Length-of-a-string +MAX_STRING_LENGTH_BYTES = 65_535 + try: import pymilvus from pymilvus.orm.mutation import MutationResult @@ -737,7 +742,7 @@ def _build_schema_conf(self, df: DataFrameType) -> list[dict]: } if (field_dict["dtype"] == pymilvus.DataType.VARCHAR): - field_dict["max_length"] = 65_535 + field_dict["max_length"] = MAX_STRING_LENGTH_BYTES if (field_dict["dtype"] == pymilvus.DataType.FLOAT_VECTOR or field_dict["dtype"] == pymilvus.DataType.BINARY_VECTOR): diff --git a/tests/conftest.py b/tests/conftest.py index 1f8f0ef425..30cc8f869d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1035,6 +1035,12 @@ def simple_collection_config_fixture(): yield load_json_file(filename="service/milvus_simple_collection_conf.json") +@pytest.fixture(scope="session", name="string_collection_config") +def string_collection_config_fixture(): + from _utils import load_json_file + yield load_json_file(filename="service/milvus_string_collection_conf.json") + + @pytest.fixture(name="nemollm", scope='session') def nemollm_fixture(fail_missing: bool): """ diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 723e7e7f8e..3621f3a22d 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -16,14 +16,18 @@ import json import random +import string import numpy as np import pymilvus import pytest from pymilvus import DataType +from pymilvus import MilvusException import cudf +from _utils.dataset_manager import DatasetManager +from morpheus.service.vdb.milvus_vector_db_service import MAX_STRING_LENGTH_BYTES from morpheus.service.vdb.milvus_vector_db_service import FieldSchemaEncoder from morpheus.service.vdb.milvus_vector_db_service import MilvusVectorDBService @@ -71,6 +75,38 @@ def sample_field_fixture(): return pymilvus.FieldSchema(name="test_field", dtype=pymilvus.DataType.INT64) +def _mk_long_string(source_chars: str) -> str: + """ + Yields a string longer than MAX_STRING_LENGTH_BYTES from source chars + """ + source_chars_byte_len = len(source_chars.encode("utf-8")) + source_data = list(source_chars) + + byte_len = 0 + long_str_data = [] + while byte_len <= MAX_STRING_LENGTH_BYTES: + long_str_data.extend(source_data) + byte_len += source_chars_byte_len + + return "".join(long_str_data) + + +@pytest.fixture(scope="module", name="long_ascii_string") +def long_ascii_string_fixture(): + """ + Yields a string longer than MAX_STRING_LENGTH_BYTES containing only ascii (single-byte) characters + """ + return _mk_long_string(string.ascii_letters) + + +@pytest.fixture(scope="module", name="long_multibyte_string") +def long_multibyte_string_fixture(): + """ + Yields a string longer than MAX_STRING_LENGTH_BYTES containing a mix of single and multi-byte characters + """ + return _mk_long_string("Moρφέας") + + @pytest.mark.milvus def test_create_and_drop_collection(idx_part_collection_config: dict, milvus_service: MilvusVectorDBService): collection_name = "test_collection" @@ -467,3 +503,83 @@ def test_fse_from_dict(): result = FieldSchemaEncoder.from_dict(data) assert result.name == "test_field" assert result.dtype == pymilvus.DataType.INT64 + + +@pytest.mark.milvus +@pytest.mark.parametrize("num_rows", [10, 100, 1000]) +@pytest.mark.parametrize("use_multi_byte_strings", [True, False], ids=["multi_byte", "ascii"]) +@pytest.mark.parametrize("truncate_long_strings", [True, False], ids=["truncate", "no_truncate"]) +@pytest.mark.parametrize("exceed_max_str_len", [True, False], ids=["exceed_max_len", "within_max_len"]) +def test_insert_dataframe(milvus_server_uri: str, + string_collection_config: dict, + num_rows: int, + dataset: DatasetManager, + use_multi_byte_strings: bool, + truncate_long_strings: bool, + exceed_max_str_len: bool, + long_ascii_string: str, + long_multibyte_string: str): + collection_name = "test_insert_dataframe" + + milvus_service = MilvusVectorDBService(uri=milvus_server_uri, truncate_long_strings=truncate_long_strings) + + # Make sure to drop any existing collection from previous runs. + milvus_service.drop(collection_name) + + # Create a collection. + milvus_service.create(collection_name, **string_collection_config) + + ids = [] + embedding_data = [] + long_str_col = [] + short_str_col = [] + for i in range(num_rows): + ids.append(i) + embedding_data.append([i / 10.0] * 3) + if use_multi_byte_strings: + long_str = long_multibyte_string + else: + long_str = long_ascii_string + + if exceed_max_str_len: + slice_end = len(long_str) + else: + slice_end = MAX_STRING_LENGTH_BYTES + + long_str_col.append(long_str[:slice_end]) + short_str_col.append(long_str[:7]) + + df = dataset.df_class({ + "id": ids, "embedding": embedding_data, "long_str_col": long_str_col, "short_str_col": short_str_col + }) + + expected_long_str = [] + for long_str in long_str_col: + if truncate_long_strings: + expected_long_str.append( + long_str.encode("utf-8")[:MAX_STRING_LENGTH_BYTES].decode("utf-8", errors="ignore")) + else: + expected_long_str.append(long_str) + + expected_df = dataset.df_class({ + "id": ids, "embedding": embedding_data, "long_str_col": expected_long_str, "short_str_col": short_str_col + }) + + if (exceed_max_str_len and (not truncate_long_strings)): + with pytest.raises(MilvusException, match="string exceeds max length"): + milvus_service.insert_dataframe(collection_name, df) + + return # Skip the rest of the test if the string column exceeds the maximum length. + else: + milvus_service.insert_dataframe(collection_name, df) + + # Retrieve inserted data by primary keys. + retrieved_data = milvus_service.retrieve_by_keys(collection_name, ids) + assert len(retrieved_data) == num_rows + + # Clean up the collection. + milvus_service.drop(collection_name) + + result_df = dataset.df_class(retrieved_data) + + dataset.compare_df(result_df, expected_df) diff --git a/tests/tests_data/service/milvus_string_collection_conf.json b/tests/tests_data/service/milvus_string_collection_conf.json new file mode 100644 index 0000000000..dd9c3360b3 --- /dev/null +++ b/tests/tests_data/service/milvus_string_collection_conf.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:3eb10ccec64cd704795c8c2b620eae3cc59d6ee05e1a8fc0a8cf0e6b28529e63 +size 1044 From 701e1f9c9f90d4862746d428bb3f61a8266d18ea Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 10:50:40 -0700 Subject: [PATCH 17/27] Fix bug where truncate was always enabled for cudf dataframes --- morpheus/service/vdb/milvus_vector_db_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/service/vdb/milvus_vector_db_service.py b/morpheus/service/vdb/milvus_vector_db_service.py index bada656d95..09c68f15cd 100644 --- a/morpheus/service/vdb/milvus_vector_db_service.py +++ b/morpheus/service/vdb/milvus_vector_db_service.py @@ -327,7 +327,7 @@ def insert_dataframe(self, df: DataFrameType, **kwargs: dict[str, typing.Any]) - logger.info("Skipped checking 'None' in the field: %s, with datatype: %s", field_name, dtype) needs_truncate = self._truncate_long_strings - if isinstance(df, cudf.DataFrame): + if needs_truncate and isinstance(df, cudf.DataFrame): # Cudf specific optimization, we can avoid a costly call to truncate_string_cols_by_bytes if all of the # string columns are already below the max length needs_truncate = cudf_string_cols_exceed_max_bytes(df, self._fields_max_length) From 38d31253ffa4ee4114da8e097f2f24b887d63d6f Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 11:20:36 -0700 Subject: [PATCH 18/27] Finish up tests --- tests/test_milvus_vector_db_service.py | 47 ++++++++++++++----- .../milvus_string_collection_conf.json | 4 +- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 3621f3a22d..429475685a 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -107,6 +107,13 @@ def long_multibyte_string_fixture(): return _mk_long_string("Moρφέας") +def _truncate_string_by_bytes(s: str, max_bytes: int) -> str: + """ + Truncates a string to the given number of bytes + """ + return s.encode("utf-8")[:max_bytes].decode("utf-8", errors="ignore") + + @pytest.mark.milvus def test_create_and_drop_collection(idx_part_collection_config: dict, milvus_service: MilvusVectorDBService): collection_name = "test_collection" @@ -506,7 +513,8 @@ def test_fse_from_dict(): @pytest.mark.milvus -@pytest.mark.parametrize("num_rows", [10, 100, 1000]) +@pytest.mark.slow +@pytest.mark.parametrize("num_rows", [10, 100]) @pytest.mark.parametrize("use_multi_byte_strings", [True, False], ids=["multi_byte", "ascii"]) @pytest.mark.parametrize("truncate_long_strings", [True, False], ids=["truncate", "no_truncate"]) @pytest.mark.parametrize("exceed_max_str_len", [True, False], ids=["exceed_max_len", "within_max_len"]) @@ -529,25 +537,40 @@ def test_insert_dataframe(milvus_server_uri: str, # Create a collection. milvus_service.create(collection_name, **string_collection_config) + short_str_col_len = -1 + long_str_col_len = -1 + for field_conf in string_collection_config["schema_conf"]["schema_fields"]: + if field_conf["name"] == "short_str_col": + short_str_col_len = field_conf["params"]["max_length"] + + elif field_conf["name"] == "long_str_col": + long_str_col_len = field_conf["params"]["max_length"] + + assert short_str_col_len > 0, "short_str_col length is not set" + assert long_str_col_len == MAX_STRING_LENGTH_BYTES, "long_str_col length is not set to MAX_STRING_LENGTH_BYTES" + + # Construct the dataframe. ids = [] embedding_data = [] long_str_col = [] short_str_col = [] + + if use_multi_byte_strings: + long_str = long_multibyte_string + else: + long_str = long_ascii_string + + short_str = long_str[:7] + if not exceed_max_str_len: + short_str = _truncate_string_by_bytes(short_str, short_str_col_len) + long_str = _truncate_string_by_bytes(long_str, MAX_STRING_LENGTH_BYTES) + for i in range(num_rows): ids.append(i) embedding_data.append([i / 10.0] * 3) - if use_multi_byte_strings: - long_str = long_multibyte_string - else: - long_str = long_ascii_string - - if exceed_max_str_len: - slice_end = len(long_str) - else: - slice_end = MAX_STRING_LENGTH_BYTES - long_str_col.append(long_str[:slice_end]) - short_str_col.append(long_str[:7]) + long_str_col.append(long_str) + short_str_col.append(short_str) df = dataset.df_class({ "id": ids, "embedding": embedding_data, "long_str_col": long_str_col, "short_str_col": short_str_col diff --git a/tests/tests_data/service/milvus_string_collection_conf.json b/tests/tests_data/service/milvus_string_collection_conf.json index dd9c3360b3..a75970a361 100644 --- a/tests/tests_data/service/milvus_string_collection_conf.json +++ b/tests/tests_data/service/milvus_string_collection_conf.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:3eb10ccec64cd704795c8c2b620eae3cc59d6ee05e1a8fc0a8cf0e6b28529e63 -size 1044 +oid sha256:adbc34ae22c1037c8308b5521a01597a81d0ea117cc691e72566b463c0be6e9a +size 1083 From a5f05e178eacc67fb6ba87598ceeb859109c18fa Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 11:24:44 -0700 Subject: [PATCH 19/27] Remove parametarization based on num_rows, not important for this test that is already slow --- tests/test_milvus_vector_db_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 429475685a..9c5d08672b 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -514,7 +514,6 @@ def test_fse_from_dict(): @pytest.mark.milvus @pytest.mark.slow -@pytest.mark.parametrize("num_rows", [10, 100]) @pytest.mark.parametrize("use_multi_byte_strings", [True, False], ids=["multi_byte", "ascii"]) @pytest.mark.parametrize("truncate_long_strings", [True, False], ids=["truncate", "no_truncate"]) @pytest.mark.parametrize("exceed_max_str_len", [True, False], ids=["exceed_max_len", "within_max_len"]) @@ -527,6 +526,7 @@ def test_insert_dataframe(milvus_server_uri: str, exceed_max_str_len: bool, long_ascii_string: str, long_multibyte_string: str): + num_rows = 10 collection_name = "test_insert_dataframe" milvus_service = MilvusVectorDBService(uri=milvus_server_uri, truncate_long_strings=truncate_long_strings) From 35dc56748412bc86e0d8a22dc3f325745de19aaf Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 11:25:22 -0700 Subject: [PATCH 20/27] Remove old param --- tests/test_milvus_vector_db_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 9c5d08672b..86d7daffd5 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -519,7 +519,6 @@ def test_fse_from_dict(): @pytest.mark.parametrize("exceed_max_str_len", [True, False], ids=["exceed_max_len", "within_max_len"]) def test_insert_dataframe(milvus_server_uri: str, string_collection_config: dict, - num_rows: int, dataset: DatasetManager, use_multi_byte_strings: bool, truncate_long_strings: bool, From a6cb1315b92538c982d9cf9f2f5c881df5097901 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 11:31:42 -0700 Subject: [PATCH 21/27] Lint fixes --- tests/io/test_io_utils.py | 1 - tests/test_milvus_vector_db_service.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/io/test_io_utils.py b/tests/io/test_io_utils.py index 8246a7f680..1ad46b75cb 100755 --- a/tests/io/test_io_utils.py +++ b/tests/io/test_io_utils.py @@ -16,7 +16,6 @@ from collections.abc import Callable -import pandas as pd import pytest import cudf diff --git a/tests/test_milvus_vector_db_service.py b/tests/test_milvus_vector_db_service.py index 86d7daffd5..3d0548176d 100644 --- a/tests/test_milvus_vector_db_service.py +++ b/tests/test_milvus_vector_db_service.py @@ -592,8 +592,8 @@ def test_insert_dataframe(milvus_server_uri: str, milvus_service.insert_dataframe(collection_name, df) return # Skip the rest of the test if the string column exceeds the maximum length. - else: - milvus_service.insert_dataframe(collection_name, df) + + milvus_service.insert_dataframe(collection_name, df) # Retrieve inserted data by primary keys. retrieved_data = milvus_service.retrieve_by_keys(collection_name, ids) From 148d398cc5bdf1290d7015da595b83cbd59a972f Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 11:48:37 -0700 Subject: [PATCH 22/27] Remove stray print method --- morpheus/stages/inference/inference_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index 8b1fa75d3a..936420bc55 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -287,7 +287,7 @@ def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future): _message_meta = CppMessageMeta(df=_df) _message.payload(_message_meta) _message.tensors().set_tensor("probs", output_message.get_probs_tensor()) - print(_df) + output_message = _message return output_message From d3529301d81394e0d035e77cca5e3392a1c0a2b5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 12:41:09 -0700 Subject: [PATCH 23/27] Don't hard-code the name of the probabilities tensor, don't assume its the only tensor --- morpheus/stages/inference/inference_stage.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/morpheus/stages/inference/inference_stage.py b/morpheus/stages/inference/inference_stage.py index 936420bc55..ab12afe4d3 100644 --- a/morpheus/stages/inference/inference_stage.py +++ b/morpheus/stages/inference/inference_stage.py @@ -286,7 +286,11 @@ def set_output_fut(resp: TensorMemory, inner_batch, batch_future: mrc.Future): if (_df is not None and not _df.empty): _message_meta = CppMessageMeta(df=_df) _message.payload(_message_meta) - _message.tensors().set_tensor("probs", output_message.get_probs_tensor()) + + response_tensors = output_message.tensors + cm_tensors = _message.tensors() + for (name, tensor) in response_tensors.items(): + cm_tensors.set_tensor(name, tensor) output_message = _message From bc1e3c230f05d7777cbbbbbcd1ca4745a8947d7c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 13:53:11 -0700 Subject: [PATCH 24/27] Re-work hard-coded probs->embeddings copy that used to exist in inference_stage.py as a stage in the pipeline --- examples/llm/vdb_upload/pipeline.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py index 494446d16c..17a9e55af3 100644 --- a/examples/llm/vdb_upload/pipeline.py +++ b/examples/llm/vdb_upload/pipeline.py @@ -19,7 +19,9 @@ from vdb_upload.helper import process_vdb_sources from morpheus.config import Config +from morpheus.messages import ControlMessage from morpheus.pipeline.pipeline import Pipeline +from morpheus.pipeline.stage_decorator import stage from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.general.trigger_stage import TriggerStage from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage @@ -78,6 +80,20 @@ def pipeline(pipeline_config: Config, monitor_2 = pipe.add_stage( MonitorStage(pipeline_config, description="Inference rate", unit="events", delayed_start=True)) + @stage + def embedding_tensor_to_df(message: ControlMessage, *, embedding_tensor_name='probs') -> ControlMessage: + """ + Copies the probs tensor to the 'embedding' field of the dataframe. + """ + msg_meta = message.payload() + with msg_meta.mutable_dataframe() as df: + embedding_tensor = message.tensors().get_tensor(embedding_tensor_name) + df['embedding'] = embedding_tensor.tolist() + + return message + + embedding_tensor_to_df_stage = pipe.add_stage(embedding_tensor_to_df(config=pipeline_config)) + vector_db = pipe.add_stage(WriteToVectorDBStage(pipeline_config, **vdb_config)) monitor_3 = pipe.add_stage( @@ -96,7 +112,8 @@ def pipeline(pipeline_config: Config, pipe.add_edge(nlp_stage, monitor_1) pipe.add_edge(monitor_1, embedding_stage) pipe.add_edge(embedding_stage, monitor_2) - pipe.add_edge(monitor_2, vector_db) + pipe.add_edge(monitor_2, embedding_tensor_to_df_stage) + pipe.add_edge(embedding_tensor_to_df_stage, vector_db) pipe.add_edge(vector_db, monitor_3) start_time = time.time() From 078780fe5b2f1427e7866835a2bb1220fa35d4d1 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 13:56:10 -0700 Subject: [PATCH 25/27] Lint fix --- examples/llm/vdb_upload/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/llm/vdb_upload/pipeline.py b/examples/llm/vdb_upload/pipeline.py index 17a9e55af3..5d5fbee8e4 100644 --- a/examples/llm/vdb_upload/pipeline.py +++ b/examples/llm/vdb_upload/pipeline.py @@ -92,7 +92,7 @@ def embedding_tensor_to_df(message: ControlMessage, *, embedding_tensor_name='pr return message - embedding_tensor_to_df_stage = pipe.add_stage(embedding_tensor_to_df(config=pipeline_config)) + embedding_tensor_to_df_stage = pipe.add_stage(embedding_tensor_to_df(pipeline_config)) vector_db = pipe.add_stage(WriteToVectorDBStage(pipeline_config, **vdb_config)) From f3d03343a0ab8a98aeb3722b05b24faf8c920942 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 14:42:47 -0700 Subject: [PATCH 26/27] Re-enable C++ mode support --- examples/llm/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/llm/cli.py b/examples/llm/cli.py index 1ea9198dc1..c8aea20320 100644 --- a/examples/llm/cli.py +++ b/examples/llm/cli.py @@ -32,7 +32,7 @@ callback=parse_log_level, help="Specify the logging level to use.") @click.option('--use_cpp', - default=False, + default=True, type=bool, help=("Whether or not to use C++ node and message types or to prefer python. " "Only use as a last resort if bugs are encountered")) From abeb8111004b82cf432e23ae7c4aee2e943a0d05 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 25 Apr 2024 15:02:35 -0700 Subject: [PATCH 27/27] Remove the two issues this PR should resolve --- docs/source/extra_info/known_issues.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/docs/source/extra_info/known_issues.md b/docs/source/extra_info/known_issues.md index 014fac3471..9eeb53508e 100644 --- a/docs/source/extra_info/known_issues.md +++ b/docs/source/extra_info/known_issues.md @@ -19,7 +19,5 @@ limitations under the License. - TrainAEStage fails with a Segmentation fault ([#1641](https://github.com/nv-morpheus/Morpheus/pull/1641)) - vdb_upload example pipeline triggers an internal error in Triton ([#1649](https://github.com/nv-morpheus/Morpheus/pull/1649)) -- vdb_upload example pipeline error on inserting large strings ([#1650](https://github.com/nv-morpheus/Morpheus/pull/1650)) -- vdb_upload example pipeline only works with C++ mode disabled ([#1651](https://github.com/nv-morpheus/Morpheus/pull/1651)) Refer to [open issues in the Morpheus project](https://github.com/nv-morpheus/Morpheus/issues)