Skip to content

Commit

Permalink
Removing support for MultiMessage from stages (#1803)
Browse files Browse the repository at this point in the history
Part of finalizing `MultiMessage` deprecation. Previously the stages support processing both `ControlMessage` and `MultiMessage` - this PR removes the code paths for `MultiMessage` from the following stages (from both C++ & Python implementation):

- Pre-process stages:
  - `deserialize_stage`
  - `preprocess_base_stage`
  - `preprocess_ae_stage`
  - `preprocess_fil_stage`
  - `preprocess_nlp_stage`
  - `train_ae_stage`

- Post-process stages:
  - `serialize_stage`
  - `add_scores_stage_base_stage`
  - `add_scores_stage`
  - `add_classification_stage`
  - `filter_detections_stage`
  - `generate_viz_frames_stage`
  - `ml_flow_drift_stage`
  - `time_series_stage`
  - `validation_stage`

- Inference stages:
  - `auto_encoder_inference_stage`
  - `identity_inference_stage`
  - `inference_stage`
  - `pytorch_inference_stage`
  - `triton_inference_stage`

- Output stages:
  - `write_to_vector_db_stage`

The related unit tests are also updated to use only `ControlMessage`. 

Morpheus examples that imports stages/messages directly (rather than creating their own version) are also updated to get CI passed for this PR.

Closes #1887 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Yuchen Zhang (https://github.com/yczhang-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - David Gardner (https://github.com/dagardner-nv)

URL: #1803
  • Loading branch information
yczhang-nv authored Sep 11, 2024
1 parent b8652f6 commit 1d02332
Show file tree
Hide file tree
Showing 180 changed files with 2,514 additions and 4,043 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/documentation_checks.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/download_kafka.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/gitutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def add_files(*files_to_add):
@staticmethod
def get_file_add_date(file_path):
"""Return the date a given file was added to git"""
date_str = _git("log", "--follow", "--format=%as", "--", file_path, "|", "tail", "-n 1")
date_str = _run_cmd(f"git log --follow --format=%as -- {file_path} | tail -n 1")
return datetime.datetime.strptime(date_str, "%Y-%m-%d")

@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion docs/source/devcontainer.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
25 changes: 16 additions & 9 deletions docs/source/developer_guide/guides/9_control_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ msg_meta == retrieved_payload # True

### Conversion from `MultiMessage` to `ControlMessage`

Starting with version 24.06, the `MultiMessage` type will be deprecated, and all usage should transition to `ControlMessage`. Each `MultiMessage` functionality has a corresponding equivalent in `ControlMessage`, as illustrated below.
**The `MultiMessage` type was deprecated in 24.06 and has been completely removed in version 24.10.**

When upgrading to 24.10, all uses of `MultiMessage` need to be converted to `ControlMessage`. Each `MultiMessage` functionality has a corresponding equivalent in `ControlMessage`, as illustrated below.
```python
import cudf
from morpheus.messages import MultiMessage, ControlMessage
Expand All @@ -88,12 +90,17 @@ data = cudf.DataFrame()
msg_meta = MessageMeta(data)
```

| **Functionality** | **MultiMessage** | **ControlMessage** |
| -------------------------------------------------------------- | ------------------------------------- | ------------------------------------------------------------------- |
| Initialization | `multi_msg = MultiMessage(msg_meta)` | `control_msg = ControlMessage()`<br>`control_msg.payload(msg_meta)` |
| Get columns from `cudf.DataFrame` | `multi_msg.get_meta(col_name)` | `control_msg.payload().get_data(col_name)` |
| Set columns values to `cudf.DataFrame` | `multi_msg.set_meta(col_name, value)` | `control_msg.payload().set_data(col_name, value)` |
| Get sliced `cudf.DataFrame` for given start and stop positions | `multi_msg.get_slice(start, stop)` | `control_msg.payload().get_slice(start, stop)` |
| Copy the `cudf.DataFrame` for given ranges of rows | `multi_msg.copy_ranges(ranges)` | `control_msg.payload().copy_ranges(ranges)` |
| **Functionality** | **MultiMessage** | **ControlMessage** |
| -------------------------------------------------------------- | ------------------------------------------ | ------------------------------------------------------------------- |
| Initialization | `multi_msg = MultiMessage(msg_meta)` | `control_msg = ControlMessage()`<br>`control_msg.payload(msg_meta)` |
| Get `cudf.DataFrame` | `multi_msg.get_meta()` | `control_msg.payload().get_data()` |
| Get columns from `cudf.DataFrame` | `multi_msg.get_meta(col_name)` | `control_msg.payload().get_data(col_name)` |
| Set columns values to `cudf.DataFrame` | `multi_msg.set_meta(col_name, value)` | `control_msg.payload().set_data(col_name, value)` |
| Get sliced `cudf.DataFrame` for given start and stop positions | `multi_msg.get_slice(start, stop)` | `control_msg.payload().get_slice(start, stop)` |
| Copy the `cudf.DataFrame` for given ranges of rows | `multi_msg.copy_ranges(ranges)` | `control_msg.payload().copy_ranges(ranges)` |
| | **MultiTensorMessage** | **ControlMessage** |
| Get the inference tensor `cupy.ndarray` | `multi_tensor_msg.tensor()` | `control_msg.tensors()` |
| Get a specific inference tensor | `multi_tensor_msg.get_tensor(tensor_name)` | `control_msg.tensors().get_tensor(tensor_name)` |


Note that the `get_slice()` and `copy_ranges()` functions in `ControlMessage` return the `MessageMeta` after slicing, whereas these functions in `MultiMessage` return a new `MultiMessage` instance.
Note that the `get_slice()` and `copy_ranges()` functions in `ControlMessage` return the `MessageMeta` after slicing, whereas these functions in `MultiMessage` return a new `MultiMessage` instance.
2 changes: 1 addition & 1 deletion docs/source/examples/abp_nvsmi_detection/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/abp_pcap_detection/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/doca/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/llm/agents/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/llm/completion/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/llm/rag/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/llm/vdb_upload/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/log_parsing/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/nlp_si_detection/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/ransomware_detection/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/root_cause_analysis/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
2 changes: 1 addition & 1 deletion docs/source/examples/sid_visualization/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
36 changes: 19 additions & 17 deletions examples/abp_pcap_detection/abp_pcap_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@

import cudf

import morpheus._lib.messages as _messages
from morpheus.cli.register_stage import register_stage
from morpheus.common import TypeId
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import InferenceMemoryFIL
from morpheus.messages import MultiInferenceFILMessage
from morpheus.messages import MultiInferenceMessage
from morpheus.messages import MultiMessage
from morpheus.messages import ControlMessage
from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage


Expand Down Expand Up @@ -81,20 +79,22 @@ def supports_cpp_node(self):
return False

@staticmethod
def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str],
req_cols: typing.List[str]) -> MultiInferenceFILMessage:
def pre_process_batch(msg: ControlMessage, fea_len: int, fea_cols: typing.List[str],
req_cols: typing.List[str]) -> ControlMessage:
meta = msg.payload()
# Converts the int flags field into a binary string
flags_bin_series = x.get_meta("flags").to_pandas().apply(lambda x: format(int(x), "05b"))
flags_bin_series = meta.get_data("flags").to_pandas().apply(lambda x: format(int(x), "05b"))

# Expand binary string into an array
df = cudf.DataFrame(np.vstack(flags_bin_series.str.findall("[0-1]")).astype("int8"), index=x.get_meta().index)
df = cudf.DataFrame(np.vstack(flags_bin_series.str.findall("[0-1]")).astype("int8"),
index=meta.get_data().index)

# adding [ack, psh, rst, syn, fin] details from the binary flag
rename_cols_dct = {0: "ack", 1: "psh", 2: "rst", 3: "syn", 4: "fin"}
df = df.rename(columns=rename_cols_dct)

df["flags_bin"] = flags_bin_series
df["timestamp"] = x.get_meta("timestamp").astype("int64")
df["timestamp"] = meta.get_data("timestamp").astype("int64")

def round_time_kernel(timestamp, rollup_time, secs):
for i, time in enumerate(timestamp):
Expand All @@ -113,8 +113,8 @@ def round_time_kernel(timestamp, rollup_time, secs):
df["rollup_time"] = cudf.to_datetime(df["rollup_time"], unit="us").dt.strftime("%Y-%m-%d %H:%M")

# creating flow_id "src_ip:src_port=dst_ip:dst_port"
df["flow_id"] = (x.get_meta("src_ip") + ":" + x.get_meta("src_port").astype("str") + "=" +
x.get_meta("dest_ip") + ":" + x.get_meta("dest_port").astype("str"))
df["flow_id"] = (meta.get_data("src_ip") + ":" + meta.get_data("src_port").astype("str") + "=" +
meta.get_data("dest_ip") + ":" + meta.get_data("dest_port").astype("str"))
agg_dict = {
"ack": "sum",
"psh": "sum",
Expand All @@ -125,7 +125,7 @@ def round_time_kernel(timestamp, rollup_time, secs):
"flow_id": "count",
}

df["data_len"] = x.get_meta("data_len").astype("int16")
df["data_len"] = meta.get_data("data_len").astype("int16")

# group by operation
grouped_df = df.groupby(["rollup_time", "flow_id"]).agg(agg_dict)
Expand Down Expand Up @@ -175,22 +175,24 @@ def round_time_kernel(timestamp, rollup_time, secs):
count = data.shape[0]

for col in req_cols:
x.set_meta(col, merged_df[col])
meta.set_data(col, merged_df[col])

del merged_df

seq_ids = cp.zeros((count, 3), dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(x.mess_offset, x.mess_offset + count, dtype=cp.uint32)
seq_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32)
seq_ids[:, 2] = fea_len - 1

# Create the inference memory. Keep in mind count here could be > than input count
memory = InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)
memory = _messages.InferenceMemoryFIL(count=count, input__0=data, seq_ids=seq_ids)

infer_message = MultiInferenceFILMessage.from_message(x, memory=memory)
infer_message = ControlMessage(msg)
infer_message.payload(meta)
infer_message.tensors(memory)

return infer_message

def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMessage]:
def _get_preprocess_fn(self) -> typing.Callable[[ControlMessage], ControlMessage]:
return partial(AbpPcapPreprocessingStage.pre_process_batch,
fea_len=self._fea_length,
fea_cols=self.features,
Expand Down
6 changes: 2 additions & 4 deletions examples/llm/agents/common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,6 @@
from langchain.llms.openai import OpenAI

from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
Expand Down Expand Up @@ -67,8 +66,7 @@ def build_common_pipeline(config: Config, pipe: LinearPipeline, task_payload: di
Construct the elements of the pipeline common to the simple and kafka agent pipelines.
This method should be called after the source stage has been set.
"""
pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=task_payload))
pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=task_payload))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

Expand Down
4 changes: 1 addition & 3 deletions examples/llm/completion/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.io.deserializers import read_file_to_df
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
Expand Down Expand Up @@ -116,8 +115,7 @@ def pipeline(num_threads: int,

pipe.set_source(InMemorySourceStage(config, dataframes=[source_df], repeat=repeat_count))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))
pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

Expand Down
4 changes: 1 addition & 3 deletions examples/llm/rag/standalone_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
Expand Down Expand Up @@ -114,8 +113,7 @@ def standalone(num_threads,

pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs, repeat=repeat_count))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))
pipe.add_stage(DeserializeStage(config, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

Expand Down
Loading

0 comments on commit 1d02332

Please sign in to comment.