Skip to content

Commit

Permalink
Test CPU and GPU execution modes
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv committed Oct 10, 2024
1 parent dd54ce1 commit 55a5b97
Showing 1 changed file with 25 additions and 10 deletions.
35 changes: 25 additions & 10 deletions tests/test_multi_processing_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from _utils import assert_results
from _utils.dataset_manager import DatasetManager
from morpheus.config import Config
from morpheus.config import ExecutionMode
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.pipeline import LinearPipeline
Expand All @@ -49,6 +50,7 @@ def _process_df(df: pd.DataFrame, column: str, value: str) -> pd.DataFrame:
return df


@pytest.mark.gpu_and_cpu_mode
def test_create_stage_type_deduction(config: Config, dataset_pandas: DatasetManager):

# Test create() with normal function
Expand Down Expand Up @@ -105,26 +107,39 @@ def __init__(self,

self._add_column_name = add_column_name
self._shared_process_pool.set_usage(self.name, self._process_pool_usage)
self._execution_mode = c.execution_mode

@property
def name(self) -> str:
return "derived-multi-processing-stage"

def supported_execution_modes(self) -> tuple[ExecutionMode]:
"""
Returns a tuple of supported execution modes of this stage.
"""
return (ExecutionMode.GPU, ExecutionMode.CPU)

def _on_data(self, data: ControlMessage) -> ControlMessage:

input_df = data.payload().copy_dataframe()
pdf = input_df.to_pandas()
if self._execution_mode == ExecutionMode.GPU:
input_df = input_df.to_pandas()

partial_process_fn = partial(_process_df, column=self._add_column_name, value="Hello")

task = self._shared_process_pool.submit_task(self.name, partial_process_fn, pdf)
task = self._shared_process_pool.submit_task(self.name, partial_process_fn, input_df)

df = task.result()
if self._execution_mode == ExecutionMode.GPU:
df = cudf.DataFrame.from_pandas(df)

df = cudf.DataFrame.from_pandas(task.result())
meta = MessageMeta(df)
data.payload(meta)

return data


@pytest.mark.gpu_and_cpu_mode
def test_derived_stage_type_deduction(config: Config):

mp_stage = DerivedMultiProcessingStage(c=config, process_pool_usage=0.1, add_column_name="new_column")
Expand All @@ -137,12 +152,11 @@ def test_derived_stage_type_deduction(config: Config):


def pandas_dataframe_generator(dataset_pandas: DatasetManager, count: int) -> Generator[pd.DataFrame, None, None]:

df = dataset_pandas["csv_sample.csv"]
for _ in range(count):
yield df
yield dataset_pandas["csv_sample.csv"]


@pytest.mark.gpu_and_cpu_mode
def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager):

config.num_threads = os.cpu_count()
Expand Down Expand Up @@ -171,6 +185,7 @@ def test_created_stage_pipe(config: Config, dataset_pandas: DatasetManager):
assert df.equals(expected_df)


@pytest.mark.gpu_and_cpu_mode
def test_derived_stage_pipe(config: Config, dataset_pandas: DatasetManager):

config.num_threads = os.cpu_count()
Expand All @@ -181,7 +196,7 @@ def test_derived_stage_pipe(config: Config, dataset_pandas: DatasetManager):
expected_df[add_column_name] = "Hello"

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)]))
pipe.set_source(InMemorySourceStage(config, [input_df]))
pipe.add_stage(DeserializeStage(config, ensure_sliceable_index=True))
pipe.add_stage(DerivedMultiProcessingStage(c=config, process_pool_usage=0.1, add_column_name=add_column_name))
pipe.add_stage(SerializeStage(config))
Expand All @@ -192,6 +207,7 @@ def test_derived_stage_pipe(config: Config, dataset_pandas: DatasetManager):
assert_results(comp_stage.get_results())


@pytest.mark.gpu_and_cpu_mode
def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager):
config.num_threads = os.cpu_count()

Expand All @@ -206,9 +222,8 @@ def test_multiple_stages_pipe(config: Config, dataset_pandas: DatasetManager):

partial_fn = partial(_process_df, column="new_column_1", value="new_value")

@stage
def pdf_to_control_message_stage(pdf: pd.DataFrame) -> ControlMessage:
df = cudf.DataFrame.from_pandas(pdf)
@stage(execution_modes=(ExecutionMode.CPU, ExecutionMode.GPU))
def pdf_to_control_message_stage(df: pd.DataFrame) -> ControlMessage:
meta = MessageMeta(df)
msg = ControlMessage()
msg.payload(meta)
Expand Down

0 comments on commit 55a5b97

Please sign in to comment.