Skip to content

Commit

Permalink
Merge branch 'branch-24.06' into david-group-by-column_stage
Browse files Browse the repository at this point in the history
  • Loading branch information
dagardner-nv authored Jun 3, 2024
2 parents 001bd7d + d9e6474 commit a2365c1
Show file tree
Hide file tree
Showing 25 changed files with 511 additions and 109 deletions.
4 changes: 2 additions & 2 deletions docs/source/developer_guide/guides.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ in both Python and C++.
- [Simple C++ Stage](./guides/3_simple_cpp_stage.md)
- [Creating a C++ Source Stage](./guides/4_source_cpp_stage.md)

> **Note**: The code for the above guides can be found in the `examples/developer_guide` directory of the Morpheus repository. To build the C++ examples, pass `-DMORPHEUS_BUILD_EXAMPLES=ON -DMORPHEUS_PYTHON_PERFORM_INSTALL=ON` to CMake when building Morpheus. Users building Morpheus with the provided `scripts/compile.sh` script can do do by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
> **Note**: The code for the above guides can be found in the `examples/developer_guide` directory of the Morpheus repository. To build the C++ examples, pass `-DMORPHEUS_BUILD_EXAMPLES=ON` to CMake when building Morpheus. Users building Morpheus with the provided `scripts/compile.sh` script can do do by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
> ```bash
> CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON -DMORPHEUS_PYTHON_PERFORM_INSTALL=ON" ./scripts/compile.sh
> CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
> ```
## Morpheus Modules
Expand Down
6 changes: 3 additions & 3 deletions docs/source/developer_guide/guides/3_simple_cpp_stage.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.

# Simple C++ Stage
## Building the Example
The code for this guide can be found in the `examples/developer_guide/3_simple_cpp_stage` directory of the Morpheus repository. There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` and `-DMORPHEUS_PYTHON_PERFORM_INSTALL=ON` flags to cmake, for users using the `scripts/compile.sh` at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
The code for this guide can be found in the `examples/developer_guide/3_simple_cpp_stage` directory of the Morpheus repository. There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` flag to cmake, for users using the `scripts/compile.sh` at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
```bash
CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON -DMORPHEUS_PYTHON_PERFORM_INSTALL=ON" ./scripts/compile.sh
CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
```

The second method is to build the example as a standalone project. From the root of the Morpheus repo execute:
Expand Down Expand Up @@ -122,7 +122,7 @@ We explicitly set the visibility for the stage object to default by importing:
```cpp
#include <morpheus/export.h>
```
Then adding `MORPHEUS_EXPORT`, which is defined in `/build/autogenerated/include/morpheus/export.h` and is compiler agnostic, to the definition of the stage object.
Then adding `MORPHEUS_EXPORT`, which is defined in `/build/autogenerated/include/morpheus/export.h` and is compiler agnostic, to the definition of the stage object.
This is due to a pybind11 requirement for module implementations to default symbol visibility to hidden (`-fvisibility=hidden`). More details about this can be found in the [pybind11 documentation](https://pybind11.readthedocs.io/en/stable/faq.html#someclass-declared-with-greater-visibility-than-the-type-of-its-field-someclass-member-wattributes).
Any object, struct, or function that is intended to be exported should have `MORPHEUS_EXPORT` included in the definition.

Expand Down
4 changes: 2 additions & 2 deletions docs/source/developer_guide/guides/4_source_cpp_stage.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.

# Creating a C++ Source Stage
## Building the Example
The code for this guide can be found in the `examples/developer_guide/4_rabbitmq_cpp_stage` directory of the Morpheus repository. There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` and `-DMORPHEUS_PYTHON_PERFORM_INSTALL=ON` flags to cmake, for users using the `scripts/compile.sh` at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
The code for this guide can be found in the `examples/developer_guide/4_rabbitmq_cpp_stage` directory of the Morpheus repository. There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` flag to cmake, for users using the `scripts/compile.sh` at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
```bash
CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON -DMORPHEUS_PYTHON_PERFORM_INSTALL=ON" ./scripts/compile.sh
CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
```

The second method is to build the example as a standalone project. From the root of the Morpheus repo execute:
Expand Down
21 changes: 21 additions & 0 deletions docs/source/developer_guide/guides/9_control_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,24 @@ retrieved_payload = msg.payload()

msg_meta == retrieved_payload # True
```

### Conversion from `MultiMessage` to `ControlMessage`

Starting with version 24.06, the `MultiMessage` type will be deprecated, and all usage should transition to `ControlMessage`. Each `MultiMessage` functionality has a corresponding equivalent in `ControlMessage`, as illustrated below.
```python
import cudf
from morpheus.messages import MultiMessage, ControlMessage

data = cudf.DataFrame()
msg_meta = MessageMeta(data)
```

| **Functionality** | **MultiMessage** | **ControlMessage** |
| -------------------------------------------------------------- | ------------------------------------- | ------------------------------------------------------------------- |
| Initialization | `multi_msg = MultiMessage(msg_meta)` | `control_msg = ControlMessage()`<br>`control_msg.payload(msg_meta)` |
| Get columns from `cudf.DataFrame` | `multi_msg.get_meta(col_name)` | `control_msg.payload().get_data(col_name)` |
| Set columns values to `cudf.DataFrame` | `multi_msg.set_meta(col_name, value)` | `control_msg.payload().set_data(col_name, value)` |
| Get sliced `cudf.DataFrame` for given start and stop positions | `multi_msg.get_slice(start, stop)` | `control_msg.payload().get_slice(start, stop)` |
| Copy the `cudf.DataFrame` for given ranges of rows | `multi_msg.copy_ranges(ranges)` | `control_msg.payload().copy_ranges(ranges)` |

Note that the `get_slice()` and `copy_ranges()` functions in `ControlMessage` return the `MessageMeta` after slicing, whereas these functions in `MultiMessage` return a new `MultiMessage` instance.
4 changes: 2 additions & 2 deletions examples/developer_guide/4_rabbitmq_cpp_stage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ pip install -r examples/developer_guide/4_rabbitmq_cpp_stage/requirements.txt
```

## Building the Example
There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` and `-DMORPHEUS_PYTHON_PERFORM_INSTALL=ON` flags to cmake, for users using the `scripts/compile.sh` at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` flag to cmake, for users using the `scripts/compile.sh` at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
```bash
CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON -DMORPHEUS_PYTHON_PERFORM_INSTALL=ON" ./scripts/compile.sh
CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
```

The second is to build the example as a standalone project. From the root of the Morpheus repo execute:
Expand Down
14 changes: 8 additions & 6 deletions morpheus/cli/register_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,7 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str):
if (is_union_type(annotation)):
raise RuntimeError("Union types are not supported for auto registering stages.")

if (issubtype(annotation, typing.List)):
# For variable length array, use multiple=True
options_kwargs["multiple"] = True
options_kwargs["type"] = get_args(annotation)[0]

elif (issubtype(annotation, pathlib.Path)):
if (issubtype(annotation, pathlib.Path)):
# For paths, use the Path option and apply any kwargs
options_kwargs["type"] = partial_pop_kwargs(click.Path, doc_type_kwargs)()

Expand Down Expand Up @@ -216,6 +211,13 @@ def set_options_param_type(options_kwargs: dict, annotation, doc_type: str):
options_kwargs["type"] = click.Tuple([str, str])
options_kwargs["callback"] = lambda ctx, param, value: dict(value)

elif (issubtype(annotation, typing.List) or issubtype(annotation, typing.Tuple)
or issubtype(annotation, typing.Iterable)) and not (issubtype(annotation, str)):

# For variable length array, use multiple=True
options_kwargs["multiple"] = True
options_kwargs["type"] = get_args(annotation)[0]

else:
options_kwargs["type"] = annotation

Expand Down
27 changes: 22 additions & 5 deletions morpheus/llm/nodes/langchain_agent_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,35 @@ def __init__(self, agent_executor: "AgentExecutor"):
def get_input_names(self):
return self._input_names

async def _run_single(self, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
@staticmethod
def _is_all_lists(data: dict[str, typing.Any]) -> bool:
return all(isinstance(v, list) for v in data.values())

all_lists = all(isinstance(v, list) for v in kwargs.values())
@staticmethod
def _transform_dict_of_lists(data: dict[str, typing.Any]) -> list[dict[str, typing.Any]]:
return [dict(zip(data, t)) for t in zip(*data.values())]

async def _run_single(self, metadata: dict[str, typing.Any] = None, **kwargs) -> dict[str, typing.Any]:

all_lists = self._is_all_lists(kwargs)

# Check if all values are a list
if all_lists:

# Transform from dict[str, list[Any]] to list[dict[str, Any]]
input_list = [dict(zip(kwargs, t)) for t in zip(*kwargs.values())]
input_list = self._transform_dict_of_lists(kwargs)

# If all metadata values are lists of the same length and the same length as the input list
# then transform them the same way as the input list
if (metadata is not None and self._is_all_lists(metadata)
and all(len(v) == len(input_list) for v in metadata.values())):
metadata_list = self._transform_dict_of_lists(metadata)

else:
metadata_list = [metadata] * len(input_list)

# Run multiple again
results_async = [self._run_single(**x) for x in input_list]
results_async = [self._run_single(metadata=metadata_list[i], **x) for (i, x) in enumerate(input_list)]

results = await asyncio.gather(*results_async, return_exceptions=True)

Expand All @@ -67,7 +84,7 @@ async def _run_single(self, **kwargs: dict[str, typing.Any]) -> dict[str, typing

# We are not dealing with a list, so run single
try:
return await self._agent_executor.arun(**kwargs)
return await self._agent_executor.arun(metadata=metadata, **kwargs)
except Exception as e:
logger.exception("Error running agent: %s", e)
return e
Expand Down
2 changes: 1 addition & 1 deletion morpheus/messages/memory/response_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ResponseMemory(TensorMemory, cpp_class=_messages.ResponseMemory):
"""Output memory block holding the results of inference."""

def __new__(cls, *args, **kwargs):
morpheus_logger.deprecated_message_warning(logger, cls, TensorMemory)
morpheus_logger.deprecated_message_warning(cls, TensorMemory)
return super().__new__(cls, *args, **kwargs)

def get_output(self, name: str):
Expand Down
9 changes: 9 additions & 0 deletions morpheus/messages/message_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
import functools
import typing

from typing_utils import issubtype

from morpheus import messages
from morpheus.config import CppConfig
from morpheus.messages import ControlMessage
from morpheus.utils import logger as morpheus_logger


class MessageImpl(abc.ABCMeta):
Expand All @@ -44,6 +49,10 @@ def __new__(cls, name, bases, namespace, /, cpp_class=None, **kwargs):
@functools.wraps(result.__new__)
def _internal_new(other_cls, *args, **kwargs):

# Instantiating MultiMessage and its subclasses from Python or C++ will generate a deprecation warning
if issubtype(other_cls, messages.MultiMessage):
morpheus_logger.deprecated_message_warning(other_cls, ControlMessage)

# If _cpp_class is set, and use_cpp is enabled, create the C++ instance
if (getattr(other_cls, "_cpp_class", None) is not None and CppConfig.get_should_use_cpp()):
return cpp_class(*args, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion morpheus/messages/multi_response_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class MultiResponseProbsMessage(MultiResponseMessage, cpp_class=_messages.MultiR
required_tensors: typing.ClassVar[typing.List[str]] = ["probs"]

def __new__(cls, *args, **kwargs):
morpheus_logger.deprecated_message_warning(logger, cls, MultiResponseMessage)
morpheus_logger.deprecated_message_warning(cls, MultiResponseMessage)
return super(MultiResponseMessage, cls).__new__(cls, *args, **kwargs)

def __init__(self,
Expand Down
18 changes: 12 additions & 6 deletions morpheus/stages/input/http_client_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource):
Number of seconds to wait for the server to send data before giving up and raising an exception.
max_errors : int, default 10
Maximum number of consequtive errors to receive before raising an error.
accept_status_codes : typing.List[HTTPStatus], optional, multiple = True
List of status codes to accept. If the response status code is not in this tuple, then the request will be
accept_status_codes : typing.Iterable[int], optional, multiple = True
List of status codes to accept. If the response status code is not in this collection, then the request will be
considered an error
max_retries : int, default 10
Maximum number of times to retry the request fails, receives a redirect or returns a status in the
Expand All @@ -80,6 +80,9 @@ class HttpClientSourceStage(PreallocatorMixin, SingleOutputSource):
to contain a JSON objects separated by end-of-line characters.
stop_after : int, default 0
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
payload_to_df_fn : callable, default None
A callable that takes the HTTP payload bytes as the first argument and the `lines` parameter is passed in as
the second argument and returns a cudf.DataFrame. If unset cudf.read_json is used.
**request_kwargs : dict
Additional arguments to pass to the `requests.request` function.
"""
Expand All @@ -94,10 +97,11 @@ def __init__(self,
error_sleep_time: float = 0.1,
respect_retry_after_header: bool = True,
request_timeout_secs: int = 30,
accept_status_codes: typing.List[HTTPStatus] = (HTTPStatus.OK, ),
accept_status_codes: typing.Iterable[int] = (HTTPStatus.OK, ),
max_retries: int = 10,
lines: bool = False,
stop_after: int = 0,
payload_to_df_fn: typing.Callable[[bytes, bool], cudf.DataFrame] = None,
**request_kwargs):
super().__init__(config)
self._url = http_utils.prepare_url(url)
Expand Down Expand Up @@ -135,6 +139,7 @@ def __init__(self,

self._stop_after = stop_after
self._lines = lines
self._payload_to_df_fn = payload_to_df_fn
self._requst_kwargs = request_kwargs

@property
Expand All @@ -154,10 +159,11 @@ def _parse_response(self, response: requests.Response) -> typing.Union[cudf.Data
Returns a DataFrame parsed from the response payload. If the response payload is empty, then `None` is returned.
"""
payload = response.content
if len(payload) > 2: # work-around for https://github.com/rapidsai/cudf/issues/5712
return cudf.read_json(payload, lines=self._lines, engine='cudf')

return None
if self._payload_to_df_fn is not None:
return self._payload_to_df_fn(payload, self._lines)

return cudf.read_json(payload, lines=self._lines, engine='cudf')

def _generate_frames(self) -> typing.Iterator[MessageMeta]:
# Running counter of the number of messages emitted by this source
Expand Down
18 changes: 14 additions & 4 deletions morpheus/stages/input/http_server_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class HttpServerSourceStage(PreallocatorMixin, SingleOutputSource):
expect each request to be a JSON object per line.
stop_after : int, default 0
Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
payload_to_df_fn : callable, default None
A callable that takes the HTTP payload string as the first argument and the `lines` parameter is passed in as
the second argument and returns a cudf.DataFrame. When supplied, the C++ implementation of this stage is
disabled, and the Python impl is used.
"""

def __init__(self,
Expand All @@ -93,7 +97,8 @@ def __init__(self,
max_payload_size: int = 10,
request_timeout_secs: int = 30,
lines: bool = False,
stop_after: int = 0):
stop_after: int = 0,
payload_to_df_fn: typing.Callable[[str, bool], cudf.DataFrame] = None):
super().__init__(config)
self._bind_address = bind_address
self._port = port
Expand All @@ -108,6 +113,7 @@ def __init__(self,
self._request_timeout_secs = request_timeout_secs
self._lines = lines
self._stop_after = stop_after
self._payload_to_df_fn = payload_to_df_fn

# These are only used when C++ mode is disabled
self._queue = None
Expand All @@ -134,8 +140,12 @@ def compute_schema(self, schema: StageSchema):

def _parse_payload(self, payload: str) -> HttpParseResponse:
try:
# engine='cudf' is needed when lines=False to avoid using pandas
df = cudf.read_json(payload, lines=self._lines, engine='cudf')
if self._payload_to_df_fn is not None:
df = self._payload_to_df_fn(payload, self._lines)
else:
# engine='cudf' is needed when lines=False to avoid using pandas
df = cudf.read_json(payload, lines=self._lines, engine='cudf')

except Exception as e:
err_msg = "Error occurred converting HTTP payload to Dataframe"
logger.error("%s: %s", err_msg, e)
Expand Down Expand Up @@ -206,7 +216,7 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]:
self._processing = False

def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
if self._build_cpp_node():
if self._build_cpp_node() and self._payload_to_df_fn is None:
import morpheus._lib.stages as _stages
node = _stages.HttpServerSourceStage(builder,
self.unique_name,
Expand Down
Loading

0 comments on commit a2365c1

Please sign in to comment.