Skip to content

Commit

Permalink
Simple repro test for Morpheus issue nv-morpheus#2002
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Oct 29, 2024
1 parent 6e2f4f1 commit c7d23b4
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions tests/morpheus/pipeline/test_file_in_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,25 @@
import numpy as np
import pytest

import cudf

from _utils import TEST_DIRS
from _utils import assert_path_exists
from _utils import assert_results
from _utils.dataset_manager import DatasetManager
from morpheus.common import FileTypes
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import CppConfig
from morpheus.io.deserializers import read_file_to_df
from morpheus.io.serializers import write_df_to_file
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.stage_decorator import stage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
Expand Down Expand Up @@ -300,3 +307,34 @@ def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path: pathlib.Path
# Somehow 0.7 ends up being 0.7000000000000001
output_data = np.around(output_data, 2)
assert output_data.tolist() == input_data.tolist()


@pytest.mark.slow
def test_null_json_output(config: Config):
    """
    Reproduce Morpheus issue #2011.

    The issue occurs when the source dataframe has more rows than the pipeline batch size. Here a 1024-row
    dataframe is pushed through a pipeline whose batch size is 256; a custom stage duplicates column "a" into a
    new "copy" column and the serialized output is compared against the expected dataframe.
    """
    # Batch size is deliberately smaller than the number of rows in the input dataframe
    config.pipeline_batch_size = 256
    num_rows = 1024

    source_df = cudf.DataFrame({"a": range(num_rows)})
    reference_df = cudf.DataFrame({"a": range(num_rows), "copy": range(num_rows)})

    pipe = LinearPipeline(config)
    pipe.set_source(InMemorySourceStage(config, dataframes=[source_df]))
    pipe.add_stage(DeserializeStage(config))

    @stage(execution_modes=(config.execution_mode, ), needed_columns={"copy": TypeId.INT64})
    def copy_col(msg: ControlMessage) -> ControlMessage:
        # Duplicate column "a" into the "copy" column on the message's payload
        payload = msg.payload()
        payload.set_data("copy", payload.get_data('a'))
        return msg

    pipe.add_stage(copy_col(config))
    pipe.add_stage(SerializeStage(config))
    compare_stage = pipe.add_stage(CompareDataFrameStage(config, compare_df=reference_df))
    pipe.run()

    # The comparison stage collects the diff between the pipeline output and the reference dataframe
    assert_results(compare_stage.get_results())

0 comments on commit c7d23b4

Please sign in to comment.