From ddd657203f80cefee439ab38868e94cb38abc92a Mon Sep 17 00:00:00 2001
From: David Gardner
Date: Wed, 20 Dec 2023 15:03:26 -0800
Subject: [PATCH] Update test

---
 tests/examples/log_parsing/test_inference.py | 88 +++++++++++---------
 1 file changed, 48 insertions(+), 40 deletions(-)

diff --git a/tests/examples/log_parsing/test_inference.py b/tests/examples/log_parsing/test_inference.py
index af9986bea7..3306ff9bdc 100644
--- a/tests/examples/log_parsing/test_inference.py
+++ b/tests/examples/log_parsing/test_inference.py
@@ -20,21 +20,19 @@
 
 import cupy as cp
 import numpy as np
-import pandas as pd
 import pytest
 
-import cudf
-
 from _utils import TEST_DIRS
 from morpheus.config import Config
 from morpheus.config import PipelineModes
-from morpheus.messages import InferenceMemory
 from morpheus.messages import InferenceMemoryNLP
 from morpheus.messages import MessageMeta
-from morpheus.messages import MultiInferenceMessage
-from morpheus.messages import ResponseMemory
+from morpheus.messages import MultiResponseMessage
+from morpheus.messages import MultiInferenceNLPMessage
+from morpheus.messages import TensorMemory
 from morpheus.stages.inference.triton_inference_stage import TritonInferenceWorker
 from morpheus.utils.producer_consumer_queue import ProducerConsumerQueue
+from morpheus.utils.type_aliases import DataFrameType
 
 
 @pytest.fixture(name="config")
@@ -43,7 +41,7 @@ def config_fixture(config: Config):
     yield config
 
 
-def build_response_mem(log_test_data_dir: str) -> ResponseMemory:
+def build_response_mem(log_test_data_dir: str) -> TensorMemory:
     # we have tensor data for the first five rows
     count = 5
     tensors = {}
@@ -52,15 +50,33 @@
         host_data = np.loadtxt(tensor_file, delimiter=',')
         tensors[tensor_name] = cp.asarray(host_data)
 
-    return ResponseMemory(count=count, **tensors)
+    return TensorMemory(count=count, tensors=tensors)
+
+
+def build_resp_message(df: DataFrameType, num_cols: int = 2) -> MultiResponseMessage:
+    count = len(df)
+    seq_ids = cp.zeros((count, 3), dtype=cp.uint32)
+    seq_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
+    seq_ids[:, 2] = 42
+
+    meta = MessageMeta(df)
+    mem = TensorMemory(count=count,
+                       tensors={
+                           'confidences': cp.zeros((count, num_cols)),
+                           'labels': cp.zeros((count, num_cols)),
+                           'input_ids': cp.zeros((count, num_cols), dtype=cp.float32),
+                           'seq_ids': seq_ids
+                       })
+
+    return MultiResponseMessage(meta=meta, mess_offset=0, mess_count=count, memory=mem, offset=0, count=count)
 
 
-def build_inf_message(df: typing.Union[pd.DataFrame, cudf.DataFrame],
+def build_inf_message(df: DataFrameType,
                       mess_offset: int,
                       mess_count: int,
                       offset: int,
                       count: int,
-                      num_cols: int = 2) -> MultiInferenceMessage:
+                      num_cols: int = 2) -> MultiInferenceNLPMessage:
     assert count >= mess_count
     tensor_length = offset + count
     seq_ids = cp.zeros((tensor_length, 3), dtype=cp.uint32)
@@ -78,12 +94,12 @@ def build_inf_message(df: typing.Union[pd.DataFrame, cudf.DataFrame],
                              input_mask=cp.zeros((tensor_length, num_cols), dtype=cp.float32),
                              seq_ids=seq_ids)
 
-    return MultiInferenceMessage(meta=meta,
-                                 mess_offset=mess_offset,
-                                 mess_count=mess_count,
-                                 memory=mem,
-                                 offset=offset,
-                                 count=count)
+    return MultiInferenceNLPMessage(meta=meta,
+                                    mess_offset=mess_offset,
+                                    mess_count=mess_count,
+                                    memory=mem,
+                                    offset=offset,
+                                    count=count)
 
 
 def _check_worker(inference_mod: types.ModuleType, worker: TritonInferenceWorker, expected_mapping: dict[str, str]):
@@ -119,8 +135,7 @@ def test_log_parsing_triton_inference_log_parsing_constructor(config: Config,
 @pytest.mark.import_mod([os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py')])
 @pytest.mark.parametrize("mess_offset,mess_count,offset,count", [(0, 20, 0, 20), (5, 10, 5, 10)])
 def test_log_parsing_triton_inference_log_parsing_build_output_message(config: Config,
-                                                                       filter_probs_df: typing.Union[pd.DataFrame,
-                                                                                                     cudf.DataFrame],
+                                                                       filter_probs_df: DataFrameType,
                                                                        import_mod: typing.List[types.ModuleType],
                                                                        mess_offset: int,
                                                                        mess_count: int,
@@ -212,32 +227,25 @@ def test_log_parsing_inference_stage_get_inference_worker(config: Config, import
 
 @pytest.mark.use_python
 @pytest.mark.usefixtures("manual_seed", "config")
-@pytest.mark.import_mod([
-    os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py'),
-    os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'messages.py')
-])
+@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'log_parsing', 'inference.py'))
 @pytest.mark.parametrize("mess_offset,mess_count,offset,count", [(0, 5, 0, 5), (5, 5, 0, 5)])
 def test_log_parsing_inference_stage_convert_one_response(import_mod: typing.List[types.ModuleType],
-                                                          filter_probs_df: typing.Union[pd.DataFrame, cudf.DataFrame],
+                                                          filter_probs_df: DataFrameType,
                                                           mess_offset,
                                                           mess_count,
                                                           offset,
                                                           count):
-    inference_mod, messages_mod = import_mod
+    inference_mod = import_mod
 
     ttl_count = len(filter_probs_df)
 
    input_res = build_response_mem(os.path.join(TEST_DIRS.tests_data_dir, 'examples/log_parsing'))
 
     # confidences, labels & input_ids all have the same shape
-    num_cols = input_res.confidences.shape[1]
-    input_mem = InferenceMemory(count=ttl_count,
-                                tensors={
-                                    'confidences': cp.zeros((ttl_count, num_cols), dtype=cp.float32),
-                                    'input_ids': cp.zeros((ttl_count, num_cols), dtype=cp.float32),
-                                    'labels': cp.zeros((ttl_count, num_cols), dtype=cp.float32),
-                                    'seq_ids': cp.zeros((ttl_count, 3), dtype=cp.uint32)
-                                })
+    num_cols = input_res.get_tensor('confidences').shape[1]
+    resp_msg = build_resp_message(filter_probs_df, num_cols=num_cols)
+
+    orig_tensors = {k: v.copy() for (k, v) in resp_msg.memory.get_tensors().items()}
 
     input_inf = build_inf_message(filter_probs_df,
                                   mess_offset=mess_offset,
@@ -246,11 +254,10 @@ def test_log_parsing_inference_stage_convert_one_response(import_mod: typing.Lis
                                   count=count,
                                   num_cols=num_cols)
 
-    output_msg = inference_mod.LogParsingInferenceStage._convert_one_response(input_mem, input_inf, input_res)
+    output_msg = inference_mod.LogParsingInferenceStage._convert_one_response(resp_msg, input_inf, input_res)
 
-    assert isinstance(output_msg, messages_mod.MultiPostprocLogParsingMessage)
+    assert isinstance(output_msg, MultiResponseMessage)
     assert output_msg.meta is input_inf.meta
-    assert output_msg.memory is input_mem
     assert output_msg.mess_offset == mess_offset
     assert output_msg.mess_count == mess_count
     assert output_msg.offset == offset
@@ -258,11 +265,12 @@ def test_log_parsing_inference_stage_convert_one_response(import_mod: typing.Lis
 
     assert (output_msg.seq_ids == input_inf.seq_ids).all()
     assert (output_msg.input_ids == input_inf.input_ids).all()
-    assert (output_msg.confidences == input_res.confidences).all()
-    assert (output_msg.labels == input_res.labels).all()
+    assert (output_msg.confidences == input_res.get_tensor('confidences')).all()
+    assert (output_msg.labels == input_res.get_tensor('labels')).all()
 
     # Ensure we didn't write to the memory outside of the [offset:offset+count] bounds
-    tensors = input_mem.get_tensors()
+    tensors = resp_msg.memory.get_tensors()
     for (tensor_name, tensor) in tensors.items():
-        assert (tensor[0:offset] == 0).all(), f"Out of bounds values for {tensor_name}"
-        assert (tensor[offset + count:] == 0).all(), f"Out of bounds values for {tensor_name}"
+        orig_tensor = orig_tensors[tensor_name]
+        assert (tensor[0:offset] == orig_tensor[0:offset]).all(), f"Out of bounds values for {tensor_name}"
+        assert (tensor[offset + count:] == orig_tensor[offset + count:]).all(), f"Out of bounds values for {tensor_name}"
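
Note on the final hunk: the old assertions required the regions outside [offset:offset+count] to still be zero, which only holds when the response buffers happen to be zero-initialized. The updated test instead snapshots every tensor (orig_tensors) before calling _convert_one_response and asserts that the untouched regions still match the snapshot. The following is a minimal, framework-free sketch of that pattern, not Morpheus code: numpy stands in for cupy (same array API for these calls, no GPU needed), and write_window is a hypothetical stand-in for LogParsingInferenceStage._convert_one_response.

    import numpy as np


    def write_window(dst: np.ndarray, src: np.ndarray, offset: int, count: int) -> None:
        # Hypothetical stand-in: by contract it may only write to
        # dst[offset:offset + count].
        dst[offset:offset + count] = src[:count]


    def test_untouched_regions_match_snapshot():
        rng = np.random.default_rng(42)
        dst = rng.random((10, 2))  # deliberately non-zero, unlike the cp.zeros buffers above
        src = rng.random((5, 2))
        offset, count = 3, 5

        # Snapshot before the call, mirroring the patch's orig_tensors copies.
        orig = dst.copy()

        write_window(dst, src, offset, count)

        # Outside the window: values must equal the snapshot, not merely zero.
        assert (dst[:offset] == orig[:offset]).all()
        assert (dst[offset + count:] == orig[offset + count:]).all()
        # Inside the window: the newly written values.
        assert (dst[offset:offset + count] == src).all()

Comparing against a snapshot rather than against zero keeps the bounds check meaningful even if the helper that builds the response message is later changed to fill its tensors with non-zero data.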