From 406dbeb0bb234fb6bcaeb3a6b655e76eff05fe60 Mon Sep 17 00:00:00 2001 From: Yuchen Zhang <134643420+yczhang-nv@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:06:33 -0700 Subject: [PATCH] fix unit test for multi_processing_stage --- .../stages/general/multi_processing_stage.py | 58 +++++++------------ .../morpheus/utils/shared_process_pool.py | 2 +- tests/test_multi_processing_stage.py | 39 ++++++++++--- 3 files changed, 53 insertions(+), 46 deletions(-) diff --git a/python/morpheus/morpheus/stages/general/multi_processing_stage.py b/python/morpheus/morpheus/stages/general/multi_processing_stage.py index 2001be4413..2456e811e6 100644 --- a/python/morpheus/morpheus/stages/general/multi_processing_stage.py +++ b/python/morpheus/morpheus/stages/general/multi_processing_stage.py @@ -1,13 +1,30 @@ -from abc import ABC, abstractmethod +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import typing +from abc import ABC +from abc import abstractmethod -from morpheus.pipeline.stage_schema import StageSchema -from morpheus.utils.shared_process_pool import SharedProcessPool -from morpheus.config import Config -from morpheus.pipeline.single_port_stage import SinglePortStage import mrc import mrc.core.operators as ops +from morpheus.config import Config +from morpheus.pipeline.single_port_stage import SinglePortStage +from morpheus.pipeline.stage_schema import StageSchema +from morpheus.utils.shared_process_pool import SharedProcessPool + InputT = typing.TypeVar('InputT') OutputT = typing.TypeVar('OutputT') @@ -56,7 +73,6 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> return node - class MultiProcessingStage(MultiProcessingBaseStage[InputT, OutputT]): def __init__(self, @@ -84,33 +100,3 @@ def _on_data(self, data: InputT) -> OutputT: def create(*, c: Config, process_fn: typing.Callable[[InputT], OutputT], process_pool_usage: float): return MultiProcessingStage[InputT, OutputT](c=c, process_pool_usage=process_pool_usage, process_fn=process_fn) - - -# pipe = LinearPipeline(config) - -# # ...add other stages... - -# # You can derive from the base class if you need to use self inside the process function -# class MyCustomMultiProcessStage(MultiProcessStage[ControlMessage, ControlMessage]): - -# def __init__(self, *, c: Config, process_pool_usage: float, add_column_name: str): -# super().__init__(self, c=c, process_pool_usage=process_pool_usage) - -# self._add_column_name = add_column_name - -# def _on_data(self, data: ControlMessage) -> ControlMessage: - -# with data.payload().mutable_dataframe() as df: -# df[self._add_column_name] = "hello" - -# return data - -# # Add an instance of the custom stage -# pipe.add_stage(MyCustomMultiProcessStage(c=config, process_pool_usage, add_column_name="NewCol") - -# # If you just want to supply a function pointer -# def print_process_id(message): -# print(os.pid()) -# return message - -# pipe.add_stage(MultiProcessingStage.create(c=config, process_fn=print_process_id)) diff --git a/python/morpheus/morpheus/utils/shared_process_pool.py b/python/morpheus/morpheus/utils/shared_process_pool.py index d43820a1bc..9345b868bb 100644 --- a/python/morpheus/morpheus/utils/shared_process_pool.py +++ b/python/morpheus/morpheus/utils/shared_process_pool.py @@ -99,7 +99,7 @@ def _initialize(self, total_max_workers): @property def total_max_workers(self): return self._total_max_workers - + @staticmethod def _worker(task_queues, stage_semaphores, shutdown_in_progress): logger.debug("Worker process %s has been started.", os.getpid()) diff --git a/tests/test_multi_processing_stage.py b/tests/test_multi_processing_stage.py index 14574a9d73..e728a34111 100644 --- a/tests/test_multi_processing_stage.py +++ b/tests/test_multi_processing_stage.py @@ -1,8 +1,26 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os from typing import Tuple -import cudf + import pytest -import os +import cudf + +from _utils import assert_results from _utils.dataset_manager import DatasetManager from morpheus.config import Config from morpheus.messages import ControlMessage @@ -10,6 +28,8 @@ from morpheus.stages.general.multi_processing_stage import MultiProcessingBaseStage from morpheus.stages.general.multi_processing_stage import MultiProcessingStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage +from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage @@ -43,22 +63,23 @@ def _on_data(self, data: ControlMessage) -> ControlMessage: return data + @pytest.mark.use_python def test_stage_pipe(config: Config, dataset_pandas: DatasetManager): config.num_threads = os.cpu_count() input_df = dataset_pandas["filter_probs.csv"] + add_column_name = "new_column" + expected_df = input_df.copy() + expected_df[add_column_name] = "Hello" pipe = LinearPipeline(config) pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)])) pipe.add_stage(DeserializeStage(config, ensure_sliceable_index=True, message_type=ControlMessage)) - pipe.add_stage(DerivedMultiProcessingStage(c=config, process_pool_usage=0.5, add_column_name="new_column")) + pipe.add_stage(DerivedMultiProcessingStage(c=config, process_pool_usage=0.5, add_column_name=add_column_name)) + pipe.add_stage(SerializeStage(config)) + comp_stage = pipe.add_stage(CompareDataFrameStage(config, expected_df)) pipe.run() - -# if __name__ == "__main__": -# config = Config() -# dataset_pandas = DatasetManager() -# # test_constructor(config) -# test_stage_pipe(config, dataset_pandas) + assert_results(comp_stage.get_results())