Skip to content

Commit

Permalink
Create a base mixin class for ingress & egress stages (#1473)
Browse files Browse the repository at this point in the history
* Creates a new `BoundaryStageMixin` class 
* Allowing `LinearBoundaryEgressStage` and `LinearBoundaryIngressStage` to share a common class and be distinguished from other stages

Closes #638 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Devin Robison (https://github.com/drobison00)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1473
  • Loading branch information
dagardner-nv authored Jan 22, 2024
1 parent 04389b8 commit 1b52ab8
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 7 deletions.
22 changes: 22 additions & 0 deletions morpheus/pipeline/boundary_stage_mixin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC


class BoundaryStageMixin(ABC):
"""
Mixin intended to be added to both ingress and egress boundary stages, currently this only adds the ability to
identify boundary stages.
"""
9 changes: 6 additions & 3 deletions morpheus/pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
from tqdm import tqdm

from morpheus.config import Config
from morpheus.pipeline.boundary_stage_mixin import BoundaryStageMixin
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.receiver import Receiver
from morpheus.pipeline.sender import Sender
Expand Down Expand Up @@ -175,9 +176,9 @@ def add_edge(self,
end_port_idx=end_port.port_number)

def add_segment_edge(self,
egress_stage: Stage,
egress_stage: BoundaryStageMixin,
egress_segment: str,
ingress_stage: Stage,
ingress_stage: BoundaryStageMixin,
ingress_segment: str,
port_pair: typing.Union[str, typing.Tuple[str, typing.Type, bool]]):
"""
Expand Down Expand Up @@ -205,6 +206,7 @@ def add_segment_edge(self,
* bool: If the type is a shared pointer (typically should be `False`)
"""
self._assert_not_built()
assert isinstance(egress_stage, BoundaryStageMixin), "Egress stage must be a BoundaryStageMixin"
egress_edges = self._segments[egress_segment]["egress_ports"]
egress_edges.append({
"port_pair": port_pair,
Expand All @@ -213,6 +215,7 @@ def add_segment_edge(self,
"receiver_segment": ingress_segment
})

assert isinstance(ingress_stage, BoundaryStageMixin), "Ingress stage must be a BoundaryStageMixin"
ingress_edges = self._segments[ingress_segment]["ingress_ports"]
ingress_edges.append({
"port_pair": port_pair,
Expand Down
7 changes: 4 additions & 3 deletions morpheus/stages/boundary/linear_boundary_stage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
from mrc.core import operators as ops

from morpheus.config import Config
from morpheus.pipeline.boundary_stage_mixin import BoundaryStageMixin
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
Expand All @@ -28,7 +29,7 @@
logger = logging.getLogger(__name__)


class LinearBoundaryEgressStage(PassThruTypeMixin, SinglePortStage):
class LinearBoundaryEgressStage(BoundaryStageMixin, PassThruTypeMixin, SinglePortStage):
"""
The LinearBoundaryEgressStage acts as an egress point from one linear segment to another. Given an existing linear
pipeline that we want to connect to another segment, a linear boundary egress stage would be added, in conjunction
Expand Down Expand Up @@ -81,7 +82,7 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) ->
return input_node


class LinearBoundaryIngressStage(PreallocatorMixin, SingleOutputSource):
class LinearBoundaryIngressStage(BoundaryStageMixin, PreallocatorMixin, SingleOutputSource):
"""
The LinearBoundaryIngressStage acts as source ingress point from a corresponding egress in another linear segment.
Given an existing linear pipeline that we want to connect to another segment, a linear boundary egress stage would
Expand Down
62 changes: 61 additions & 1 deletion tests/pipeline/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -26,8 +26,13 @@
from _utils.stages.multi_message_pass_thru import MultiMessagePassThruStage
from _utils.stages.multi_port_pass_thru import MultiPortPassThruStage
from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.messages import MultiMessage
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline import Pipeline
from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryEgressStage
from morpheus.stages.boundary.linear_boundary_stage import LinearBoundaryIngressStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
Expand Down Expand Up @@ -195,3 +200,58 @@ def test_add_edge_input_port_errors(config: Config, num_inputs: int):

with pytest.raises(AssertionError):
pipe.add_edge(start_stage.output_ports[0], end_stage)


@pytest.mark.parametrize("data_type", [int, float, str, MessageMeta, ControlMessage, MultiMessage])
def test_add_segment_edge(config: Config, data_type: type):
pipe = Pipeline(config)

boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=data_type)
boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=data_type)

pipe.add_stage(boundary_egress, "seg_1")
pipe.add_stage(boundary_ingress, "seg_2")
pipe.add_segment_edge(boundary_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False))


def test_add_segment_edge_assert_not_built(config: Config):
pipe = Pipeline(config)

src_stage = InMemSourceXStage(config, data=list(range(3)))
boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=int)
boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=int)

pipe.add_stage(src_stage, "seg_1")
pipe.add_stage(boundary_egress, "seg_1")
pipe.add_edge(src_stage, boundary_egress, "seg_1")
pipe.add_stage(boundary_ingress, "seg_2")
pipe.build()

with pytest.raises(AssertionError):
pipe.add_segment_edge(boundary_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False))


def test_add_segment_edge_bad_egress(config: Config):
pipe = Pipeline(config)

bad_egress = InMemorySinkStage(config)
boundary_ingress = LinearBoundaryIngressStage(config, boundary_port_id="seg_1", data_type=int)

pipe.add_stage(bad_egress, "seg_1")
pipe.add_stage(boundary_ingress, "seg_2")

with pytest.raises(AssertionError):
pipe.add_segment_edge(bad_egress, "seg_1", boundary_ingress, "seg_2", ("seg_1", object, False))


def test_add_segment_edge_bad_ingress(config: Config):
pipe = Pipeline(config)

boundary_egress = LinearBoundaryEgressStage(config, boundary_port_id="seg_1", data_type=int)
bad_ingress = InMemSourceXStage(config, data=list(range(3)))

pipe.add_stage(boundary_egress, "seg_1")
pipe.add_stage(bad_ingress, "seg_2")

with pytest.raises(AssertionError):
pipe.add_segment_edge(boundary_egress, "seg_1", bad_ingress, "seg_2", ("seg_1", object, False))

0 comments on commit 1b52ab8

Please sign in to comment.