diff --git a/tests/morpheus/pipeline/test_file_in_out.py b/tests/morpheus/pipeline/test_file_in_out.py index b61e496bec..41ffe11337 100755 --- a/tests/morpheus/pipeline/test_file_in_out.py +++ b/tests/morpheus/pipeline/test_file_in_out.py @@ -22,10 +22,14 @@ import numpy as np import pytest +import cudf + from _utils import TEST_DIRS from _utils import assert_path_exists +from _utils import assert_results from _utils.dataset_manager import DatasetManager from morpheus.common import FileTypes +from morpheus.common import TypeId from morpheus.config import Config from morpheus.config import CppConfig from morpheus.io.deserializers import read_file_to_df @@ -33,7 +37,10 @@ from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta from morpheus.pipeline import LinearPipeline +from morpheus.pipeline.stage_decorator import stage from morpheus.stages.input.file_source_stage import FileSourceStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage @@ -300,3 +307,34 @@ def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path: pathlib.Path # Somehow 0.7 ends up being 0.7000000000000001 output_data = np.around(output_data, 2) assert output_data.tolist() == input_data.tolist() + + +@pytest.mark.slow +def test_null_json_output(config: Config): + """ + Test reproduces Morpheus issue #2011. + The issue occurs when the dataframe (1024 rows here) is longer than the pipeline batch size (256 here). + """ + config.pipeline_batch_size = 256 + + input_df = cudf.DataFrame({"a": range(1024)}) + expected_df = cudf.DataFrame({"a": range(1024), "copy": range(1024)}) + + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, 
dataframes=[input_df])) + pipe.add_stage(DeserializeStage(config)) + + @stage(execution_modes=(config.execution_mode, ), needed_columns={"copy": TypeId.INT64}) + def copy_col(msg: ControlMessage) -> ControlMessage: + meta = msg.payload() + a_col = meta.get_data('a') + meta.set_data("copy", a_col) + + return msg + + pipe.add_stage(copy_col(config)) + pipe.add_stage(SerializeStage(config)) + cmp_stage = pipe.add_stage(CompareDataFrameStage(config, compare_df=expected_df)) + pipe.run() + + assert_results(cmp_stage.get_results())