Skip to content

Commit

Permalink
Merge branch 'branch-24.06' of github.com:nv-morpheus/Morpheus into d…
Browse files Browse the repository at this point in the history
…avid-docker-compose-1744
  • Loading branch information
dagardner-nv committed Jun 24, 2024
2 parents f07277e + 00aa144 commit ed2ec5e
Show file tree
Hide file tree
Showing 17 changed files with 81 additions and 74 deletions.
2 changes: 1 addition & 1 deletion ci/runner/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ RUN rapids-dependency-file-generator \

ENV MORPHEUS_SUPPORT_DOCA=ON

COPY ./docker/optional_deps/doca.sh /tmp/doca/
COPY ./.devcontainer/docker/optional_deps/doca.sh /tmp/doca/

RUN apt update && \
DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ ARG MORPHEUS_SUPPORT_DOCA="FALSE"
ENV MORPHEUS_SUPPORT_DOCA=${MORPHEUS_SUPPORT_DOCA}

# Copy all of the optional dependency scripts
COPY ${MORPHEUS_ROOT_HOST}/docker/optional_deps docker/optional_deps
COPY ${MORPHEUS_ROOT_HOST}/.devcontainer/docker/optional_deps docker/optional_deps

# Install DOCA (If requested)
RUN --mount=type=cache,id=doca,target=/tmp/doca,sharing=locked \
Expand Down
1 change: 0 additions & 1 deletion docker/optional_deps/doca.sh

This file was deleted.

4 changes: 2 additions & 2 deletions examples/developer_guide/2_2_rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ In a second terminal from the root of the Morpheus repo execute:
python examples/developer_guide/2_2_rabbitmq/read_simple.py
```

This will read from a RabbitMQ exchange named 'logs', and write the results to `/tmp/results.json`.
This will read from a RabbitMQ exchange named 'logs', and write the results to `results.json`.

If no exchange named 'logs' exists in RabbitMQ it will be created. By default the `read_simple.py` script will utilize the class-based `RabbitMQSourceStage`, alternately using the `--use_source_function` flag will utilize the function-based `rabbitmq_source` stage.

Expand All @@ -64,7 +64,7 @@ morpheus --log_level=INFO --plugin examples/developer_guide/2_2_rabbitmq/rabbitm
run pipeline-other \
from-rabbitmq --host=localhost --exchange=logs \
monitor \
to-file --filename=/tmp/results.json --overwrite
to-file --filename=results.json --overwrite
```

### Write Pipeline
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/2_2_rabbitmq/read_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def run_pipeline(use_source_function: bool):
pipeline.add_stage(MonitorStage(config))

# Write the to the output file
pipeline.add_stage(WriteToFileStage(config, filename='/tmp/results.json', file_type=FileTypes.JSON, overwrite=True))
pipeline.add_stage(WriteToFileStage(config, filename='results.json', file_type=FileTypes.JSON, overwrite=True))

# Run the pipeline
pipeline.run()
Expand Down
2 changes: 1 addition & 1 deletion examples/developer_guide/4_rabbitmq_cpp_stage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ In a second terminal from the root of the Morpheus repo execute:
python examples/developer_guide/4_rabbitmq_cpp_stage/src/read_simple.py
```

This will read from a RabbitMQ exchange named 'logs', and write the results to `/tmp/results.json`.
This will read from a RabbitMQ exchange named 'logs', and write the results to `results.json`.

If no exchange named 'logs' exists in RabbitMQ it will be created.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def run_pipeline(use_cpp, num_threads):
pipeline.add_stage(MonitorStage(config))

# Write the to the output file
pipeline.add_stage(WriteToFileStage(config, filename='/tmp/results.json', file_type=FileTypes.JSON, overwrite=True))
pipeline.add_stage(WriteToFileStage(config, filename='results.json', file_type=FileTypes.JSON, overwrite=True))

# Run the pipeline
pipeline.run()
Expand Down
111 changes: 63 additions & 48 deletions examples/doca/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ See the License for the specific language governing permissions and
limitations under the License.
-->

# DOCA Sensitive Information Detection Example
# DOCA GPU Real-Time traffic analysis

Examples in this directory use the DOCA Source Stage to receive and pre-process network packets in real-time before passing packets info to the next Morphues stages.

## Obtaining the Morpheus DOCA Container
DOCA Support is in early access and may only be used via the Morpheus DOCA Container found in NGC. Please speak to your NVIDIA Morpheus contact for more information.

Expand All @@ -25,7 +28,17 @@ The container must be run in privileged mode and mount in hugepages as configure
docker run -v /dev/hugepages:/dev/hugepages --privileged --rm -ti --runtime=nvidia --net=host --gpus=all --cap-add=sys_nice ${MORPHEUS_DOCA_IMAGE} bash
```

# Preparing the environment

Prior to running the example, the `rdma-core` conda package needs to be _removed by force_ from the conda environment, otherwise the environment is incompatible with the DOCA-provided packages.
```
conda remove --force rdma-core
```

For more info about how to configure the machine, please refer to the [DOCA GPUNetIO programming guide](https://docs.nvidia.com/doca/sdk/doca+gpunetio/index.html).

## Finding the GPU and NIC PCIe Addresses

The DOCA example requires specifying the PCIe Address of both the GPU and NIC explicitly. Determining the correct GPU and NIC PCIe Addresses is non-trivial and requires coordinating with those who have configured the physical hardware and firmware according to the DOCA GPUNetIO documentation, but the following commands can help find a NIC and GPU situation on the same NUMA node.
```
$ lspci -tv | grep -E "NVIDIA|ella|(^\+)|(^\-)"
Expand Down Expand Up @@ -59,17 +72,61 @@ cf:00.0 3D controller: NVIDIA Corporation Device 20b9 (rev a1)
```
We can see the GPU's PCIe address is `cf:00.0`, and we can infer from the above commands that the nearest ConnectX-6 NIC's PCIe address is `cc:00.*`. In this case, we have port `1` physically connected to the network, so we use PCIe Address `cc:00.1`.

## Running the Example
The DOCA example is similar to the Sensitive Information Detection (SID) example in that it uses the `sid-minibert` model in conjunction with the `TritonInferenceStage` to detect sensitive information. The difference is that the sensitive information we will be detecting is obtained from a live TCP packet stream provided by a `DocaSourceStage`.

Prior to running the example, the `rdma-core` conda package needs to be _removed by force_ from the conda environment, otherwise the environment is incompatible with the DOCA-provided packages.
## Running the example for UDP traffic analysis

In case of UDP traffic, the sample will launch a simple pipeline with the DOCA Source Stage followed by a Monitor Stage to report number of received packets.

```
conda remove --force rdma-core
python3 ./examples/doca/run_udp_raw.py --nic_addr 17:00.1 --gpu_addr ca:00.0 --traffic_type udp
```
UDP traffic can be easily sent with nping to the interface where Morpheus is listening:
```
nping --udp -c 100000 -p 4100 192.168.2.27 --data-length 1024 --delay 0.1ms
```

Morpheus output would be:
```
====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
EAL: Detected CPU lcores: 64
EAL: Detected NUMA nodes: 2
EAL: Detected shared linkage of DPDK
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: VFIO support initialized
TELEMETRY: No legacy callbacks, legacy socket not created
EAL: Probe PCI driver: mlx5_pci (15b3:a2dc) device: 0000:ca:00.0 (socket 1)
EAL: Probe PCI driver: gpu_cuda (10de:2331) device: 0000:17:00.0 (socket 0)
====Building Pipeline Complete!====
DOCA GPUNetIO rate: 0 pkts [00:00, ? pkts/s]====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-doca-0; DocaSourceStage(nic_pci_address=ca:00.0, gpu_pci_address=17:00.0, traffic_type=udp)>
└─> morpheus.MessageMeta
Added stage: <monitor-1; MonitorStage(description=DOCA GPUNetIO rate, smoothing=0.05, unit=pkts, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
DOCA GPUNetIO rate: 100000 pkts [00:12, 10963.39 pkts/s]
```

As the DOCA Source stage output packets in the new RawMessage format that not all the Morpheus stages may support, there is an additional stage named DOCA Convert Stage which transform the data RawMessage to the Messagemeta format.

```
python3 ./examples/doca/run_udp_convert.py --nic_addr 17:00.1 --gpu_addr ca:00.0 --traffic_type udp
```

## Doca Sensitive Information Detection example for TCP traffic

The DOCA example is similar to the Sensitive Information Detection (SID) example in that it uses the `sid-minibert` model in conjunction with the `TritonInferenceStage` to detect sensitive information. The difference is that the sensitive information we will be detecting is obtained from a live TCP packet stream provided by a `DocaSourceStage`.
To run the example from the Morpheus root directory and capture all TCP network traffic from the given NIC, use the following command and replace the `nic_addr` and `gpu_addr` arguments with your NIC and GPU PCIe addresses.
```
# python examples/doca/run.py --nic_addr cc:00.1 --gpu_addr cf:00.0 --traffic_type tcp
# python examples/doca/run_tcp.py --nic_addr cc:00.1 --gpu_addr cf:00.0 --traffic_type tcp
```
```
====Registering Pipeline====
Expand Down Expand Up @@ -119,45 +176,3 @@ AddClass rate: 0 pkts [00:09, ? pkts/s]
```
The output can be found in `doca_output.csv`

## Running the Example for UDP traffic

In case of UDP traffic, the sample will launch a simple pipeline with the DOCA Source Stage followed by a Monitor Stage to report number of received packets.
Command line is similar to the TCP example.

```
python3 ./examples/doca/run.py --nic_addr 17:00.1 --gpu_addr ca:00.0 --traffic_type udp
```
UDP traffic can be easily sent with nping to the interface where Morpheus is listening:
```
nping --udp -c 100000 -p 4100 192.168.2.27 --data-length 1024 --delay 0.1ms
```

Morpheus output would be:
```
====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
EAL: Detected CPU lcores: 64
EAL: Detected NUMA nodes: 2
EAL: Detected shared linkage of DPDK
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: VFIO support initialized
TELEMETRY: No legacy callbacks, legacy socket not created
EAL: Probe PCI driver: mlx5_pci (15b3:a2dc) device: 0000:ca:00.0 (socket 1)
EAL: Probe PCI driver: gpu_cuda (10de:2331) device: 0000:17:00.0 (socket 0)
====Building Pipeline Complete!====
DOCA GPUNetIO rate: 0 pkts [00:00, ? pkts/s]====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-doca-0; DocaSourceStage(nic_pci_address=ca:00.0, gpu_pci_address=17:00.0, traffic_type=udp)>
└─> morpheus.MessageMeta
Added stage: <monitor-1; MonitorStage(description=DOCA GPUNetIO rate, smoothing=0.05, unit=pkts, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
DOCA GPUNetIO rate: 100000 pkts [00:12, 10963.39 pkts/s]
```
6 changes: 2 additions & 4 deletions examples/doca/run_tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ def run_pipeline(pipeline_batch_size, model_max_batch_size, model_fea_length, ou
config.mode = PipelineModes.NLP

# Below properties are specified by the command line
config.num_threads = 1
config.num_threads = 5
config.edge_buffer_size = 1024
config.pipeline_batch_size = pipeline_batch_size
config.model_max_batch_size = model_max_batch_size
config.feature_length = model_fea_length
config.mode = PipelineModes.NLP

config.class_labels = [
'address',
Expand All @@ -96,8 +96,6 @@ def run_pipeline(pipeline_batch_size, model_max_batch_size, model_fea_length, ou
'user'
]

config.edge_buffer_size = 128

pipeline = LinearPipeline(config)

# add doca source stage
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def run_cli():

from llm.cli import cli

cli(obj={}, auto_envvar_prefix='MORPHEUS_LLM', show_default=True, prog_name="morpheus_llm")
cli(obj={}, auto_envvar_prefix='MORPHEUS_LLM', show_default=True)


if __name__ == '__main__':
Expand Down
1 change: 1 addition & 0 deletions examples/llm/rag/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def run():
"--question",
type=str,
multiple=True,
default=["What are some new attacks discovered in the cyber security industry?"] * 5,
help="The question to answer with the RAG pipeline. Specify multiple times to answer multiple questions at once.",
)
def pipeline(**kwargs):
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def run():
@click.option("--source_type",
multiple=True,
type=click.Choice(['rss', 'filesystem', 'doca'], case_sensitive=False),
default=[],
default=['rss'],
show_default=True,
help="The type of source to use. Can specify multiple times for different source types.")
@click.option(
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/vdb_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ vdb_pipeline:
force_convert_inputs: true
model_name: "all-MiniLM-L6-v2"
server_url: "http://localhost:8001"
use_shared_memory: true
use_shared_memory: false

pipeline:
edge_buffer_size: 128
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/vdb_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def build_cli_configs(source_type,
"force_convert_inputs": True,
"model_name": embedding_model_name,
"server_url": triton_server_url,
"use_shared_memory": True,
"use_shared_memory": False,
},
"num_threads": num_threads,
}
Expand Down
9 changes: 2 additions & 7 deletions examples/sid_visualization/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,6 @@ Inside the container, compile Morpheus:
BUILD_DIR=build-docker ./scripts/compile.sh
```

Install Morpheus with an extra dependency:
```bash
pip install -e .
```

Verify Morpheus is installed:
```bash
morpheus --version
Expand All @@ -105,7 +100,7 @@ git lfs install
After the GUI has been launched, Morpheus now needs to be started. In the same shell used to build Morpheus (the one running the Morpheus Dev container), run the following:
```bash
python examples/sid_visualization/run.py \
--debug --use_cpp=False --num_threads=1 \
--debug \
--triton_server_url=triton:8001 \
--input_file=./examples/data/sid_visualization/group1-benign-2nodes.jsonlines \
--input_file=./examples/data/sid_visualization/group2-benign-50nodes.jsonlines \
Expand Down Expand Up @@ -153,7 +148,7 @@ DEMO_DATASET="examples/data/sid_visualization/group1-benign-2nodes.jsonlines"

```bash
morpheus --log_level=DEBUG \
run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=32 --edge_buffer_size=4 --use_cpp=False \
run --pipeline_batch_size=1024 --model_max_batch_size=32 --edge_buffer_size=4 \
pipeline-nlp --model_seq_length=256 \
from-file --filename=${DEMO_DATASET} \
deserialize \
Expand Down
2 changes: 1 addition & 1 deletion examples/sid_visualization/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _generate_frames(self):

@click.command()
@click.option("--debug/--no-debug", default=False)
@click.option('--use_cpp', default=False)
@click.option('--use_cpp', default=True)
@click.option(
"--num_threads",
default=os.cpu_count(),
Expand Down
3 changes: 1 addition & 2 deletions morpheus/stages/doca/doca_source_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.messages import RawPacketMessage
from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema

logger = logging.getLogger(__name__)


@register_stage("from-doca-source", modes=[PipelineModes.NLP])
class DocaSourceStage(PreallocatorMixin, SingleOutputSource):
class DocaSourceStage(SingleOutputSource):
"""
A source stage used to receive raw packet data from a ConnectX-6 Dx NIC.
Expand Down

0 comments on commit ed2ec5e

Please sign in to comment.