From 1b52ab865eac6d0cbb9cf0fa3cfbc7c2efd143f6 Mon Sep 17 00:00:00 2001 From: David Gardner <96306125+dagardner-nv@users.noreply.github.com> Date: Mon, 22 Jan 2024 09:13:26 -0800 Subject: [PATCH] Create a base mixin class for ingress & egress stages (#1473) * Creates a new `BoundaryStageMixin` class * Allowing `LinearBoundaryEgressStage` and `LinearBoundaryIngressStage` to share a common class and be distinguished from other stages Closes #638 ## By Submitting this PR I confirm: - I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md). - When the PR is ready for review, new or existing tests cover these changes. - When the PR is ready for review, the documentation is up to date with these changes. Authors: - David Gardner (https://github.com/dagardner-nv) Approvers: - Devin Robison (https://github.com/drobison00) - Michael Demoret (https://github.com/mdemoret-nv) URL: https://github.com/nv-morpheus/Morpheus/pull/1473 --- morpheus/pipeline/boundary_stage_mixin.py | 22 +++++++ morpheus/pipeline/pipeline.py | 9 ++- .../stages/boundary/linear_boundary_stage.py | 7 ++- tests/pipeline/test_pipeline.py | 62 ++++++++++++++++++- 4 files changed, 93 insertions(+), 7 deletions(-) create mode 100644 morpheus/pipeline/boundary_stage_mixin.py diff --git a/morpheus/pipeline/boundary_stage_mixin.py b/morpheus/pipeline/boundary_stage_mixin.py new file mode 100644 index 0000000000..f721bcfa33 --- /dev/null +++ b/morpheus/pipeline/boundary_stage_mixin.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC + + +class BoundaryStageMixin(ABC): + """ + Mixin intended to be added to both ingress and egress boundary stages, currently this only adds the ability to + identify boundary stages. + """ diff --git a/morpheus/pipeline/pipeline.py b/morpheus/pipeline/pipeline.py index 568c848df4..4859244c10 100644 --- a/morpheus/pipeline/pipeline.py +++ b/morpheus/pipeline/pipeline.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ from tqdm import tqdm from morpheus.config import Config +from morpheus.pipeline.boundary_stage_mixin import BoundaryStageMixin from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.receiver import Receiver from morpheus.pipeline.sender import Sender @@ -175,9 +176,9 @@ def add_edge(self, end_port_idx=end_port.port_number) def add_segment_edge(self, - egress_stage: Stage, + egress_stage: BoundaryStageMixin, egress_segment: str, - ingress_stage: Stage, + ingress_stage: BoundaryStageMixin, ingress_segment: str, port_pair: typing.Union[str, typing.Tuple[str, typing.Type, bool]]): """ @@ -205,6 +206,7 @@ def add_segment_edge(self, * bool: If the type is a shared pointer (typically should be `False`) """ self._assert_not_built() + assert isinstance(egress_stage, BoundaryStageMixin), "Egress stage must be a BoundaryStageMixin" egress_edges = self._segments[egress_segment]["egress_ports"] egress_edges.append({ "port_pair": port_pair, @@ -213,6 +215,7 @@ def add_segment_edge(self, "receiver_segment": ingress_segment }) + assert isinstance(ingress_stage, BoundaryStageMixin), "Ingress stage must be a BoundaryStageMixin" ingress_edges = self._segments[ingress_segment]["ingress_ports"] ingress_edges.append({ "port_pair": port_pair, diff --git a/morpheus/stages/boundary/linear_boundary_stage.py b/morpheus/stages/boundary/linear_boundary_stage.py index 14d1db1858..ad8db9ebc2 100644 --- a/morpheus/stages/boundary/linear_boundary_stage.py +++ b/morpheus/stages/boundary/linear_boundary_stage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ from mrc.core import operators as ops from morpheus.config import Config +from morpheus.pipeline.boundary_stage_mixin import BoundaryStageMixin from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource @@ -28,7 +29,7 @@ logger = logging.getLogger(__name__) -class LinearBoundaryEgressStage(PassThruTypeMixin, SinglePortStage): +class LinearBoundaryEgressStage(BoundaryStageMixin, PassThruTypeMixin, SinglePortStage): """ The LinearBoundaryEgressStage acts as an egress point from one linear segment to another. Given an existing linear pipeline that we want to connect to another segment, a linear boundary egress stage would be added, in conjunction @@ -81,7 +82,7 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> return input_node -class LinearBoundaryIngressStage(PreallocatorMixin, SingleOutputSource): +class LinearBoundaryIngressStage(BoundaryStageMixin, PreallocatorMixin, SingleOutputSource): """ The LinearBoundaryIngressStage acts as source ingress point from a corresponding egress in another linear segment. Given an existing linear pipeline that we want to connect to another segment, a linear boundary egress stage would diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 40ca0b9612..1d93dc22c0 100755 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -26,8 +26,13 @@ from _utils.stages.multi_message_pass_thru import MultiMessagePassThruStage from _utils.stages.multi_port_pass_thru import MultiPortPassThruStage from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.messages import MultiMessage from morpheus.pipeline import LinearPipeline from morpheus.pipeline import Pipeline +from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryEgressStage +from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryIngressStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage @@ -195,3 +200,58 @@ def test_add_edge_input_port_errors(config: Config, num_inputs: int): with pytest.raises(AssertionError): pipe.add_edge(start_stage.output_ports[0], end_stage) + + +@pytest.mark.parametrize("data_type", [int, float, str, MessageMeta, ControlMessage, MultiMessage]) +def test_add_segment_edge(config: Config, data_type: type): + pipe = Pipeline(config) + + boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=data_type) + boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=data_type) + + pipe.add_stage(boundary_egress, "seg_1") + pipe.add_stage(boundary_ingress, "seg_2") + pipe.add_segment_edge(boundary_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False)) + + +def test_add_segment_edge_assert_not_built(config: Config): + pipe = Pipeline(config) + + src_stage = InMemSourceXStage(config, data=list(range(3))) + boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=int) + boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=int) + + pipe.add_stage(src_stage, "seg_1") + pipe.add_stage(boundary_egress, "seg_1") + pipe.add_edge(src_stage, boundary_egress, "seg_1") + pipe.add_stage(boundary_ingress, "seg_2") + pipe.build() + + with pytest.raises(AssertionError): + pipe.add_segment_edge(boundary_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False)) + + +def test_add_segment_edge_bad_egress(config: Config): + pipe = Pipeline(config) + + bad_egress = InMemorySinkStage(config) + boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=int) + + pipe.add_stage(bad_egress, "seg_1") + pipe.add_stage(boundary_ingress, "seg_2") + + with pytest.raises(AssertionError): + pipe.add_segment_edge(bad_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False)) + + +def test_add_segment_edge_bad_ingress(config: Config): + pipe = Pipeline(config) + + boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=int) + bad_ingress = InMemSourceXStage(config, data=list(range(3))) + + pipe.add_stage(boundary_egress, "seg_1") + pipe.add_stage(bad_ingress, "seg_2") + + with pytest.raises(AssertionError): + pipe.add_segment_edge(boundary_egress, "seg_1", bad_ingress, "seg_2", ("seg_1", object, False))