From 55a5b972ccb1b913af662a3f431bd4aab385d579 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 10 Oct 2024 11:48:07 -0700 Subject: [PATCH] Test CPU and GPU execution modes --- tests/test_multi_processing_stage.py | 35 ++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/tests/test_multi_processing_stage.py b/tests/test_multi_processing_stage.py index d83e9b8d3d..abf3a25dda 100644 --- a/tests/test_multi_processing_stage.py +++ b/tests/test_multi_processing_stage.py @@ -26,6 +26,7 @@ from _utils import assert_results from _utils.dataset_manager import DatasetManager from morpheus.config import Config +from morpheus.config import ExecutionMode from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta from morpheus.pipeline import LinearPipeline @@ -49,6 +50,7 @@ def _process_df(df: pd.DataFrame, column: str, value: str) -> pd.DataFrame: return df +@pytest.mark.gpu_and_cpu_mode def test_create_stage_type_deduction(config: Config, dataset_pandas: DatasetManager): # Test create() with normal function @@ -105,26 +107,39 @@ def __init__(self, self._add_column_name = add_column_name self._shared_process_pool.set_usage(self.name, self._process_pool_usage) + self._execution_mode = c.execution_mode @property def name(self) -> str: return "derived-multi-processing-stage" + def supported_execution_modes(self) -> tuple[ExecutionMode]: + """ + Returns a tuple of supported execution modes of this stage. + """ + return (ExecutionMode.GPU, ExecutionMode.CPU) + def _on_data(self, data: ControlMessage) -> ControlMessage: input_df = data.payload().copy_dataframe() - pdf = input_df.to_pandas() + if self._execution_mode == ExecutionMode.GPU: + input_df = input_df.to_pandas() + partial_process_fn = partial(_process_df, column=self._add_column_name, value="Hello") - task = self._shared_process_pool.submit_task(self.name, partial_process_fn, pdf) + task = self._shared_process_pool.submit_task(self.name, partial_process_fn, input_df) + + df = task.result() + if self._execution_mode == ExecutionMode.GPU: + df = cudf.DataFrame.from_pandas(df) - df = cudf.DataFrame.from_pandas(task.result()) meta = MessageMeta(df) data.payload(meta) return data +@pytest.mark.gpu_and_cpu_mode def test_derived_stage_type_deduction(config: Config): mp_stage = DerivedMultiProcessingStage(c=config, process_pool_usage=0.1, add_column_name="new_column") @@ -137,12 +152,11 @@ def test_derived_stage_type_deduction(config: Config): def pandas_dataframe_generator(dataset_pandas: DatasetManager, count: int) -> Generator[pd.DataFrame, None, None]: - - df = dataset_pandas["csv_sample.csv"] for _ in range(count): - yield df + yield dataset_pandas["csv_sample.csv"] +@pytest.mark.gpu_and_cpu_mode def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager): config.num_threads = os.cpu_count() @@ -171,6 +185,7 @@ def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager): assert df.equals(expected_df) +@pytest.mark.gpu_and_cpu_mode def test_derived_stage_pipe(config: Config, dataset_pandas: DatasetManager): config.num_threads = os.cpu_count() @@ -181,7 +196,7 @@ def test_derived_stage_pipe(config: Config, dataset_pandas: DatasetManager): expected_df[add_column_name] = "Hello" pipe = LinearPipeline(config) - pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)])) + pipe.set_source(InMemorySourceStage(config, [input_df])) pipe.add_stage(DeserializeStage(config, ensure_sliceable_index=True)) pipe.add_stage(DerivedMultiProcessingStage(c=config, process_pool_usage=0.1, add_column_name=add_column_name)) pipe.add_stage(SerializeStage(config)) @@ -192,6 +207,7 @@ def test_derived_stage_pipe(config: Config, dataset_pandas: DatasetManager): assert_results(comp_stage.get_results()) +@pytest.mark.gpu_and_cpu_mode def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager): config.num_threads = os.cpu_count() @@ -206,9 +222,8 @@ def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager): partial_fn = partial(_process_df, column="new_column_1", value="new_value") - @stage - def pdf_to_control_message_stage(pdf: pd.DataFrame) -> ControlMessage: - df = cudf.DataFrame.from_pandas(pdf) + @stage(execution_modes=(ExecutionMode.CPU, ExecutionMode.GPU)) + def pdf_to_control_message_stage(df: pd.DataFrame) -> ControlMessage: meta = MessageMeta(df) msg = ControlMessage() msg.payload(meta)