From 54820e2ffbfec420fe3196b6dd3713d9ed858f68 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 13 May 2024 15:24:19 -0700 Subject: [PATCH 1/4] Add group by column stage --- .../preprocess/group_by_column_stage.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 morpheus/stages/preprocess/group_by_column_stage.py diff --git a/morpheus/stages/preprocess/group_by_column_stage.py b/morpheus/stages/preprocess/group_by_column_stage.py new file mode 100644 index 0000000000..dda1b26b8d --- /dev/null +++ b/morpheus/stages/preprocess/group_by_column_stage.py @@ -0,0 +1,66 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mrc +from mrc.core import operators as ops + +from morpheus.config import Config +from morpheus.messages import MessageMeta +from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin +from morpheus.pipeline.single_port_stage import SinglePortStage + + +class GroupByColumnStage(PassThruTypeMixin, SinglePortStage): + """ + Group the incoming message by a column in the DataFrame. + + Parameters + ---------- + config : morpheus.config.Config + Pipeline configuration instance + column_name : str + The column name in the message dataframe to group by + """ + + def __init__(self, config: Config, column_name: str): + super().__init__(config) + + self._column_name = column_name + + @property + def name(self) -> str: + return "group-by-column" + + def accepted_types(self) -> tuple: + return (MessageMeta, ) + + def supports_cpp_node(self) -> bool: + return False + + def on_data(self, message: MessageMeta) -> list[MessageMeta]: + with message.mutable_dataframe() as df: + grouper = df.groupby(self._column_name) + + ouptut_messages = [] + for group_name in grouper.groups.keys(): + group_df = grouper.get_group(group_name) + ouptut_messages.append(MessageMeta(group_df)) + + return ouptut_messages + + def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: + node = builder.make_node(self.unique_name, ops.map(self.on_data), ops.flatten()) + builder.make_edge(input_node, node) + + return node From cd739b74193f8e2aed6ba1cf5d804a5cfa587af0 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 16 May 2024 12:39:17 -0700 Subject: [PATCH 2/4] Group alphabetically for determinism --- morpheus/stages/preprocess/group_by_column_stage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morpheus/stages/preprocess/group_by_column_stage.py b/morpheus/stages/preprocess/group_by_column_stage.py index dda1b26b8d..97c8242ecd 100644 --- a/morpheus/stages/preprocess/group_by_column_stage.py +++ b/morpheus/stages/preprocess/group_by_column_stage.py @@ -53,7 +53,7 @@ def on_data(self, message: MessageMeta) -> list[MessageMeta]: grouper = df.groupby(self._column_name) ouptut_messages = [] - for group_name in grouper.groups.keys(): + for group_name in sorted(grouper.groups.keys()): group_df = grouper.get_group(group_name) ouptut_messages.append(MessageMeta(group_df)) From faa973d644b5ca9f08cb6017bf3ca8a29ba78637 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 16 May 2024 12:39:41 -0700 Subject: [PATCH 3/4] Test for GroupByColumnStage --- tests/stages/test_group_by_column_stage.py | 93 ++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100755 tests/stages/test_group_by_column_stage.py diff --git a/tests/stages/test_group_by_column_stage.py b/tests/stages/test_group_by_column_stage.py new file mode 100755 index 0000000000..100f2941f5 --- /dev/null +++ b/tests/stages/test_group_by_column_stage.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import pandas as pd +import pytest + +import cudf + +from _utils import TEST_DIRS +from _utils import assert_results +from morpheus.config import Config +from morpheus.io.deserializers import read_file_to_df +from morpheus.messages import MessageMeta +from morpheus.pipeline import LinearPipeline +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +from morpheus.stages.preprocess.group_by_column_stage import GroupByColumnStage +from morpheus.utils.compare_df import compare_df +from morpheus.utils.type_aliases import DataFrameType + + +@pytest.fixture(name="_test_df", scope="module") +def _test_df_fixture(): + """ + Read the source data only once + """ + # Manually reading this in since we need lines=False + yield read_file_to_df(os.path.join(TEST_DIRS.tests_data_dir, 'azure_ad_logs.json'), + parser_kwargs={'lines': False}, + df_type='pandas') + + +@pytest.fixture(name="test_df") +def test_df_fixture(_test_df: DataFrameType): + """ + Ensure each test gets a unique copy + """ + yield _test_df.copy(deep=True) + + +@pytest.mark.parametrize("group_by_column", ["identity", "location"]) +def test_group_by_column_stage_pipe(config: Config, group_by_column: str, test_df: DataFrameType): + input_df = cudf.from_pandas(test_df) + input_df.drop(columns=["properties"], inplace=True) # Remove once #1527 is resolved + + # Intentionally constructing the expected data in a manual way not involving pandas or cudf to avoid using the same + # technology as the GroupByColumnStage + rows = test_df.to_dict(orient="records") + expected_data: dict[str, list[dict]] = {} + for row in rows: + key = row[group_by_column] + if key not in expected_data: + expected_data[key] = [] + + row.pop('properties') # Remove once #1527 is resolved + expected_data[key].append(row) + + expected_dataframes: list[DataFrameType] = [] + for key in sorted(expected_data.keys()): + df = pd.DataFrame(expected_data[key]) + expected_dataframes.append(df) + + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, dataframes=[input_df])) + pipe.add_stage(GroupByColumnStage(config, column_name=group_by_column)) + sink = pipe.add_stage(InMemorySinkStage(config)) + + pipe.run() + + messages: MessageMeta = sink.get_messages() + assert len(messages) == len(expected_dataframes) + for (i, message) in enumerate(messages): + output_df = message.copy_dataframe().to_pandas() + output_df.reset_index(drop=True, inplace=True) + + expected_df = expected_dataframes[i] + + assert_results(compare_df(expected_df, output_df)) From 001bd7d098d55ad82d628111896d637a8bfe9863 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 3 Jun 2024 08:50:55 -0700 Subject: [PATCH 4/4] Fix a spelling error and some docstrings --- .../preprocess/group_by_column_stage.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/morpheus/stages/preprocess/group_by_column_stage.py b/morpheus/stages/preprocess/group_by_column_stage.py index 97c8242ecd..d69504dd27 100644 --- a/morpheus/stages/preprocess/group_by_column_stage.py +++ b/morpheus/stages/preprocess/group_by_column_stage.py @@ -43,21 +43,35 @@ def name(self) -> str: return "group-by-column" def accepted_types(self) -> tuple: + """ + Returns accepted input types for this stage. + """ return (MessageMeta, ) def supports_cpp_node(self) -> bool: + """ + Indicates whether this stage supports C++ node. + """ return False def on_data(self, message: MessageMeta) -> list[MessageMeta]: + """ + Group the incoming message by a column in the DataFrame. + + Parameters + ---------- + message : MessageMeta + Incoming message + """ with message.mutable_dataframe() as df: grouper = df.groupby(self._column_name) - ouptut_messages = [] + output_messages = [] for group_name in sorted(grouper.groups.keys()): group_df = grouper.get_group(group_name) - ouptut_messages.append(MessageMeta(group_df)) + output_messages.append(MessageMeta(group_df)) - return ouptut_messages + return output_messages def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: node = builder.make_node(self.unique_name, ops.map(self.on_data), ops.flatten())