Skip to content

Commit

Permalink
Implement MultiProcessingStage (#1878)
Browse files Browse the repository at this point in the history
Implemented `SharedProcessPool` and `MultiProcessingStage`.

**SharedProcessPool**
  - A singleton process pool that manages tasks from multiple stages, each with its own queue. Allows setting stage-specific process usage to cap the number of workers a stage can utilize.
  - The total number of workers is determined by the environment variable `SHARED_PROCESS_POOL_CPU_USAGE`, which is multiplied by the system’s available CPU cores (`os.sched_getaffinity(0)`). 
    - If the variable is not set, the default is 50% of available CPUs.
  - Accepts process function with arbitrary argument signatures.

**MultiProcessingStage**
  - A versatile base stage that uses SharedProcessPool to handle incoming messages in a multiprocessing manner.
  - Accepts processing functions with a single argument, capable of handling dynamic input and output types. The types are deducted at runtime based on the provided processing function.
  - Two ways to use the stage:
    - Extend `MultiProcessingBaseStage` and implement the data processing logic in the `_on_data()` method.
    - Use `MultiProcessingStage.create()` by passing in the processing function directly.
  - Able to run multiple instances of the stage in the same pipeline. 

Closes #1850 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Yuchen Zhang (https://github.com/yczhang-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1878
  • Loading branch information
yczhang-nv authored Sep 26, 2024
1 parent 958cb7b commit fdfcbfc
Show file tree
Hide file tree
Showing 4 changed files with 1,187 additions and 0 deletions.
200 changes: 200 additions & 0 deletions python/morpheus/morpheus/stages/general/multi_processing_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import inspect
import typing
from abc import abstractmethod

import mrc
import mrc.core.operators as ops

from morpheus.config import Config
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.shared_process_pool import SharedProcessPool

InputT = typing.TypeVar('InputT')
OutputT = typing.TypeVar('OutputT')


class MultiProcessingBaseStage(SinglePortStage, typing.Generic[InputT, OutputT]):

def __init__(self, *, c: Config, process_pool_usage: float, max_in_flight_messages: int = None):
super().__init__(c=c)

if not 0 <= process_pool_usage <= 1:
raise ValueError("process_pool_usage must be between 0 and 1.")
self._process_pool_usage = process_pool_usage

self._shared_process_pool = SharedProcessPool()
self._shared_process_pool.wait_until_ready()

if max_in_flight_messages is None:
# set the multiplier to 1.5 to keep the workers busy
self._max_in_flight_messages = int(self._shared_process_pool.total_max_workers * 1.5)
else:
self._max_in_flight_messages = max_in_flight_messages

def accepted_types(self) -> typing.Tuple:
"""
There are two approaches to inherit from this class:
- With generic types: MultiProcessingDerivedStage(MultiProcessingBaseStage[InputT, OutputT])
- With concrete types: MultiProcessingDerivedStage(MultiProcessingBaseStage[int, str])
When inheriting with generic types, the derived class can be instantiated like this:
stage = MultiProcessingDerivedStage[int, str]()
In this case, typing.Generic stores the stage type in stage.__orig_class__, the concrete types can be accessed
as below:
input_type = typing.get_args(stage.__orig_class__)[0] # int
output_type = typing.get_args(stage.__orig_class__)[1] # str
However, when instantiating a stage which inherits with concrete types:
stage = MultiProcessingDerivedStage()
The stage instance does not have __orig_class__ attribute (since it is not a generic type). Thus, the concrete
types need be retrieved from its base class (which is a generic type):
input_type = typing.get_args(stage.__orig_bases__[0])[0] # int
output_type = typing.get_args(stage.__orig_bases__[0])[1] # str
Raises:
RuntimeError: if the accepted cannot be deducted from either __orig_class__ or __orig_bases__
Returns:
typing.Tuple: accepted input types
"""
if hasattr(self, "__orig_class__"):
# inherited with generic types
input_type = typing.get_args(self.__orig_class__)[0] # pylint: disable=no-member

elif hasattr(self, "__orig_bases__"):
# inherited with concrete types
input_type = typing.get_args(self.__orig_bases__[0])[0] # pylint: disable=no-member

else:
raise RuntimeError("Could not deduct input type")

return (input_type, )

def compute_schema(self, schema: StageSchema):
"""
See the comment on `accepted_types` for more information on accessing the input and output types.
Args:
schema (StageSchema): StageSchema
Raises:
RuntimeError: if the output type cannot be deducted from either __orig_class__ or __orig_bases__
"""
if hasattr(self, "__orig_class__"):
# inherited with abstract types
output_type = typing.get_args(self.__orig_class__)[1] # pylint: disable=no-member

elif hasattr(self, "__orig_bases__"):
# inherited with concrete types
output_type = typing.get_args(self.__orig_bases__[0])[1]

else:
raise RuntimeError("Could not deduct output type")

schema.output_schema.set_type(output_type)

def supports_cpp_node(self):
return False

@abstractmethod
def _on_data(self, data: InputT) -> OutputT:
pass

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.name, ops.map(self._on_data))
node.launch_options.pe_count = self._max_in_flight_messages

builder.make_edge(input_node, node)

return node


def _get_func_signature(func: typing.Callable[[InputT], OutputT]) -> tuple[type, type]:
signature = inspect.signature(func)

if isinstance(func, functools.partial):
# If the function is a partial, find the type of the first unbound argument
bound_args = func.keywords
input_arg = None

for param in signature.parameters.values():
if param.name not in bound_args:
if input_arg is not None:
raise ValueError("Found more than one unbound arguments in partial function")
input_arg = param

if input_arg is None:
raise ValueError("Cannot find unbound argument in partial function")
input_t = input_arg.annotation

else:
if len(signature.parameters) != 1:
raise ValueError("Function must have exactly one argument")

input_t = next(iter(signature.parameters.values())).annotation

output_t = signature.return_annotation

return (input_t, output_t)


class MultiProcessingStage(MultiProcessingBaseStage[InputT, OutputT]):

def __init__(self,
*,
c: Config,
unique_name: str,
process_fn: typing.Callable[[InputT], OutputT],
process_pool_usage: float,
max_in_flight_messages: int = None):
super().__init__(c=c, process_pool_usage=process_pool_usage, max_in_flight_messages=max_in_flight_messages)

self._name = unique_name
self._process_fn = process_fn
self._shared_process_pool.set_usage(self.name, self._process_pool_usage)

@property
def name(self) -> str:
return self._name

def _on_data(self, data: InputT) -> OutputT:
task = self._shared_process_pool.submit_task(self.name, self._process_fn, data)
result = task.result()

return result

@staticmethod
def create(*,
c: Config,
unique_name: str,
process_fn: typing.Callable[[InputT], OutputT],
process_pool_usage: float):

input_t, output_t = _get_func_signature(process_fn)
return MultiProcessingStage[input_t, output_t](c=c,
unique_name=unique_name,
process_pool_usage=process_pool_usage,
process_fn=process_fn)
Loading

0 comments on commit fdfcbfc

Please sign in to comment.