diff --git a/morpheus/pipeline/boundary_stage_mixin.py b/morpheus/pipeline/boundary_stage_mixin.py new file mode 100644 index 0000000000..f721bcfa33 --- /dev/null +++ b/morpheus/pipeline/boundary_stage_mixin.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC + + +class BoundaryStageMixin(ABC): + """ + Mixin intended to be added to both ingress and egress boundary stages, currently this only adds the ability to + identify boundary stages. + """ diff --git a/morpheus/pipeline/pipeline.py b/morpheus/pipeline/pipeline.py index 568c848df4..4859244c10 100644 --- a/morpheus/pipeline/pipeline.py +++ b/morpheus/pipeline/pipeline.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ from tqdm import tqdm from morpheus.config import Config +from morpheus.pipeline.boundary_stage_mixin import BoundaryStageMixin from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.receiver import Receiver from morpheus.pipeline.sender import Sender @@ -175,9 +176,9 @@ def add_edge(self, end_port_idx=end_port.port_number) def add_segment_edge(self, - egress_stage: Stage, + egress_stage: BoundaryStageMixin, egress_segment: str, - ingress_stage: Stage, + ingress_stage: BoundaryStageMixin, ingress_segment: str, port_pair: typing.Union[str, typing.Tuple[str, typing.Type, bool]]): """ @@ -205,6 +206,7 @@ def add_segment_edge(self, * bool: If the type is a shared pointer (typically should be `False`) """ self._assert_not_built() + assert isinstance(egress_stage, BoundaryStageMixin), "Egress stage must be a BoundaryStageMixin" egress_edges = self._segments[egress_segment]["egress_ports"] egress_edges.append({ "port_pair": port_pair, @@ -213,6 +215,7 @@ def add_segment_edge(self, "receiver_segment": ingress_segment }) + assert isinstance(ingress_stage, BoundaryStageMixin), "Ingress stage must be a BoundaryStageMixin" ingress_edges = self._segments[ingress_segment]["ingress_ports"] ingress_edges.append({ "port_pair": port_pair, diff --git a/morpheus/stages/boundary/linear_boundary_stage.py b/morpheus/stages/boundary/linear_boundary_stage.py index 14d1db1858..ad8db9ebc2 100644 --- a/morpheus/stages/boundary/linear_boundary_stage.py +++ b/morpheus/stages/boundary/linear_boundary_stage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ from mrc.core import operators as ops from morpheus.config import Config +from morpheus.pipeline.boundary_stage_mixin import BoundaryStageMixin from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource @@ -28,7 +29,7 @@ logger = logging.getLogger(__name__) -class LinearBoundaryEgressStage(PassThruTypeMixin, SinglePortStage): +class LinearBoundaryEgressStage(BoundaryStageMixin, PassThruTypeMixin, SinglePortStage): """ The LinearBoundaryEgressStage acts as an egress point from one linear segment to another. Given an existing linear pipeline that we want to connect to another segment, a linear boundary egress stage would be added, in conjunction @@ -81,7 +82,7 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> return input_node -class LinearBoundaryIngressStage(PreallocatorMixin, SingleOutputSource): +class LinearBoundaryIngressStage(BoundaryStageMixin, PreallocatorMixin, SingleOutputSource): """ The LinearBoundaryIngressStage acts as source ingress point from a corresponding egress in another linear segment. Given an existing linear pipeline that we want to connect to another segment, a linear boundary egress stage would diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 40ca0b9612..1d93dc22c0 100755 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -26,8 +26,13 @@ from _utils.stages.multi_message_pass_thru import MultiMessagePassThruStage from _utils.stages.multi_port_pass_thru import MultiPortPassThruStage from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.messages import MultiMessage from morpheus.pipeline import LinearPipeline from morpheus.pipeline import Pipeline +from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryEgressStage +from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryIngressStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage @@ -195,3 +200,58 @@ def test_add_edge_input_port_errors(config: Config, num_inputs: int): with pytest.raises(AssertionError): pipe.add_edge(start_stage.output_ports[0], end_stage) + + +@pytest.mark.parametrize("data_type", [int, float, str, MessageMeta, ControlMessage, MultiMessage]) +def test_add_segment_edge(config: Config, data_type: type): + pipe = Pipeline(config) + + boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=data_type) + boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=data_type) + + pipe.add_stage(boundary_egress, "seg_1") + pipe.add_stage(boundary_ingress, "seg_2") + pipe.add_segment_edge(boundary_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False)) + + +def test_add_segment_edge_assert_not_built(config: Config): + pipe = Pipeline(config) + + src_stage = InMemSourceXStage(config, data=list(range(3))) + boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=int) + boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=int) + + pipe.add_stage(src_stage, "seg_1") + pipe.add_stage(boundary_egress, "seg_1") + pipe.add_edge(src_stage, boundary_egress, "seg_1") + pipe.add_stage(boundary_ingress, "seg_2") + pipe.build() + + with pytest.raises(AssertionError): + pipe.add_segment_edge(boundary_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False)) + + +def test_add_segment_edge_bad_egress(config: Config): + pipe = Pipeline(config) + + bad_egress = InMemorySinkStage(config) + boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=int) + + pipe.add_stage(bad_egress, "seg_1") + pipe.add_stage(boundary_ingress, "seg_2") + + with pytest.raises(AssertionError): + pipe.add_segment_edge(bad_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False)) + + +def test_add_segment_edge_bad_ingress(config: Config): + pipe = Pipeline(config) + + boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=int) + bad_ingress = InMemSourceXStage(config, data=list(range(3))) + + pipe.add_stage(boundary_egress, "seg_1") + pipe.add_stage(bad_ingress, "seg_2") + + with pytest.raises(AssertionError): + pipe.add_segment_edge(boundary_egress, "seg_1", bad_ingress, "seg_2", ("seg_1", object, False))