diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 2d05b68a23..86a42253fb 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,11 +13,92 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM rapidsai/devcontainers:23.12-cpp-mambaforge-ubuntu22.04 AS base +ARG BASE +ARG PYTHON_PACKAGE_MANAGER -ENV MORPHEUS_SUPPORT_DOCA=ON +FROM node:22 as node + +FROM ${BASE} as base + +# ===== install common packages ================================================================== + +RUN </dev/null' + +ln -s /usr/local/bin/node /usr/local/bin/nodejs +ln -s /usr/local/lib/node_modules/npm/bin/npm-cli.js /usr/local/bin/npm +ln -s /usr/local/lib/node_modules/npm/bin/npx-cli.js /usr/local/bin/npx + +echo "node version: $(node --version)" +echo " npm version: $(npm --version)" +echo "yarn version: $(yarn --version)" + +npm install -g camouflage-server@0.15 +EOF + + +FROM base as pip-base + +ENV DEFAULT_VIRTUAL_ENV=morpheus + +FROM base as conda-base + +ENV DEFAULT_CONDA_ENV=morpheus + +FROM ${PYTHON_PACKAGE_MANAGER}-base + +ENV PROJECT_MANIFEST_YML="/home/coder/morpheus/manifest.yaml" +ENV PATH="${PATH}:/home/coder/morpheus/.devcontainer/bin" + +ARG CUDA +ENV CUDAARCHS="RAPIDS" +ENV CUDA_VERSION="${CUDA_VERSION:-${CUDA}}" + +ARG RAPIDS +ENV RAPIDS=${RAPIDS} + +ARG PYTHON_PACKAGE_MANAGER +ENV PYTHON_PACKAGE_MANAGER="${PYTHON_PACKAGE_MANAGER}" + +ENV SCCACHE_REGION="us-east-2" +ENV SCCACHE_BUCKET="rapids-sccache-devs" +ENV VAULT_HOST="https://vault.ops.k8s.rapids.ai" +ENV HISTFILE="/home/coder/.cache/._bash_history" + +ENV MORPHEUS_SUPPORT_DOCA=ON COPY ./docker/optional_deps docker/optional_deps RUN ./docker/optional_deps/doca.sh /tmp/doca - -ENV PATH="${PATH}:/workspaces/morpheus/.devcontainer/bin" diff --git a/.devcontainer/cuda12.1-conda/devcontainer.json b/.devcontainer/cuda12.1-conda/devcontainer.json new file mode 100644 index 0000000000..fa5d24c6d8 --- /dev/null +++ b/.devcontainer/cuda12.1-conda/devcontainer.json @@ -0,0 +1,112 @@ +{ + "build": { + "context": "${localWorkspaceFolder}/.devcontainer", + "dockerfile": "${localWorkspaceFolder}/.devcontainer/Dockerfile", + "args": { + "CUDA": "12.1", + "PYTHON_PACKAGE_MANAGER": "conda", + "BASE": "rapidsai/devcontainers:24.06-cpp-mambaforge-ubuntu22.04" + } + }, + "privileged": true, + "hostRequirements": { + "gpu": "optional" + }, + "capAdd": [ + "SYS_NICE", + "SYS_PTRACE" + ], + "securityOpt": [ + "seccomp=unconfined" + ], + "runArgs": [ + "--network=morpheus" + ], + "containerEnv": { + "HOST_MORPHEUS_ROOT": "${localWorkspaceFolder}", + "MORPHEUS_ROOT": "${containerWorkspaceFolder}/morpheus", + "DISPLAY": "${localEnv:DISPLAY:-}", + "XAUTHORITY": "${localEnv:XAUTHORITY:-}", + "XDG_SESSION_TYPE": "${localEnv:XDG_SESSION_TYPE:-}", + "XDG_RUNTIME_DIR": "${localEnv:XDG_RUNTIME_DIR:-}", + "DBUS_SESSION_BUS_ADDRESS": "${localEnv:DBUS_SESSION_BUS_ADDRESS:-}" + }, + "features": { + "ghcr.io/rapidsai/devcontainers/features/rapids-build-utils:24.6": {}, + "ghcr.io/devcontainers/features/docker-outside-of-docker:1": {}, + "ghcr.io/devcontainers/features/dotnet:1": { + "version": "6.0", + "installUsingApt": false + } + }, + "overrideFeatureInstallOrder": [ + "ghcr.io/rapidsai/devcontainers/features/rapids-build-utils", + "ghcr.io/devcontainers/features/docker-outside-of-docker", + "ghcr.io/devcontainers/features/dotnet" + ], + "initializeCommand": [ + "/bin/bash", + "-c", + "${localWorkspaceFolder}/.devcontainer/initialize-command.sh && mkdir -m 0755 -p ${localWorkspaceFolder}/../.{aws,cache,config,conda/pkgs,conda/${localWorkspaceFolderBasename}-cuda12.1-envs}" + ], + "postAttachCommand": [ + "/bin/bash", + "-c", + "if [ ${CODESPACES:-false} = 'true' ]; then . devcontainer-utils-post-attach-command; . rapids-post-attach-command; fi" + ], + "workspaceFolder": "/home/coder", + "workspaceMount": "source=${localWorkspaceFolder},target=/home/coder/morpheus,type=bind,consistency=consistent", + "mounts": [ + "source=/tmp/.X11-unix,target=/tmp/.X11-unix,type=bind", + "source=${localEnv:XDG_RUNTIME_DIR},target=${localEnv:XDG_RUNTIME_DIR},type=bind", + "source=/run/dbus/system_bus_socket,target=/run/dbus/system_bus_socket,type=bind", + "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind", + "source=/dev/hugepages,target=/dev/hugepages,type=bind,consistency=consistent", + "source=${localWorkspaceFolder}/../.aws,target=/home/coder/.aws,type=bind,consistency=consistent", + "source=${localWorkspaceFolder}/../.cache,target=/home/coder/.cache,type=bind,consistency=consistent", + "source=${localWorkspaceFolder}/../.config,target=/home/coder/.config,type=bind,consistency=consistent", + "source=${localWorkspaceFolder}/../.conda/pkgs,target=/home/coder/.conda/pkgs,type=bind,consistency=consistent", + "source=${localWorkspaceFolder}/../.conda/${localWorkspaceFolderBasename}-cuda12.1-envs,target=/home/coder/.conda/envs,type=bind,consistency=consistent" + ], + "customizations": { + "vscode": { + "extensions": [ + "cschlosser.doxdocgen", // Adding docstrings to C++ code + "eamodio.gitlens", // Enhanced Git support + "eeyore.yapf", // Python code formatter + "josetr.cmake-language-support-vscode", // CMake language support + "llvm-vs-code-extensions.vscode-clangd", + "llvm-vs-code-extensions.vscode-clangd", // Clangd language server for C++ + "matepek.vscode-catch2-test-adapter", + "mechatroner.rainbow-csv", // Colorizing CSV files + "mhutchie.git-graph", // Visualizing Git history and branching + "ms-azuretools.vscode-docker", // Docker support + "ms-python.debugpy", // Python debugger + "ms-python.flake8", // Python linter + "ms-python.isort", // Python import sorter + "ms-python.pylint", // Python linter + "ms-python.python", // Python language support + "ms-python.vscode-pylance", // Python language server + "ms-toolsai.jupyter", // Jupyter notebook support + "ms-vscode.cmake-tools", // CMake support for building Morpheus + "ms-vscode.cpptools", // C++ language support + "njpwerner.autodocstring", // Adding docstrings to python code + "nvidia.nsight-vscode-edition", // CUDA integration and debugging + "stkb.rewrap", // Wrapping all text in any language + "twxs.cmake", + "vadimcn.vscode-lldb", // LLDB debugger (better than GDB for C++ debugging) + "xaver.clang-format" + ], + "settings": { + "cmake.cmakePath": "/tmp/.current-conda-env/bin/cmake", + "C_Cpp.intelliSenseEngine": "disabled", + "python.terminal.activateEnvironment": false, + "files.watcherExclude": { + "**/.git/objects/**": true, + "**/.git/subtree-cache/**": true, + "**/.cache/**": true + } + } + } + } +} \ No newline at end of file diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json deleted file mode 100644 index 25326954a0..0000000000 --- a/.devcontainer/devcontainer.json +++ /dev/null @@ -1,128 +0,0 @@ -// !/bin/bash -// SPDX-FileCopyrightText: Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -// SPDX-License-Identifier: Apache-2.0 -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -{ - "name": "Morpheus", - "build": { - "dockerfile": "Dockerfile" - }, - "privileged": true, - - "hostRequirements": { - "gpu": true - }, - - "capAdd":[ - "SYS_NICE", - "SYS_PTRACE" - ], - - "securityOpt": [ - "seccomp=unconfined" - ], - - "runArgs": [ - "--network=morpheus", - "-v=/dev/hugepages:/dev/hugepages" - ], - - "containerEnv": { - "HOST_MORPHEUS_ROOT": "${localWorkspaceFolder}", - "MORPHEUS_ROOT": "${containerWorkspaceFolder}", - "DEFAULT_CONDA_ENV": "morpheus", - "MAMBA_NO_BANNER": "1", - "CONDA_ALWAYS_YES": "true", - "VAULT_HOST": "https://vault.ops.k8s.rapids.ai", - "DISPLAY": "${localEnv:DISPLAY:-}", - "XAUTHORITY": "${localEnv:XAUTHORITY:-}", - "XDG_SESSION_TYPE": "${localEnv:XDG_SESSION_TYPE:-}", - "XDG_RUNTIME_DIR": "${localEnv:XDG_RUNTIME_DIR:-}", - "DBUS_SESSION_BUS_ADDRESS": "${localEnv:DBUS_SESSION_BUS_ADDRESS:-}" - }, - - "initializeCommand": [ "${localWorkspaceFolder}/.devcontainer/initialize-command.sh" ], - - "remoteUser": "coder", - "mounts": [ - { - "type": "bind", - "source": "/tmp/.X11-unix", - "target": "/tmp/.X11-unix" - }, - { - "type": "bind", - "source": "${localEnv:XDG_RUNTIME_DIR}", - "target": "${localEnv:XDG_RUNTIME_DIR}" - }, - { - "type": "bind", - "source": "/run/dbus/system_bus_socket", - "target": "/run/dbus/system_bus_socket" - }, - { - "type":"bind", - "source": "/var/run/docker.sock", - "target": "/var/run/docker.sock" - }, - { - "type": "bind", - "source": "${localWorkspaceFolder}/.cache/conda/envs", - "target": "/home/coder/.conda/envs" - }, - { - "type": "bind", - "source": "${localWorkspaceFolder}/../.conda/pkgs", - "target": "/home/coder/.conda/pkgs" - }, - { - "type":"bind", - "source": "${localWorkspaceFolder}/../.config", // parent folder because sister repos are sibling dirs - "target": "/home/coder/.config" - }, - { - "type": "bind", - "source": "${localWorkspaceFolder}/.devcontainer/opt/morpheus", - "target": "/opt/morpheus" - } - ], - - "features": { - "ghcr.io/devcontainers/features/docker-from-docker": {}, - "ghcr.io/devcontainers/features/dotnet:1": { - "version": "6.0", - "installUsingApt": false - } - }, - - "customizations": { - "vscode": { - "extensions": [ - "eamodio.gitlens", - "llvm-vs-code-extensions.vscode-clangd", - "ms-python.python", - "ms-vscode.cpptools", - "ms-vscode.cmake-tools", - "nvidia.nsight-vscode-edition", - "twxs.cmake", - "xaver.clang-format" - ], - "settings": { - "cmake.cmakePath": "/tmp/.current-conda-env/bin/cmake", - "C_Cpp.intelliSenseEngine": "disabled", - "python.terminal.activateEnvironment": false - } - } - }, -} diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index f9fdf0d858..dfac10e129 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -version: "3" - services: triton: @@ -54,5 +52,5 @@ services: networks: default: - external: - name: morpheus + name: morpheus + external: true diff --git a/.devcontainer/initialize-command.sh b/.devcontainer/initialize-command.sh index bdc70d1820..9741e2464a 100755 --- a/.devcontainer/initialize-command.sh +++ b/.devcontainer/initialize-command.sh @@ -16,12 +16,5 @@ set -e -# create a docker network for morpheus +# create a docker network for morpheus if it does not exist docker network inspect morpheus >/dev/null 2>&1 || docker network create morpheus - -# create the parent conda folder so it's found when mounting -mkdir -p .cache/conda/envs -mkdir -p ../.conda/pkgs - -# create a config directory if it does not exist so it's found when mounting -mkdir -p ../.config diff --git a/.devcontainer/opt/morpheus/bin/post-attach-command.sh b/.devcontainer/opt/morpheus/bin/post-attach-command.sh deleted file mode 100755 index 00a0ab3b5c..0000000000 --- a/.devcontainer/opt/morpheus/bin/post-attach-command.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/bash -# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Ensure our ~/.config directory has the correct permissions. If ~/.config did -# not exist, and you mount ~/.config/gh from the host, then ~/.config will be -# created with root permissions which can break things - -conda_env_find(){ - conda env list | grep "${@}" >/dev/null 2>/dev/null -} - -ENV_NAME=${ENV_NAME:-morpheus} - -sed -ri "s/conda activate base/conda activate $ENV_NAME/g" ~/.bashrc; - -if conda_env_find "${ENV_NAME}" ; \ -then mamba env update --name ${ENV_NAME} -f ${MORPHEUS_ROOT}/conda/environments/all_cuda-121_arch-x86_64.yaml --prune; \ -else mamba env create --name ${ENV_NAME} -f ${MORPHEUS_ROOT}/conda/environments/all_cuda-121_arch-x86_64.yaml; \ -fi diff --git a/.devcontainer/opt/morpheus/etc/.gdbinit b/.devcontainer/opt/morpheus/etc/.gdbinit deleted file mode 100644 index 115ac5e1f9..0000000000 --- a/.devcontainer/opt/morpheus/etc/.gdbinit +++ /dev/null @@ -1,3 +0,0 @@ -add-auto-load-safe-path /opt/conda - -source /opt/morpheus/etc/enable_conda_libstd_pretty_print.py diff --git a/.devcontainer/opt/morpheus/etc/enable_conda_libstd_pretty_print.py b/.devcontainer/opt/morpheus/etc/enable_conda_libstd_pretty_print.py deleted file mode 100644 index bfe973663c..0000000000 --- a/.devcontainer/opt/morpheus/etc/enable_conda_libstd_pretty_print.py +++ /dev/null @@ -1,55 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# -*- python -*- -import os -import subprocess -import sys - -import gdb - -conda_env_path = os.environ.get("CONDA_PREFIX", None) - -if (conda_env_path is not None): - - gcc_path = os.environ.get("GCC", None) - - if (gcc_path is None): - print( - "Could not find gcc from $GCC: '{}'. Ensure gxx_linux-64, gcc_linux-64, sysroot_linux-64, and gdb have been installed into the current conda environment" - .format(gcc_path)) - else: - # Get the GCC version - result = subprocess.run([gcc_path, '-dumpversion'], stdout=subprocess.PIPE) - gcc_version = result.stdout.decode("utf-8").strip() - - # Build the gcc python path - gcc_python_path = os.path.join(conda_env_path, "share", "gcc-{}".format(gcc_version), "python") - - if (os.path.exists(gcc_python_path)): - - # Add to the path for the pretty printer - sys.path.insert(0, gcc_python_path) - - # Now register the pretty printers - from libstdcxx.v6 import register_libstdcxx_printers - register_libstdcxx_printers(gdb.current_objfile()) - - print("Loaded stdlibc++ pretty printers") - else: - print("Could not find gcc python files at: {}".format(gcc_python_path)) - print( - "Ensure gxx_linux-64, gcc_linux-64, sysroot_linux-64, and gdb have been installed into the current conda environment" - ) diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index e928fafd7f..7fc58687e2 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -95,7 +95,7 @@ jobs: # Disable conda upload for now, once we have morpheus packages in conda forge set the value to # !fromJSON(needs.prepare.outputs.is_pr) && (fromJSON(needs.prepare.outputs.is_main_branch) && 'main' || 'dev') || '' conda_upload_label: "" - container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-240221 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-240221 + container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-build-240524 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-240524 secrets: NGC_API_KEY: ${{ secrets.NGC_API_KEY }} diff --git a/ci/runner/Dockerfile b/ci/runner/Dockerfile index 57cc66f666..df34c327d0 100644 --- a/ci/runner/Dockerfile +++ b/ci/runner/Dockerfile @@ -37,7 +37,6 @@ SHELL ["/bin/bash", "-c"] # Create conda environment COPY ./dependencies.yaml /tmp/conda/ - # ============ build ================== FROM base as build @@ -56,6 +55,20 @@ RUN rapids-dependency-file-generator \ conda clean -afy && \ rm -rf /tmp/conda +ENV MORPHEUS_SUPPORT_DOCA=ON + +COPY ./docker/optional_deps/doca.sh /tmp/doca/ + +RUN apt update && \ + DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ + apt install --no-install-recommends -y \ + build-essential \ + libtool \ + automake && \ + apt clean && \ + /tmp/doca/doca.sh /tmp/doca && \ + rm -rf /tmp/doca + # ============ test ================== FROM base as test diff --git a/ci/scripts/github/common.sh b/ci/scripts/github/common.sh index a4269828c2..5f50779726 100644 --- a/ci/scripts/github/common.sh +++ b/ci/scripts/github/common.sh @@ -82,6 +82,10 @@ fi export CMAKE_BUILD_ALL_FEATURES="${_FLAGS[@]}" unset _FLAGS +if [[ ${MORPHEUS_SUPPORT_DOCA} == @(TRUE|ON) ]]; then + export CMAKE_BUILD_ALL_FEATURES="${CMAKE_BUILD_ALL_FEATURES} -DMORPHEUS_SUPPORT_DOCA=ON" +fi + export FETCH_STATUS=0 print_env_vars diff --git a/ci/scripts/run_ci_local.sh b/ci/scripts/run_ci_local.sh index 979fd07e23..444d4a5ebb 100755 --- a/ci/scripts/run_ci_local.sh +++ b/ci/scripts/run_ci_local.sh @@ -58,7 +58,7 @@ GIT_BRANCH=$(git branch --show-current) GIT_COMMIT=$(git log -n 1 --pretty=format:%H) LOCAL_CI_TMP=${LOCAL_CI_TMP:-${MORPHEUS_ROOT}/.tmp/local_ci_tmp} -CONTAINER_VER=${CONTAINER_VER:-240221} +CONTAINER_VER=${CONTAINER_VER:-240524} CUDA_VER=${CUDA_VER:-12.1} DOCKER_EXTRA_ARGS=${DOCKER_EXTRA_ARGS:-""} diff --git a/docker/optional_deps/doca.sh b/docker/optional_deps/doca.sh deleted file mode 100755 index f1975fbfe0..0000000000 --- a/docker/optional_deps/doca.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/bash -# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -e - -MORPHEUS_SUPPORT_DOCA=${MORPHEUS_SUPPORT_DOCA:-OFF} -LINUX_DISTRO=${LINUX_DISTRO:-ubuntu} -LINUX_VER=${LINUX_VER:-22.04} -DOCA_VERSION=${DOCA_VERSION:-2.7.0} - -# Exit early if nothing to do -if [[ ${MORPHEUS_SUPPORT_DOCA} != @(TRUE|ON) ]]; then - exit 0 -fi - -WORKING_DIR=$1 - -echo "Installing DOCA using directory: ${WORKING_DIR}" - -DEB_DIR=${WORKING_DIR}/deb - -mkdir -p ${DEB_DIR} - -DOCA_OS_VERSION="ubuntu2204" -DOCA_PKG_LINK="https://www.mellanox.com/downloads/DOCA/DOCA_v${DOCA_VERSION}/host/doca-host_${DOCA_VERSION}-204000-24.04-${DOCA_OS_VERSION}_amd64.deb" - -# Upgrade the base packages (diff between image and Canonical upstream repo) -apt update -y -apt upgrade -y - -# Install wget -apt install -y --no-install-recommends wget - -wget -qO - ${DOCA_PKG_LINK} -O doca-host.deb -apt install ./doca-host.deb -apt update -apt install -y doca-all -apt install -y doca-gpu doca-gpu-dev - -# Now install the gdrcopy library according to: https://github.com/NVIDIA/gdrcopy -GDRCOPY_DIR=${WORKING_DIR}/gdrcopy - -if [[ ! -d "${GDRCOPY_DIR}" ]] ; then - git clone https://github.com/NVIDIA/gdrcopy.git ${GDRCOPY_DIR} - cd ${GDRCOPY_DIR} -else - cd ${GDRCOPY_DIR} - git pull https://github.com/NVIDIA/gdrcopy.git -fi - -make lib lib_install diff --git a/docker/optional_deps/doca.sh b/docker/optional_deps/doca.sh new file mode 120000 index 0000000000..3c62fefefe --- /dev/null +++ b/docker/optional_deps/doca.sh @@ -0,0 +1 @@ +../../.devcontainer/docker/optional_deps/doca.sh \ No newline at end of file diff --git a/docs/source/modules/examples/digital_fingerprinting/dfp_deployment.md b/docs/source/modules/examples/digital_fingerprinting/dfp_deployment.md index e63d791e54..ad094ee81a 100644 --- a/docs/source/modules/examples/digital_fingerprinting/dfp_deployment.md +++ b/docs/source/modules/examples/digital_fingerprinting/dfp_deployment.md @@ -102,7 +102,7 @@ This module function sets up modular Digital Fingerprinting Pipeline instance. | Parameter | Type | Description | Example Value | Default Value | |-------------------------|--------|-------------------------------------------------------------|---------------|---------------| -| `cache_mode` | string | The user ID to use if the user ID is not found | "batch" | `batch` | +| `cache_mode` | string | Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are met. Otherwise, continue to aggregate user's history. | "batch" | `batch` | | `min_history` | int | Minimum history to trigger a new training event | 1 | `1` | | `max_history` | int | Maximum history to include in a new training event | 0 | `0` | | `timestamp_column_name` | string | Name of the column containing timestamps | "timestamp" | `timestamp` | diff --git a/docs/source/modules/examples/digital_fingerprinting/dfp_inference_pipe.md b/docs/source/modules/examples/digital_fingerprinting/dfp_inference_pipe.md index 5827b0a40d..b13fc42b10 100644 --- a/docs/source/modules/examples/digital_fingerprinting/dfp_inference_pipe.md +++ b/docs/source/modules/examples/digital_fingerprinting/dfp_inference_pipe.md @@ -60,7 +60,7 @@ into a single module. | Parameter | Type | Description | Example Value | Default Value | |-------------------------|--------|-------------------------------------------------------------|---------------|---------------| -| `cache_mode` | string | The user ID to use if the user ID is not found | "batch" | `batch` | +| `cache_mode` | string | Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are met. Otherwise, continue to aggregate user's history. | "batch" | `batch` | | `min_history` | int | Minimum history to trigger a new training event | 1 | `1` | | `max_history` | int | Maximum history to include in a new training event | 0 | `0` | | `timestamp_column_name` | string | Name of the column containing timestamps | "timestamp" | `timestamp` | @@ -105,7 +105,7 @@ into a single module. }, "stream_aggregation_options": { "timestamp_column_name": "timestamp", - "cache_mode": "MEMORY", + "cache_mode": "batch", "trigger_on_min_history": true, "aggregation_span": "1D", "trigger_on_min_increment": true, diff --git a/docs/source/modules/examples/digital_fingerprinting/dfp_rolling_window.md b/docs/source/modules/examples/digital_fingerprinting/dfp_rolling_window.md index b6b8c068f2..7104937956 100644 --- a/docs/source/modules/examples/digital_fingerprinting/dfp_rolling_window.md +++ b/docs/source/modules/examples/digital_fingerprinting/dfp_rolling_window.md @@ -23,7 +23,7 @@ This module is responsible for maintaining a rolling window of historical data, | Parameter | Type | Description | Example Value | Default Value | |--------------------------|--------|--------------------------------------------------------------|---------------|---------------| -| cache_mode | string | The user ID to use if the user ID is not found | "batch" | "batch" | +| cache_mode | string | Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are met. Otherwise, continue to aggregate user's history. | "batch" | "batch" | | trigger_on_min_history | int | Minimum history to trigger a new training event | 1 | 1 | | trigger_on_min_increment | int | Minmum increment from the last trained to new training event | 0 | 0 | | timestamp_column_name | string | Name of the column containing timestamps | "timestamp" | "timestamp" | diff --git a/docs/source/modules/examples/digital_fingerprinting/dfp_training_pipe.md b/docs/source/modules/examples/digital_fingerprinting/dfp_training_pipe.md index 388f2851ea..c1a9699886 100644 --- a/docs/source/modules/examples/digital_fingerprinting/dfp_training_pipe.md +++ b/docs/source/modules/examples/digital_fingerprinting/dfp_training_pipe.md @@ -59,7 +59,7 @@ This module function consolidates multiple DFP pipeline modules relevant to the | Key | Type | Description | Example Value | Default Value | |-------------------------|--------|-------------------------------------------------------------|---------------|---------------| -| `cache_mode` | string | The user ID to use if the user ID is not found | "batch" | `batch` | +| `cache_mode` | string | Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are met. Otherwise, continue to aggregate user's history. | "batch" | `batch` | | `min_history` | int | Minimum history to trigger a new training event | 1 | `1` | | `max_history` | int | Maximum history to include in a new training event | 0 | `0` | | `timestamp_column_name` | string | Name of the column containing timestamps | 'timestamp' | `timestamp` | diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_deployment.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_deployment.py index 9ac3061b6b..e4744c32b6 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_deployment.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_deployment.py @@ -98,7 +98,8 @@ def dfp_deployment(builder: mrc.Builder): - timestamp_column_name (str): Name of the timestamp column; Example: `timestamp`; Default: timestamp stream_aggregation_options: - - cache_mode (str): The user ID to use if the user ID is not found; Example: 'batch'; Default: 'batch' + - cache_mode (str): Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are + met. Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minmum increment from the last trained to new training event; Example: 0; Default: 0 diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipe.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipe.py index b5dbbcf09c..24f72c278a 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipe.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipe.py @@ -94,7 +94,8 @@ def dfp_inference_pipe(builder: mrc.Builder): - timestamp_column_name (str): Name of the timestamp column; Example: "timestamp"; Default: timestamp stream_aggregation_options: - - cache_mode (str): The user ID to use if the user ID is not found; Example: 'batch'; Default: 'batch' + - cache_mode (str): Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions + are met. Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minimum increment from the last trained to new training event; Example: 0; Default: 0 diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py index c815f0e7f1..54a793b253 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py @@ -48,7 +48,8 @@ def dfp_rolling_window(builder: mrc.Builder): Notes ----- Configurable parameters: - - cache_mode (str): The user ID to use if the user ID is not found; Example: 'batch'; Default: 'batch' + - cache_mode (str): Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are + met. Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minmum increment from the last trained to new training event; Example: 0; Default: 0 diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipe.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipe.py index a3fd39edf4..9fcbd946af 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipe.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipe.py @@ -87,7 +87,8 @@ def dfp_training_pipe(builder: mrc.Builder): - timestamp_column_name (str): Name of the timestamp column; Example: `timestamp`; Default: timestamp stream_aggregation_options: - - cache_mode (str): The user ID to use if the user ID is not found; Example: 'batch'; Default: 'batch' + - cache_mode (str): Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions + are met. Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minimum increment from the last trained to new training event; Example: 0; Default: 0 diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/config_generator.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/config_generator.py index 74befdbb51..daeb21ac21 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/utils/config_generator.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/config_generator.py @@ -84,7 +84,7 @@ def infer_module_conf(self): "stream_aggregation_options": { "aggregation_span": "1d", "cache_to_disk": False, - "cache_mode": "streaming", + "cache_mode": "batch", }, "preprocessing_options": { "schema": { @@ -139,7 +139,7 @@ def train_module_conf(self): "stream_aggregation_options": { "aggregation_span": "60d", "cache_to_disk": False, - "cache_mode": "streaming", + "cache_mode": "aggregate", "trigger_on_min_history": 300, "trigger_on_min_increment": 300 }, diff --git a/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_azure_integrated_training.ipynb b/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_azure_integrated_training.ipynb index 40e6f6fd9c..1d77c364d9 100644 --- a/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_azure_integrated_training.ipynb +++ b/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_azure_integrated_training.ipynb @@ -309,7 +309,7 @@ "\n", "| Parameter | Type | Description | Example Value | Default Value |\n", "|-------------------------|--------|-------------------------------------------------------------|---------------|---------------|\n", - "| `cache_mode` | string | The user ID to use if the user ID is not found | \"batch\" | `batch` |\n", + "| `cache_mode` | string | Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are met. Otherwise, continue to aggregate user's history. | \"batch\" | `batch` |\n", "| `min_history` | int | Minimum history to trigger a new training event | 1 | `1` |\n", "| `max_history` | int | Maximum history to include in a new training event | 0 | `0` |\n", "| `timestamp_column_name` | string | Name of the column containing timestamps | \"timestamp\" | `timestamp` |\n", diff --git a/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_duo_integrated_training.ipynb b/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_duo_integrated_training.ipynb index b87f57356c..16fff565cc 100644 --- a/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_duo_integrated_training.ipynb +++ b/examples/digital_fingerprinting/production/morpheus/notebooks/dfp_duo_integrated_training.ipynb @@ -311,7 +311,7 @@ "\n", "| Parameter | Type | Description | Example Value | Default Value |\n", "|-------------------------|--------|-------------------------------------------------------------|---------------|---------------|\n", - "| `cache_mode` | string | The user ID to use if the user ID is not found | \"batch\" | `batch` |\n", + "| `cache_mode` | string | Mode for managing user cache. Setting to `batch` flushes cache once trigger conditions are met. Otherwise, continue to aggregate user's history. | \"batch\" | `batch` |\n", "| `min_history` | int | Minimum history to trigger a new training event | 1 | `1` |\n", "| `max_history` | int | Maximum history to include in a new training event | 0 | `0` |\n", "| `timestamp_column_name` | string | Name of the column containing timestamps | \"timestamp\" | `timestamp` |\n", diff --git a/examples/gnn_fraud_detection_pipeline/stages/model.py b/examples/gnn_fraud_detection_pipeline/stages/model.py index eac1a6f2e6..d82419e87e 100644 --- a/examples/gnn_fraud_detection_pipeline/stages/model.py +++ b/examples/gnn_fraud_detection_pipeline/stages/model.py @@ -165,7 +165,7 @@ def inference(self, """ # create sampler and test dataloaders - full_sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts=[4, 3]) + full_sampler = dgl.dataloading.MultiLayerFullNeighborSampler(num_layers=3) test_dataloader = dgl.dataloading.DataLoader(input_graph, {target_node: test_idx}, full_sampler, batch_size=batch_size, diff --git a/.devcontainer/opt/morpheus/bin/update-content-command.sh b/manifest.yaml old mode 100755 new mode 100644 similarity index 57% rename from .devcontainer/opt/morpheus/bin/update-content-command.sh rename to manifest.yaml index df003d1af2..dd97b1db41 --- a/.devcontainer/opt/morpheus/bin/update-content-command.sh +++ b/manifest.yaml @@ -1,5 +1,4 @@ -#!/bin/bash -# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,10 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -# Ensure our ~/.config directory has the correct permissions. If ~/.config did -# not exist, and you mount ~/.config/gh from the host, then ~/.config will be -# created with root permissions which can break things - -if [[ ! -f ~/.config/.gdbinit ]]; then - cp /opt/morpheus/etc/.gdbinit ~/.config/.gdbinit -fi +repos: +- name: morpheus + path: morpheus + cpp: + - name: morpheus + sub_dir: "" + args: | + -DMORPHEUS_PYTHON_INPLACE_BUILD=ON + python: + - name: morpheus + sub_dir: "" + git: + host: github + tag: branch-24.06 + upstream: nv-morpheus + repo: morpheus diff --git a/morpheus/_lib/cmake/libmorpheus.cmake b/morpheus/_lib/cmake/libmorpheus.cmake index 954620dfac..b4d3e5baaf 100644 --- a/morpheus/_lib/cmake/libmorpheus.cmake +++ b/morpheus/_lib/cmake/libmorpheus.cmake @@ -70,7 +70,7 @@ add_library(morpheus src/stages/add_scores.cpp src/stages/deserialize.cpp src/stages/file_source.cpp - src/stages/filter_detection.cpp + src/stages/filter_detections.cpp src/stages/http_server_source_stage.cpp src/stages/inference_client_stage.cpp src/stages/kafka_source.cpp diff --git a/morpheus/_lib/include/morpheus/stages/filter_detection.hpp b/morpheus/_lib/include/morpheus/stages/filter_detections.hpp similarity index 61% rename from morpheus/_lib/include/morpheus/stages/filter_detection.hpp rename to morpheus/_lib/include/morpheus/stages/filter_detections.hpp index 5e78a8322e..1e2d0cd86b 100644 --- a/morpheus/_lib/include/morpheus/stages/filter_detection.hpp +++ b/morpheus/_lib/include/morpheus/stages/filter_detections.hpp @@ -17,22 +17,22 @@ #pragma once -#include "morpheus/export.h" -#include "morpheus/messages/multi.hpp" -#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo -#include "morpheus/objects/filter_source.hpp" +#include "morpheus/export.h" // for MORPHEUS_EXPORT +#include "morpheus/messages/control.hpp" // for ControlMessage +#include "morpheus/messages/multi.hpp" // for MultiMessage +#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo +#include "morpheus/objects/filter_source.hpp" // for FilterSource -#include -#include -#include -#include -#include +#include // for cudaMemcpy +#include // for Builder +#include // for Object +#include // for PythonNode +#include // for observable_member, trace_activity, map, decay_t, from #include // for size_t -#include -#include -#include -#include +#include // for map +#include // for allocator, shared_ptr +#include // for string namespace morpheus { /****** Component public implementations *******************/ @@ -68,11 +68,12 @@ namespace morpheus { * Depending on the downstream stages, this can cause performance issues, especially if those stages need to acquire * the Python GIL. */ +template class MORPHEUS_EXPORT FilterDetectionsStage - : public mrc::pymrc::PythonNode, std::shared_ptr> + : public mrc::pymrc::PythonNode, std::shared_ptr> { public: - using base_t = mrc::pymrc::PythonNode, std::shared_ptr>; + using base_t = mrc::pymrc::PythonNode, std::shared_ptr>; using typename base_t::sink_type_t; using typename base_t::source_type_t; using typename base_t::subscribe_fn_t; @@ -90,8 +91,8 @@ class MORPHEUS_EXPORT FilterDetectionsStage private: subscribe_fn_t build_operator(); - DevMemInfo get_tensor_filter_source(const std::shared_ptr& x); - DevMemInfo get_column_filter_source(const std::shared_ptr& x); + DevMemInfo get_tensor_filter_source(const sink_type_t& x); + DevMemInfo get_column_filter_source(const sink_type_t& x); float m_threshold; bool m_copy; @@ -101,6 +102,11 @@ class MORPHEUS_EXPORT FilterDetectionsStage std::map m_idx2label; }; +using FilterDetectionsStageMM = // NOLINT(readability-identifier-naming) + FilterDetectionsStage; +using FilterDetectionsStageCM = // NOLINT(readability-identifier-naming) + FilterDetectionsStage; + /****** FilterDetectionStageInterfaceProxy******************/ /** * @brief Interface proxy, used to insulate python bindings. @@ -108,7 +114,27 @@ class MORPHEUS_EXPORT FilterDetectionsStage struct MORPHEUS_EXPORT FilterDetectionStageInterfaceProxy { /** - * @brief Create and initialize a FilterDetectionStage, and return the result + * @brief Create and initialize a FilterDetectionStage that receives MultiMessage and emits MultiMessage, and return + * the result + * + * @param builder : Pipeline context object reference + * @param name : Name of a stage reference + * @param threshold : Threshold to classify + * @param copy : Whether or not to perform a copy default=true + * @param filter_source : Indicate if the values used for filtering exist in either an output tensor + * (`FilterSource::TENSOR`) or a column in a Dataframe (`FilterSource::DATAFRAME`). + * @param field_name : Name of the tensor or Dataframe column to filter on default="probs" + * @return std::shared_ptr>> + */ + static std::shared_ptr> init_mm(mrc::segment::Builder& builder, + const std::string& name, + float threshold, + bool copy, + FilterSource filter_source, + std::string field_name); + /** + * @brief Create and initialize a FilterDetectionStage that receives ControlMessage and emits ControlMessage, and + * return the result * * @param builder : Pipeline context object reference * @param name : Name of a stage reference @@ -117,14 +143,15 @@ struct MORPHEUS_EXPORT FilterDetectionStageInterfaceProxy * @param filter_source : Indicate if the values used for filtering exist in either an output tensor * (`FilterSource::TENSOR`) or a column in a Dataframe (`FilterSource::DATAFRAME`). * @param field_name : Name of the tensor or Dataframe column to filter on default="probs" - * @return std::shared_ptr> + * @return std::shared_ptr>> */ - static std::shared_ptr> init(mrc::segment::Builder& builder, - const std::string& name, - float threshold, - bool copy, - FilterSource filter_source, - std::string field_name); + static std::shared_ptr> init_cm(mrc::segment::Builder& builder, + const std::string& name, + float threshold, + bool copy, + FilterSource filter_source, + std::string field_name); }; + /** @} */ // end of group } // namespace morpheus diff --git a/morpheus/_lib/src/stages/add_scores_stage_base.cpp b/morpheus/_lib/src/stages/add_scores_stage_base.cpp index b7ff58ca67..4bb76420de 100644 --- a/morpheus/_lib/src/stages/add_scores_stage_base.cpp +++ b/morpheus/_lib/src/stages/add_scores_stage_base.cpp @@ -18,26 +18,24 @@ #include "morpheus/stages/add_scores_stage_base.hpp" #include "morpheus/messages/memory/tensor_memory.hpp" // for TensorMemory -#include "morpheus/messages/meta.hpp" -#include "morpheus/messages/multi_response.hpp" // for MultiResponseMessage -#include "morpheus/objects/dtype.hpp" // for DType -#include "morpheus/objects/tensor.hpp" // for Tensor -#include "morpheus/objects/tensor_object.hpp" // for TensorObject -#include "morpheus/types.hpp" // for TensorIndex -#include "morpheus/utilities/matx_util.hpp" // for MatxUtil -#include "morpheus/utilities/string_util.hpp" // for StringUtil -#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils - -#include // for CHECK, COMPACT_GOOGLE_LOG_FATAL, LogMessageFatal, COMP... +#include "morpheus/messages/meta.hpp" // for MessageMeta +#include "morpheus/messages/multi_response.hpp" // for MultiResponseMessage +#include "morpheus/objects/dtype.hpp" // for DType +#include "morpheus/objects/tensor.hpp" // for Tensor +#include "morpheus/objects/tensor_object.hpp" // for TensorObject +#include "morpheus/types.hpp" // for TensorIndex +#include "morpheus/utilities/matx_util.hpp" // for MatxUtil +#include "morpheus/utilities/string_util.hpp" // for StringUtil +#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils + +#include // for CHECK, COMPACT_GOOGLE_LOG_FATAL, LogMessageFatal #include // for observable_member, trace_activity, decay_t, operator| #include // for size_t #include // for reverse_iterator #include // for shared_ptr, allocator, __shared_ptr_access #include // for basic_ostream, operator<<, basic_ostream::operator<< -#include // for runtime_error #include // for is_same_v -#include // for type_info #include // for move, pair #include // for vector // IWYU thinks we need __alloc_traits<>::value_type for vector assignments @@ -72,12 +70,10 @@ AddScoresStageBase::source_type_t AddScoresStageBaseon_control_message(x); } - // sink_type_t not supported else { - std::string error_msg{"AddScoresStageBase receives unsupported input type: " + std::string(typeid(x).name())}; - LOG(ERROR) << error_msg; - throw std::runtime_error(error_msg); + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), "AddScoresStageBase receives unsupported input type"); } return x; } diff --git a/morpheus/_lib/src/stages/filter_detection.cpp b/morpheus/_lib/src/stages/filter_detection.cpp deleted file mode 100644 index 199d716e5b..0000000000 --- a/morpheus/_lib/src/stages/filter_detection.cpp +++ /dev/null @@ -1,217 +0,0 @@ -/* - * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "morpheus/stages/filter_detection.hpp" // IWYU pragma: accosiated - -#include "mrc/segment/builder.hpp" -#include "mrc/segment/object.hpp" -#include "pymrc/node.hpp" - -#include "morpheus/messages/multi_tensor.hpp" -#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo -#include "morpheus/objects/dtype.hpp" // for DataType -#include "morpheus/objects/memory_descriptor.hpp" -#include "morpheus/objects/table_info.hpp" -#include "morpheus/objects/tensor_object.hpp" // for TensorIndex, TensorObject -#include "morpheus/types.hpp" // for RangeType -#include "morpheus/utilities/matx_util.hpp" -#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils::get_element_stride - -#include // for cudaMemcpy, cudaMemcpyDeviceToDevice, cudaMemcpyDeviceToHost -#include -#include -#include // for CHECK, CHECK_NE -#include // for MRC_CHECK_CUDA -#include -#include // for device_buffer -#include - -#include -#include // for uint8_t -#include -#include -#include -#include // needed for glog -#include -#include // for pair -#include -// IWYU thinks we need ext/new_allocator.h for size_t for some reason -// IWYU pragma: no_include - -namespace morpheus { - -// Component public implementations -// ************ FilterDetectionStage **************************** // -FilterDetectionsStage::FilterDetectionsStage(float threshold, - bool copy, - FilterSource filter_source, - std::string field_name) : - PythonNode(base_t::op_factory_from_sub_fn(build_operator())), - m_threshold(threshold), - m_copy(copy), - m_filter_source(filter_source), - m_field_name(std::move(field_name)) -{ - CHECK(m_filter_source != FilterSource::Auto); // The python stage should determine this -} - -DevMemInfo FilterDetectionsStage::get_tensor_filter_source(const std::shared_ptr& x) -{ - // The pipeline build will check to ensure that our input is a MultiResponseMessage - const auto& filter_source = std::static_pointer_cast(x)->get_tensor(m_field_name); - CHECK(filter_source.rank() > 0 && filter_source.rank() <= 2) - << "C++ impl of the FilterDetectionsStage currently only supports one and two dimensional " - "arrays"; - - // Depending on the input the stride is given in bytes or elements, convert to elements - auto stride = morpheus::TensorUtils::get_element_stride(filter_source.get_stride()); - return {filter_source.data(), filter_source.dtype(), filter_source.get_memory(), filter_source.get_shape(), stride}; -} - -DevMemInfo FilterDetectionsStage::get_column_filter_source(const std::shared_ptr& x) -{ - auto table_info = x->get_meta(m_field_name); - - // since we only asked for one column, we know its the first - const auto& col = table_info.get_column(0); - auto dtype = morpheus::DType::from_cudf(col.type().id()); - auto num_rows = col.size(); - auto data = - const_cast(static_cast(col.head() + col.offset() * dtype.item_size())); - - return { - data, - std::move(dtype), - std::make_shared(rmm::cuda_stream_per_thread, rmm::mr::get_current_device_resource()), - {num_rows, 1}, - {1, 0}, - }; -} - -FilterDetectionsStage::subscribe_fn_t FilterDetectionsStage::build_operator() -{ - return [this](rxcpp::observable input, rxcpp::subscriber output) { - std::function& x)> get_filter_source; - - if (m_filter_source == FilterSource::TENSOR) - { - get_filter_source = [this](auto x) { - return get_tensor_filter_source(x); - }; - } - else - { - get_filter_source = [this](auto x) { - return get_column_filter_source(x); - }; - } - - return input.subscribe(rxcpp::make_observer( - [this, &output, &get_filter_source](sink_type_t x) { - auto tmp_buffer = get_filter_source(x); - - const auto num_rows = tmp_buffer.shape(0); - const auto num_columns = tmp_buffer.shape(1); - - bool by_row = (num_columns > 1); - - // Now call the threshold function - auto thresh_bool_buffer = MatxUtil::threshold(tmp_buffer, m_threshold, by_row); - - std::vector host_bool_values(num_rows); - - // Copy bools back to host - MRC_CHECK_CUDA(cudaMemcpy(host_bool_values.data(), - thresh_bool_buffer->data(), - thresh_bool_buffer->size(), - cudaMemcpyDeviceToHost)); - - // Only used when m_copy is true - std::vector selected_ranges; - std::size_t num_selected_rows = 0; - - // We are slicing by rows, using num_rows as our marker for undefined - std::size_t slice_start = num_rows; - for (std::size_t row = 0; row < num_rows; ++row) - { - bool above_threshold = host_bool_values[row]; - - if (above_threshold && slice_start == num_rows) - { - slice_start = row; - } - else if (!above_threshold && slice_start != num_rows) - { - if (m_copy) - { - selected_ranges.emplace_back(std::pair{slice_start, row}); - num_selected_rows += (row - slice_start); - } - else - { - output.on_next(x->get_slice(slice_start, row)); - } - - slice_start = num_rows; - } - } - - if (slice_start != num_rows) - { - // Last row was above the threshold - if (m_copy) - { - selected_ranges.emplace_back(std::pair{slice_start, num_rows}); - num_selected_rows += (num_rows - slice_start); - } - else - { - output.on_next(x->get_slice(slice_start, num_rows)); - } - } - - // num_selected_rows will always be 0 when m_copy is false, - // or when m_copy is true, but none of the rows matched the output - if (num_selected_rows > 0) - { - DCHECK(m_copy); - output.on_next(x->copy_ranges(selected_ranges, num_selected_rows)); - } - }, - [&](std::exception_ptr error_ptr) { - output.on_error(error_ptr); - }, - [&]() { - output.on_completed(); - })); - }; -} - -// ************ FilterDetectionStageInterfaceProxy ************* // -std::shared_ptr> FilterDetectionStageInterfaceProxy::init( - mrc::segment::Builder& builder, - const std::string& name, - float threshold, - bool copy, - FilterSource filter_source, - std::string field_name) -{ - auto stage = builder.construct_object(name, threshold, copy, filter_source, field_name); - - return stage; -} -} // namespace morpheus diff --git a/morpheus/_lib/src/stages/filter_detections.cpp b/morpheus/_lib/src/stages/filter_detections.cpp new file mode 100644 index 0000000000..c1cf8b7036 --- /dev/null +++ b/morpheus/_lib/src/stages/filter_detections.cpp @@ -0,0 +1,310 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "morpheus/stages/filter_detections.hpp" + +#include "mrc/segment/builder.hpp" // for Builder +#include "mrc/segment/object.hpp" // for Object + +#include "morpheus/messages/control.hpp" // for ControlMessage +#include "morpheus/messages/multi.hpp" // for MultiMessage +#include "morpheus/messages/multi_tensor.hpp" // for MultiTensorMessage +#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo +#include "morpheus/objects/dtype.hpp" // for DType +#include "morpheus/objects/memory_descriptor.hpp" // for MemoryDescriptor +#include "morpheus/objects/table_info.hpp" // for TableInfo +#include "morpheus/types.hpp" // for RangeType +#include "morpheus/utilities/matx_util.hpp" // for MatxUtil +#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils + +#include // for cudaMemcpy, cudaMemcpyKind +#include // for column_view +#include // for data_type +#include // for COMPACT_GOOGLE_LOG_FATAL, LogMessageFatal, CHECK, DCHECK +#include // for MRC_CHECK_CUDA +#include // for cuda_stream_per_thread +#include // for get_current_device_resource + +#include // for size_t +#include // for uint8_t +#include // for exception_ptr +#include // for function +#include // for make_shared, shared_ptr, __shared_ptr_access +#include // for operator<<, basic_ostream +#include // for char_traits, string +#include // for move, pair +#include // for vector + +namespace morpheus { + +// Component public implementations +// ************ FilterDetectionStage **************************** // +template +FilterDetectionsStage::FilterDetectionsStage(float threshold, + bool copy, + FilterSource filter_source, + std::string field_name) : + base_t(base_t::op_factory_from_sub_fn(build_operator())), + m_threshold(threshold), + m_copy(copy), + m_filter_source(filter_source), + m_field_name(std::move(field_name)) +{ + CHECK(m_filter_source != FilterSource::Auto); // The python stage should determine this +} + +template +DevMemInfo FilterDetectionsStage::get_tensor_filter_source(const sink_type_t& x) +{ + if constexpr (std::is_same_v) + { + // The pipeline build will check to ensure that our input is a MultiResponseMessage + const auto& filter_source = std::static_pointer_cast(x)->get_tensor(m_field_name); + CHECK(filter_source.rank() > 0 && filter_source.rank() <= 2) + << "C++ impl of the FilterDetectionsStage currently only supports one and two dimensional " + "arrays"; + + // Depending on the input the stride is given in bytes or elements, convert to elements + auto stride = TensorUtils::get_element_stride(filter_source.get_stride()); + return { + filter_source.data(), filter_source.dtype(), filter_source.get_memory(), filter_source.get_shape(), stride}; + } + else if constexpr (std::is_same_v) + { + const auto& filter_source = x->tensors()->get_tensor(m_field_name); + CHECK(filter_source.rank() > 0 && filter_source.rank() <= 2) + << "C++ impl of the FilterDetectionsStage currently only supports one and two dimensional " + "arrays"; + + // Depending on the input the stride is given in bytes or elements, convert to elements + auto stride = TensorUtils::get_element_stride(filter_source.get_stride()); + return { + filter_source.data(), filter_source.dtype(), filter_source.get_memory(), filter_source.get_shape(), stride}; + } + else + { + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), "FilterDetectionsStage receives unsupported input type"); + } +} + +template +DevMemInfo FilterDetectionsStage::get_column_filter_source(const sink_type_t& x) +{ + TableInfo table_info; + if constexpr (std::is_same_v) + { + table_info = x->get_meta(m_field_name); + } + else if constexpr (std::is_same_v) + { + table_info = x->payload()->get_info(m_field_name); + } + else + { + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), "FilterDetectionsStage receives unsupported input type"); + } + + // since we only asked for one column, we know its the first + const auto& col = table_info.get_column(0); + auto dtype = DType::from_cudf(col.type().id()); + auto num_rows = col.size(); + auto data = + const_cast(static_cast(col.head() + col.offset() * dtype.item_size())); + + return { + data, + std::move(dtype), + std::make_shared(rmm::cuda_stream_per_thread, rmm::mr::get_current_device_resource()), + {num_rows, 1}, + {1, 0}, + }; +} + +template +FilterDetectionsStage::subscribe_fn_t FilterDetectionsStage::build_operator() +{ + return [this](rxcpp::observable input, rxcpp::subscriber output) { + std::function get_filter_source; + + if (m_filter_source == FilterSource::TENSOR) + { + get_filter_source = [this](auto x) { + return get_tensor_filter_source(x); + }; + } + else + { + get_filter_source = [this](auto x) { + return get_column_filter_source(x); + }; + } + + return input.subscribe(rxcpp::make_observer( + [this, &output, &get_filter_source](sink_type_t x) { + auto tmp_buffer = get_filter_source(x); + + const auto num_rows = tmp_buffer.shape(0); + const auto num_columns = tmp_buffer.shape(1); + + bool by_row = (num_columns > 1); + + // Now call the threshold function + auto thresh_bool_buffer = MatxUtil::threshold(tmp_buffer, m_threshold, by_row); + + std::vector host_bool_values(num_rows); + + // Copy bools back to host + MRC_CHECK_CUDA(cudaMemcpy(host_bool_values.data(), + thresh_bool_buffer->data(), + thresh_bool_buffer->size(), + cudaMemcpyDeviceToHost)); + + // Only used when m_copy is true + std::vector selected_ranges; + std::size_t num_selected_rows = 0; + + // We are slicing by rows, using num_rows as our marker for undefined + std::size_t slice_start = num_rows; + for (std::size_t row = 0; row < num_rows; ++row) + { + bool above_threshold = host_bool_values[row]; + + if (above_threshold && slice_start == num_rows) + { + slice_start = row; + } + else if (!above_threshold && slice_start != num_rows) + { + if (m_copy) + { + selected_ranges.emplace_back(std::pair{slice_start, row}); + num_selected_rows += (row - slice_start); + } + else + { + if constexpr (std::is_same_v) + { + output.on_next(x->get_slice(slice_start, row)); + } + else if constexpr (std::is_same_v) + { + auto meta = x->payload(); + std::shared_ptr sliced_cm = std::make_shared(*x); + sliced_cm->payload(meta->get_slice(slice_start, row)); + output.on_next(sliced_cm); + } + else + { + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), + "FilterDetectionsStage receives unsupported input type"); + } + } + + slice_start = num_rows; + } + } + + if (slice_start != num_rows) + { + // Last row was above the threshold + if (m_copy) + { + selected_ranges.emplace_back(std::pair{slice_start, num_rows}); + num_selected_rows += (num_rows - slice_start); + } + else + { + if constexpr (std::is_same_v) + { + output.on_next(x->get_slice(slice_start, num_rows)); + } + else if constexpr (std::is_same_v) + { + auto meta = x->payload(); + x->payload(meta->get_slice(slice_start, num_rows)); + output.on_next(x); + } + else + { + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), + "FilterDetectionsStage receives unsupported input type"); + } + } + } + + // num_selected_rows will always be 0 when m_copy is false, + // or when m_copy is true, but none of the rows matched the output + if (num_selected_rows > 0) + { + DCHECK(m_copy); + if constexpr (std::is_same_v) + { + output.on_next(x->copy_ranges(selected_ranges, num_selected_rows)); + } + else if constexpr (std::is_same_v) + { + auto meta = x->payload(); + x->payload(meta->copy_ranges(selected_ranges)); + output.on_next(x); + } + else + { + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), "FilterDetectionsStage receives unsupported input type"); + } + } + }, + [&](std::exception_ptr error_ptr) { + output.on_error(error_ptr); + }, + [&]() { + output.on_completed(); + })); + }; +} + +// ************ FilterDetectionStageInterfaceProxy ************* // +std::shared_ptr> FilterDetectionStageInterfaceProxy::init_mm( + mrc::segment::Builder& builder, + const std::string& name, + float threshold, + bool copy, + FilterSource filter_source, + std::string field_name) +{ + auto stage = builder.construct_object(name, threshold, copy, filter_source, field_name); + + return stage; +} + +std::shared_ptr> FilterDetectionStageInterfaceProxy::init_cm( + mrc::segment::Builder& builder, + const std::string& name, + float threshold, + bool copy, + FilterSource filter_source, + std::string field_name) +{ + auto stage = builder.construct_object(name, threshold, copy, filter_source, field_name); + + return stage; +} +} // namespace morpheus diff --git a/morpheus/_lib/src/stages/preprocess_fil.cpp b/morpheus/_lib/src/stages/preprocess_fil.cpp index 978e7557eb..ad1e09c1b4 100644 --- a/morpheus/_lib/src/stages/preprocess_fil.cpp +++ b/morpheus/_lib/src/stages/preprocess_fil.cpp @@ -38,7 +38,6 @@ #include // for column_view #include // for type_id, data_type #include // for cast -#include // for COMPACT_GOOGLE_LOG_ERROR, LOG, LogMessage #include // for __check_cuda_errors, MRC_CHECK_CUDA #include // for Builder #include // for gil_scoped_acquire @@ -50,9 +49,7 @@ #include // for find #include // for size_t #include // for shared_ptr, __shared_ptr_access, allocator, mak... -#include // for runtime_error #include // for is_same_v -#include // for type_info #include // for move namespace morpheus { @@ -144,12 +141,10 @@ TableInfo PreprocessFILStage::fix_bad_columns(sink_type_t x) // Now re-get the meta return x->payload()->get_info(m_fea_cols); } - // sink_type_t not supported else { - std::string error_msg{"PreProcessFILStage receives unsupported input type: " + std::string(typeid(x).name())}; - LOG(ERROR) << error_msg; - throw std::runtime_error(error_msg); + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), "PreProcessFILStage receives unsupported input type"); } } @@ -164,12 +159,10 @@ PreprocessFILStage::source_type_t PreprocessFILStage // for column #include // for make_column_from_scalar @@ -42,7 +42,6 @@ #include // for table_view #include // for type_id, data_type #include // for cast -#include // for COMPACT_GOOGLE_LOG_ERROR, LOG, LogMessage #include // for Builder #include // for normalize_spaces #include // for tokenizer_result, load_vocabulary_file, subword_tok... @@ -52,9 +51,7 @@ #include // for uint32_t, int32_t #include // for shared_ptr, unique_ptr, __shared_ptr_access, make_s... -#include // for runtime_error #include // for is_same_v -#include // for type_info #include // for move #include // for vector @@ -100,12 +97,10 @@ PreprocessNLPStage::source_type_t PreprocessNLPStageon_control_message(x); } - // sink_type_t not supported else { - std::string error_msg{"PreProcessNLPStage receives unsupported input type: " + std::string(typeid(x).name())}; - LOG(ERROR) << error_msg; - throw std::runtime_error(error_msg); + // sink_type_t not supported + static_assert(!sizeof(sink_type_t), "PreProcessNLPStage receives unsupported input type"); } } diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi index bfd66dcb64..8b8413b67e 100644 --- a/morpheus/_lib/stages/__init__.pyi +++ b/morpheus/_lib/stages/__init__.pyi @@ -21,7 +21,8 @@ __all__ = [ "DeserializeControlMessageStage", "DeserializeMultiMessageStage", "FileSourceStage", - "FilterDetectionsStage", + "FilterDetectionsControlMessageStage", + "FilterDetectionsMultiMessageStage", "FilterSource", "HttpServerSourceStage", "InferenceClientStageCM", @@ -64,7 +65,10 @@ class FileSourceStage(mrc.core.segment.SegmentObject): @typing.overload def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, filter_null: bool, filter_null_columns: typing.List[str], parser_kwargs: dict) -> None: ... pass -class FilterDetectionsStage(mrc.core.segment.SegmentObject): +class FilterDetectionsControlMessageStage(mrc.core.segment.SegmentObject): + def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ... + pass +class FilterDetectionsMultiMessageStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ... pass class HttpServerSourceStage(mrc.core.segment.SegmentObject): diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 32c3c5e030..5b33f59179 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -25,7 +25,7 @@ #include "morpheus/stages/add_scores.hpp" #include "morpheus/stages/deserialize.hpp" #include "morpheus/stages/file_source.hpp" -#include "morpheus/stages/filter_detection.hpp" +#include "morpheus/stages/filter_detections.hpp" #include "morpheus/stages/http_server_source_stage.hpp" #include "morpheus/stages/inference_client_stage.hpp" #include "morpheus/stages/kafka_source.hpp" @@ -168,11 +168,23 @@ PYBIND11_MODULE(stages, _module) py::arg("filter_null_columns"), py::arg("parser_kwargs")); - py::class_, + py::class_, mrc::segment::ObjectProperties, - std::shared_ptr>>( - _module, "FilterDetectionsStage", py::multiple_inheritance()) - .def(py::init<>(&FilterDetectionStageInterfaceProxy::init), + std::shared_ptr>>( + _module, "FilterDetectionsMultiMessageStage", py::multiple_inheritance()) + .def(py::init<>(&FilterDetectionStageInterfaceProxy::init_mm), + py::arg("builder"), + py::arg("name"), + py::arg("threshold"), + py::arg("copy"), + py::arg("filter_source"), + py::arg("field_name") = "probs"); + + py::class_, + mrc::segment::ObjectProperties, + std::shared_ptr>>( + _module, "FilterDetectionsControlMessageStage", py::multiple_inheritance()) + .def(py::init<>(&FilterDetectionStageInterfaceProxy::init_cm), py::arg("builder"), py::arg("name"), py::arg("threshold"), diff --git a/morpheus/controllers/filter_detections_controller.py b/morpheus/controllers/filter_detections_controller.py index ecd38a59b3..167bef64fb 100644 --- a/morpheus/controllers/filter_detections_controller.py +++ b/morpheus/controllers/filter_detections_controller.py @@ -20,6 +20,7 @@ import typing_utils from morpheus.common import FilterSource +from morpheus.messages import ControlMessage from morpheus.messages import MultiMessage from morpheus.messages import MultiResponseMessage @@ -66,12 +67,18 @@ def field_name(self): """ return self._field_name - def _find_detections(self, x: MultiMessage) -> typing.Union[cp.ndarray, np.ndarray]: - # Determind the filter source - if self._filter_source == FilterSource.TENSOR: - filter_source = x.get_output(self._field_name) - else: - filter_source = x.get_meta(self._field_name).values + def _find_detections(self, x: MultiMessage | ControlMessage) -> typing.Union[cp.ndarray, np.ndarray]: + # Determine the filter source + if isinstance(x, MultiMessage): + if self._filter_source == FilterSource.TENSOR: + filter_source = x.get_output(self._field_name) + else: + filter_source = x.get_meta(self._field_name).values + elif isinstance(x, ControlMessage): + if self._filter_source == FilterSource.TENSOR: + filter_source = x.tensors().get_tensor(self._field_name) + else: + filter_source = x.payload().get_data(self._field_name).values if (isinstance(filter_source, np.ndarray)): array_mod = np @@ -89,7 +96,7 @@ def _find_detections(self, x: MultiMessage) -> typing.Union[cp.ndarray, np.ndarr return array_mod.where(detections[1:] != detections[:-1])[0].reshape((-1, 2)) - def filter_copy(self, x: MultiMessage) -> MultiMessage: + def filter_copy(self, x: MultiMessage | ControlMessage) -> MultiMessage | ControlMessage: """ This function uses a threshold value to filter the messages. @@ -113,9 +120,15 @@ def filter_copy(self, x: MultiMessage) -> MultiMessage: if (true_pairs.shape[0] == 0): return None - return x.copy_ranges(true_pairs) + if isinstance(x, MultiMessage): + return x.copy_ranges(true_pairs) + if isinstance(x, ControlMessage): + meta = x.payload() + x.payload(meta.copy_ranges(true_pairs)) + return x + raise TypeError(f"Unsupported message type: {type(x)}") - def filter_slice(self, x: MultiMessage) -> typing.List[MultiMessage]: + def filter_slice(self, x: MultiMessage | ControlMessage) -> typing.List[MultiMessage] | typing.List[ControlMessage]: """ This function uses a threshold value to filter the messages. @@ -134,10 +147,19 @@ def filter_slice(self, x: MultiMessage) -> typing.List[MultiMessage]: output_list = [] if x is not None: true_pairs = self._find_detections(x) - for pair in true_pairs: - pair = tuple(pair.tolist()) - if ((pair[1] - pair[0]) > 0): - output_list.append(x.get_slice(*pair)) + if isinstance(x, MultiMessage): + for pair in true_pairs: + pair = tuple(pair.tolist()) + if ((pair[1] - pair[0]) > 0): + output_list.append(x.get_slice(*pair)) + elif isinstance(x, ControlMessage): + for pair in true_pairs: + pair = tuple(pair.tolist()) + if ((pair[1] - pair[0]) > 0): + sliced_meta = x.payload().get_slice(*pair) + cm = ControlMessage(x) + cm.payload(sliced_meta) + output_list.append(cm) return output_list diff --git a/morpheus/llm/services/llm_service.py b/morpheus/llm/services/llm_service.py index 07c777c547..7466b01a8a 100644 --- a/morpheus/llm/services/llm_service.py +++ b/morpheus/llm/services/llm_service.py @@ -30,6 +30,11 @@ class LLMClient(ABC): def get_input_names(self) -> list[str]: """ Returns the names of the inputs to the model. + + Returns + ------- + list[str] + List of input names. """ pass @@ -42,6 +47,11 @@ def generate(self, **input_dict) -> str: ---------- input_dict : dict Input containing prompt data. + + Returns + ------- + str + Generated response for prompt. """ pass @@ -54,6 +64,11 @@ async def generate_async(self, **input_dict) -> str: ---------- input_dict : dict Input containing prompt data. + + Returns + ------- + str + Generated async response for prompt. """ pass @@ -80,6 +95,11 @@ def generate_batch(self, inputs: dict[str, list], return_exceptions=False) -> li Inputs containing prompt data. return_exceptions : bool Whether to return exceptions in the output list or raise them immediately. + + Returns + ------- + list[str] | list[str | BaseException] + List of responses or list of responses and exceptions. """ pass @@ -110,6 +130,11 @@ async def generate_batch_async(self, Inputs containing prompt data. return_exceptions : bool Whether to return exceptions in the output list or raise them immediately. + + Returns + ------- + list[str] | list[str | BaseException] + List of responses or list of responses and exceptions. """ pass @@ -131,5 +156,10 @@ def get_client(self, *, model_name: str, **model_kwargs) -> LLMClient: model_kwargs : dict[str, typing.Any] Additional keyword arguments to pass to the model. + + Returns + ------- + LLMClient + Client for interacting with LLM models. """ pass diff --git a/morpheus/llm/services/nemo_llm_service.py b/morpheus/llm/services/nemo_llm_service.py index 81fb4f9b97..d744948159 100644 --- a/morpheus/llm/services/nemo_llm_service.py +++ b/morpheus/llm/services/nemo_llm_service.py @@ -14,12 +14,12 @@ import asyncio import logging -import os import typing import warnings from morpheus.llm.services.llm_service import LLMClient from morpheus.llm.services.llm_service import LLMService +from morpheus.utils.env_config_value import EnvConfigValue logger = logging.getLogger(__name__) @@ -190,7 +190,24 @@ class NeMoLLMService(LLMService): A service for interacting with NeMo LLM models, this class should be used to create a client for a specific model. """ - def __init__(self, *, api_key: str = None, org_id: str = None, retry_count=5) -> None: + class APIKey(EnvConfigValue): + _ENV_KEY: str = "NGC_API_KEY" + _ALLOW_NONE: bool = True + + class OrgId(EnvConfigValue): + _ENV_KEY: str = "NGC_ORG_ID" + _ALLOW_NONE: bool = True + + class BaseURI(EnvConfigValue): + _ENV_KEY: str = "NGC_API_BASE" + _ALLOW_NONE: bool = True + + def __init__(self, + *, + api_key: APIKey | str = None, + org_id: OrgId | str = None, + base_uri: BaseURI | str = None, + retry_count=5) -> None: """ Creates a service for interacting with NeMo LLM models. @@ -203,6 +220,10 @@ def __init__(self, *, api_key: str = None, org_id: str = None, retry_count=5) -> The organization ID for the LLM service, by default None. If `None` the organization ID will be read from the `NGC_ORG_ID` environment variable. This value is only required if the account associated with the `api_key` is a member of multiple NGC organizations., by default None + base_uri : str, optional + The base URI for the LLM service, by default None. If `None` the base URI will be read from + the `NGC_API_BASE` environment variable. This value is only required if the account associated with the + `api_key` is a member of multiple NGC organizations., by default None retry_count : int, optional The number of times to retry a request before raising an exception, by default 5 @@ -212,22 +233,29 @@ def __init__(self, *, api_key: str = None, org_id: str = None, retry_count=5) -> raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION super().__init__() - api_key = api_key if api_key is not None else os.environ.get("NGC_API_KEY", None) - org_id = org_id if org_id is not None else os.environ.get("NGC_ORG_ID", None) + + if not isinstance(api_key, NeMoLLMService.APIKey): + api_key = NeMoLLMService.APIKey(api_key) + + if not isinstance(org_id, NeMoLLMService.OrgId): + org_id = NeMoLLMService.OrgId(org_id) + + if not isinstance(base_uri, NeMoLLMService.BaseURI): + base_uri = NeMoLLMService.BaseURI(base_uri) self._retry_count = retry_count self._conn = nemollm.NemoLLM( - api_host=os.environ.get("NGC_API_BASE", None), + api_host=base_uri.value, # The client must configure the authentication and authorization parameters # in accordance with the API server security policy. # Configure Bearer authorization - api_key=api_key, + api_key=api_key.value, # If you are in more than one LLM-enabled organization, you must # specify your org ID in the form of a header. This is optional # if you are only in one LLM-enabled org. - org_id=org_id, + org_id=org_id.value, ) def get_client(self, *, model_name: str, **model_kwargs) -> NeMoLLMClient: diff --git a/morpheus/llm/services/openai_chat_service.py b/morpheus/llm/services/openai_chat_service.py index 3179d2474f..d4eab0f31e 100644 --- a/morpheus/llm/services/openai_chat_service.py +++ b/morpheus/llm/services/openai_chat_service.py @@ -24,6 +24,7 @@ from morpheus.llm.services.llm_service import LLMClient from morpheus.llm.services.llm_service import LLMService +from morpheus.utils.env_config_value import EnvConfigValue logger = logging.getLogger(__name__) @@ -65,6 +66,20 @@ def set_output(self, output: typing.Any) -> None: self.outputs = output +class OpenAIOrgId(EnvConfigValue): + _ENV_KEY: str = "OPENAI_ORG_ID" + _ALLOW_NONE: bool = True + + +class OpenAIAPIKey(EnvConfigValue): + _ENV_KEY: str = "OPENAI_API_KEY" + + +class OpenAIBaseURL(EnvConfigValue): + _ENV_KEY: str = "OPENAI_BASE_URL" + _ALLOW_NONE: bool = True + + class OpenAIChatClient(LLMClient): """ Client for interacting with a specific OpenAI chat model. This class should be constructed with the @@ -94,12 +109,24 @@ def __init__(self, model_name: str, set_assistant: bool = False, max_retries: int = 10, + org_id: str | OpenAIOrgId = None, + api_key: str | OpenAIAPIKey = None, + base_url: str | OpenAIBaseURL = None, **model_kwargs) -> None: if IMPORT_EXCEPTION is not None: raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION super().__init__() + if not isinstance(org_id, OpenAIOrgId): + org_id = OpenAIOrgId(org_id) + + if not isinstance(api_key, OpenAIOrgId): + api_key = OpenAIOrgId(api_key) + + if not isinstance(base_url, OpenAIBaseURL): + base_url = OpenAIBaseURL(base_url) + assert parent is not None, "Parent service cannot be None." self._parent = parent @@ -113,8 +140,14 @@ def __init__(self, self._model_kwargs = copy.deepcopy(model_kwargs) # Create the client objects for both sync and async - self._client = openai.OpenAI(max_retries=max_retries) - self._client_async = openai.AsyncOpenAI(max_retries=max_retries) + self._client = openai.OpenAI(max_retries=max_retries, + organization=org_id.value, + api_key=api_key.value, + base_url=base_url.value) + self._client_async = openai.AsyncOpenAI(max_retries=max_retries, + organization=org_id.value, + api_key=api_key.value, + base_url=base_url.value) def get_input_names(self) -> list[str]: input_names = [self._prompt_key] diff --git a/morpheus/stages/postprocess/filter_detections_stage.py b/morpheus/stages/postprocess/filter_detections_stage.py index c071ffb333..9cadb26290 100644 --- a/morpheus/stages/postprocess/filter_detections_stage.py +++ b/morpheus/stages/postprocess/filter_detections_stage.py @@ -23,6 +23,7 @@ from morpheus.common import FilterSource from morpheus.config import Config from morpheus.controllers.filter_detections_controller import FilterDetectionsController +from morpheus.messages import ControlMessage from morpheus.messages import MultiMessage from morpheus.messages import MultiResponseMessage from morpheus.pipeline.single_port_stage import SinglePortStage @@ -103,9 +104,9 @@ def accepted_types(self) -> typing.Tuple: """ if self._controller.filter_source == FilterSource.TENSOR: - return (MultiResponseMessage, ) + return (MultiResponseMessage, ControlMessage) - return (MultiMessage, ) + return (MultiMessage, ControlMessage) def compute_schema(self, schema: StageSchema): self._controller.update_filter_source(message_type=schema.input_type) @@ -117,12 +118,21 @@ def supports_cpp_node(self): def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: if self._build_cpp_node(): - node = _stages.FilterDetectionsStage(builder, - self.unique_name, - self._controller.threshold, - self._copy, - self._controller.filter_source, - self._controller.field_name) + if (self._schema.input_type == ControlMessage): + node = _stages.FilterDetectionsControlMessageStage(builder, + self.unique_name, + self._controller.threshold, + self._copy, + self._controller.filter_source, + self._controller.field_name) + + else: + node = _stages.FilterDetectionsMultiMessageStage(builder, + self.unique_name, + self._controller.threshold, + self._copy, + self._controller.filter_source, + self._controller.field_name) else: if self._copy: diff --git a/morpheus/stages/postprocess/generate_viz_frames_stage.py b/morpheus/stages/postprocess/generate_viz_frames_stage.py index cf60d638b8..b2d059666c 100644 --- a/morpheus/stages/postprocess/generate_viz_frames_stage.py +++ b/morpheus/stages/postprocess/generate_viz_frames_stage.py @@ -32,6 +32,7 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.messages import ControlMessage from morpheus.messages import MultiResponseMessage from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @@ -91,11 +92,11 @@ def accepted_types(self) -> typing.Tuple: Returns ------- - typing.Tuple[morpheus.pipeline.messages.MultiResponseMessage, ] + typing.Tuple[morpheus.pipeline.messages.MultiResponseMessage, ControlMessage] Accepted input types """ - return (MultiResponseMessage, ) + return (MultiResponseMessage, ControlMessage) def supports_cpp_node(self): return False @@ -118,7 +119,7 @@ def round_to_sec(x: int | float): """ return int(round(x / 1000.0) * 1000) - def _to_vis_df(self, x: MultiResponseMessage): + def _to_vis_df(self, x: MultiResponseMessage | ControlMessage): idx2label = { 0: 'address', @@ -133,7 +134,11 @@ def _to_vis_df(self, x: MultiResponseMessage): 9: 'user' } - df = x.get_meta(["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "data"]) + columns = ["timestamp", "src_ip", "dest_ip", "src_port", "dest_port", "data"] + if isinstance(x, MultiResponseMessage): + df = x.get_meta(columns) + elif isinstance(x, ControlMessage): + df = x.payload().get_data(columns) def indent_data(y: str): try: @@ -141,9 +146,16 @@ def indent_data(y: str): except Exception: return y + if isinstance(df, cudf.DataFrame): + df = df.to_pandas() + df["data"] = df["data"].apply(indent_data) - probs = x.get_probs_tensor() + if isinstance(x, MultiResponseMessage): + probs = x.get_probs_tensor() + elif isinstance(x, ControlMessage): + probs = x.tensors().get_tensor("probs") + pass_thresh = (probs >= 0.5).any(axis=1) max_arg = probs.argmax(axis=1) @@ -263,14 +275,21 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> def node_fn(input_obs, output_obs): - def write_batch(x: MultiResponseMessage): + def write_batch(x: MultiResponseMessage | ControlMessage): sink = pa.BufferOutputStream() # This is the timestamp of the earliest message - time0 = x.get_meta("timestamp").min() - - df = x.get_meta(["timestamp", "src_ip", "dest_ip", "secret_keys", "data"]) + if isinstance(x, MultiResponseMessage): + time0 = x.get_meta("timestamp").min() + elif isinstance(x, ControlMessage): + time0 = x.payload().get_data("timestamp").min() + + columns = ["timestamp", "src_ip", "dest_ip", "secret_keys", "data"] + if isinstance(x, MultiResponseMessage): + df = x.get_meta(columns) + elif isinstance(x, ControlMessage): + df = x.payload().get_data(columns) out_df = cudf.DataFrame() diff --git a/morpheus/stages/postprocess/ml_flow_drift_stage.py b/morpheus/stages/postprocess/ml_flow_drift_stage.py index 4e5974cf51..60434c3a42 100644 --- a/morpheus/stages/postprocess/ml_flow_drift_stage.py +++ b/morpheus/stages/postprocess/ml_flow_drift_stage.py @@ -24,6 +24,7 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.messages import ControlMessage from morpheus.messages import MultiResponseMessage from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @@ -119,27 +120,36 @@ def accepted_types(self) -> typing.Tuple: Returns ------- - typing.Tuple[`morpheus.pipeline.messages.MultiResponseMessage`, ] + typing.Tuple[`morpheus.pipeline.messages.MultiResponseMessage`, ControlMessage] Accepted input types. """ - return (MultiResponseMessage, ) + return (MultiResponseMessage, ControlMessage) def supports_cpp_node(self): return False - def _calc_drift(self, x: MultiResponseMessage): + def _calc_drift(self, x: MultiResponseMessage | ControlMessage): + if isinstance(x, MultiResponseMessage): + probs_tensor = x.get_probs_tensor() + elif isinstance(x, ControlMessage): + probs_tensor = x.tensors().get_tensor("probs") # All probs in a batch will be calculated - shifted = cp.abs(x.get_probs_tensor() - 0.5) + 0.5 + shifted = cp.abs(probs_tensor - 0.5) + 0.5 # Make sure the labels list is long enough for label in range(len(self._labels), shifted.shape[1]): self._labels.append(str(label)) - for i in list(range(0, x.count, self._batch_size)): + if isinstance(x, MultiResponseMessage): + count = x.count + elif isinstance(x, ControlMessage): + count = x.payload().count + + for i in list(range(0, count, self._batch_size)): start = i - end = min(start + self._batch_size, x.count) + end = min(start + self._batch_size, count) mean = cp.mean(shifted[start:end, :], axis=0, keepdims=True) # For each column, report the metric diff --git a/morpheus/stages/postprocess/timeseries_stage.py b/morpheus/stages/postprocess/timeseries_stage.py index 28c4b70f2c..5005114df3 100644 --- a/morpheus/stages/postprocess/timeseries_stage.py +++ b/morpheus/stages/postprocess/timeseries_stage.py @@ -28,8 +28,9 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.messages import ControlMessage +from morpheus.messages import MultiResponseAEMessage from morpheus.messages import MultiResponseMessage -from morpheus.messages.multi_ae_message import MultiMessage from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @@ -58,7 +59,6 @@ def calc_bin(obj: pd.Timestamp, time0: pd.Timestamp, resolution_sec: float) -> i """ Calculates the bin spacing between the start and stop timestamp at a specified resolution. """ - return round((round_seconds(obj) - time0).total_seconds()) // resolution_sec @@ -164,7 +164,7 @@ class _TimeSeriesAction: window_end: dt.datetime = None send_message: bool = False - message: MultiResponseMessage = None + message: MultiResponseMessage | ControlMessage = None class _UserTimeSeries: @@ -207,7 +207,8 @@ def __init__(self, self._holding_timestamps = deque() # Stateful members - self._pending_messages: deque[MultiResponseMessage] = deque() # Holds the existing messages pending + self._pending_messages: deque[MultiResponseMessage + | ControlMessage] = deque() # Holds the existing messages pending self._timeseries_data: pd.DataFrame = pd.DataFrame(columns=[self._timestamp_col ]) # Holds all available timeseries data @@ -263,16 +264,24 @@ def _determine_action(self, is_complete: bool) -> typing.Optional[_TimeSeriesAct if (len(self._pending_messages) == 0): return None - # Note: We calculate everything in bins to ensure 1) Full bins, and 2) Even binning + # Note: We calculate everything in bins to ensure 1) Full xbins, and 2) Even binning timeseries_start = self._timeseries_data["event_bin"].iloc[0] timeseries_end = self._timeseries_data["event_bin"].iloc[-1] # Peek the front message - x: MultiResponseMessage = self._pending_messages[0] + x: MultiResponseMessage | ControlMessage = self._pending_messages[0] # Get the first message timestamp - message_start = calc_bin(x.get_meta(self._timestamp_col).iloc[0], self._t0_epoch, self._resolution_sec) - message_end = calc_bin(x.get_meta(self._timestamp_col).iloc[-1], self._t0_epoch, self._resolution_sec) + if isinstance(x, MultiResponseMessage): + message_start = calc_bin(x.get_meta(self._timestamp_col).iloc[0], self._t0_epoch, self._resolution_sec) + message_end = calc_bin(x.get_meta(self._timestamp_col).iloc[-1], self._t0_epoch, self._resolution_sec) + elif isinstance(x, ControlMessage): + message_start = calc_bin(pd.Timestamp(x.payload().get_data(self._timestamp_col).iloc[0]), + self._t0_epoch, + self._resolution_sec) + message_end = calc_bin(pd.Timestamp(x.payload().get_data(self._timestamp_col).iloc[-1]), + self._t0_epoch, + self._resolution_sec) window_start = message_start - self._half_window_bins window_end = message_end + self._half_window_bins @@ -341,17 +350,23 @@ def _determine_action(self, is_complete: bool) -> typing.Optional[_TimeSeriesAct send_message=True, message=self._pending_messages.popleft()) - def _calc_timeseries(self, x: MultiResponseMessage, is_complete: bool): + def _calc_timeseries(self, x: MultiResponseMessage | ControlMessage, is_complete: bool): if (x is not None): # Ensure that we have the meta column set for all messages - x.set_meta("ts_anomaly", False) + if isinstance(x, MultiResponseMessage): + x.set_meta("ts_anomaly", False) + elif isinstance(x, ControlMessage): + x.payload().set_data("ts_anomaly", False) # Save this message in the pending queue self._pending_messages.append(x) - new_timedata = x.get_meta([self._timestamp_col]) + if isinstance(x, MultiResponseMessage): + new_timedata = x.get_meta([self._timestamp_col]) + elif isinstance(x, ControlMessage): + new_timedata = x.payload().get_data([self._timestamp_col]).to_pandas() # Save this message event times in the event list. Ensure the values are always sorted self._timeseries_data = pd.concat([self._timeseries_data, new_timedata]).sort_index() @@ -472,34 +487,38 @@ def accepted_types(self) -> typing.Tuple: Returns ------- - typing.Tuple[`morpheus.pipeline.messages.MultiResponseMessage`, ] + typing.Tuple[`morpheus.pipeline.messages.MultiResponseMessage`, ControlMessage] Accepted input types. """ - return (MultiMessage, ) + return (MultiResponseMessage, ControlMessage) def supports_cpp_node(self): return False - def _call_timeseries_user(self, x: MultiMessage): + def _call_timeseries_user(self, x: MultiResponseAEMessage | ControlMessage): + if isinstance(x, MultiResponseAEMessage): + user_id = x.user_id + elif isinstance(x, ControlMessage): + user_id = x.get_metadata("user_id") - if (x.user_id not in self._timeseries_per_user): - self._timeseries_per_user[x.user_id] = _UserTimeSeries(user_id=x.user_id, - timestamp_col=self._timestamp_col, - resolution=self._resolution, - min_window=self._min_window, - hot_start=self._hot_start, - cold_end=self._cold_end, - filter_percent=self._filter_percent, - zscore_threshold=self._zscore_threshold) + if (user_id not in self._timeseries_per_user): + self._timeseries_per_user[user_id] = _UserTimeSeries(user_id=user_id, + timestamp_col=self._timestamp_col, + resolution=self._resolution, + min_window=self._min_window, + hot_start=self._hot_start, + cold_end=self._cold_end, + filter_percent=self._filter_percent, + zscore_threshold=self._zscore_threshold) - return self._timeseries_per_user[x.user_id]._calc_timeseries(x, False) + return self._timeseries_per_user[user_id]._calc_timeseries(x, False) def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: - def on_next(x: MultiMessage): + def on_next(x: MultiResponseMessage | ControlMessage): - message_list: typing.List[MultiResponseMessage] = self._call_timeseries_user(x) + message_list: typing.List[MultiResponseMessage | ControlMessage] = self._call_timeseries_user(x) return message_list @@ -508,7 +527,8 @@ def on_completed(): to_send = [] for timestamp in self._timeseries_per_user.values(): - message_list: typing.List[MultiResponseMessage] = timestamp._calc_timeseries(None, True) + message_list: typing.List[MultiResponseMessage | ControlMessage] = timestamp._calc_timeseries( + None, True) to_send = to_send + message_list diff --git a/morpheus/stages/postprocess/validation_stage.py b/morpheus/stages/postprocess/validation_stage.py index 7ae46db06f..445e8b4be9 100644 --- a/morpheus/stages/postprocess/validation_stage.py +++ b/morpheus/stages/postprocess/validation_stage.py @@ -111,7 +111,7 @@ def accepted_types(self) -> typing.Tuple: Returns ------- - typing.Tuple(`morpheus.pipeline.messages.MultiMessage`, ) + typing.Tuple(`morpheus.pipeline.messages.MultiMessage`, ControlMessage) Accepted input types. """ diff --git a/morpheus/stages/preprocess/preprocess_ae_stage.py b/morpheus/stages/preprocess/preprocess_ae_stage.py index e96a527630..1cf7263d7c 100644 --- a/morpheus/stages/preprocess/preprocess_ae_stage.py +++ b/morpheus/stages/preprocess/preprocess_ae_stage.py @@ -22,9 +22,11 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.messages import ControlMessage from morpheus.messages import InferenceMemoryAE from morpheus.messages import MultiInferenceMessage from morpheus.messages import MultiMessage +from morpheus.messages import TensorMemory as CppTensorMemory from morpheus.messages.multi_ae_message import MultiAEMessage from morpheus.stages.inference.auto_encoder_inference_stage import MultiInferenceAEMessage from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage @@ -58,14 +60,14 @@ def accepted_types(self) -> typing.Tuple: """ Returns accepted input types for this stage. """ - return (MultiAEMessage, ) + return (MultiAEMessage, ControlMessage) def supports_cpp_node(self): return False @staticmethod def pre_process_batch(x: MultiAEMessage, fea_len: int, - feature_columns: typing.List[str]) -> MultiInferenceAEMessage: + feature_columns: typing.List[str]) -> MultiInferenceAEMessage | ControlMessage: """ This function performs pre-processing for autoencoder. @@ -84,7 +86,42 @@ def pre_process_batch(x: MultiAEMessage, fea_len: int, Autoencoder inference message. """ + if isinstance(x, ControlMessage): + return PreprocessAEStage.process_control_message(x, fea_len, feature_columns) + if isinstance(x, MultiAEMessage): + return PreprocessAEStage.process_multi_ae_message(x, fea_len, feature_columns) + raise TypeError("Unsupported message type.") + @staticmethod + def process_control_message(x: ControlMessage, fea_len: int, feature_columns: typing.List[str]) -> ControlMessage: + meta_df = x.payload().get_data(x.payload().df.columns.intersection(feature_columns)) + + autoencoder = x.get_metadata("autoencoder") + scores_mean = x.get_metadata("train_scores_mean") + scores_std = x.get_metadata("train_scores_std") + count = len(meta_df.index) + + inputs = cp.zeros(meta_df.shape, dtype=cp.float32) + + if autoencoder is not None: + data = autoencoder.prepare_df(meta_df) + inputs = autoencoder.build_input_tensor(data) + inputs = cp.asarray(inputs.detach()) + count = inputs.shape[0] + + seg_ids = cp.zeros((count, 3), dtype=cp.uint32) + seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32) + seg_ids[:, 2] = fea_len - 1 + + x.set_metadata("autoencoder", autoencoder) + x.set_metadata("train_scores_mean", scores_mean) + x.set_metadata("train_scores_std", scores_std) + x.tensors(CppTensorMemory(count=count, tensors={"input": inputs, "seq_ids": seg_ids})) + return x + + @staticmethod + def process_multi_ae_message(x: MultiAEMessage, fea_len: int, + feature_columns: typing.List[str]) -> MultiInferenceAEMessage: meta_df = x.get_meta(x.meta.df.columns.intersection(feature_columns)) autoencoder = x.model scores_mean = x.train_scores_mean @@ -117,7 +154,8 @@ def pre_process_batch(x: MultiAEMessage, fea_len: int, return infer_message - def _get_preprocess_fn(self) -> typing.Callable[[MultiMessage], MultiInferenceMessage]: + def _get_preprocess_fn( + self) -> typing.Callable[[MultiMessage | ControlMessage], MultiInferenceMessage | ControlMessage]: return partial(PreprocessAEStage.pre_process_batch, fea_len=self._fea_length, feature_columns=self._feature_columns) diff --git a/morpheus/stages/preprocess/preprocess_fil_stage.py b/morpheus/stages/preprocess/preprocess_fil_stage.py index cbfc6a581f..8ff369ebe7 100644 --- a/morpheus/stages/preprocess/preprocess_fil_stage.py +++ b/morpheus/stages/preprocess/preprocess_fil_stage.py @@ -67,7 +67,8 @@ def supports_cpp_node(self): return True @staticmethod - def pre_process_batch(x: MultiMessage, fea_len: int, fea_cols: typing.List[str]) -> MultiInferenceFILMessage: + def pre_process_batch(x: typing.Union[MultiMessage, ControlMessage], fea_len: int, + fea_cols: typing.List[str]) -> typing.Union[MultiMessage, ControlMessage]: """ For FIL category usecases, this function performs pre-processing. diff --git a/morpheus/utils/env_config_value.py b/morpheus/utils/env_config_value.py new file mode 100644 index 0000000000..f2776aa37c --- /dev/null +++ b/morpheus/utils/env_config_value.py @@ -0,0 +1,94 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from abc import ABC +from enum import Enum + + +class EnvConfigValueSource(Enum): + ENV_DEFAULT = 1 + CONSTRUCTOR = 2 + ENV_OVERRIDE = 3 + + +class EnvConfigValue(ABC): + """ + A wrapper for a string used as a configuration value which can be loaded from the system environment or injected via + the constructor. This class should be subclassed and the class fields `_ENV_KEY` and `_ENV_KEY_OVERRIDE` can be set + to enable environment-loading functionality. Convienience properties are available to check from where the value was + loaded. + """ + + _ENV_KEY: str | None = None + _ENV_KEY_OVERRIDE: str | None = None + _ALLOW_NONE: bool = False + + def __init__(self, value: str | None = None, use_env: bool = True): + """ + Parameters + ---------- + value : str, optional + The value to be contained in the EnvConfigValue. If the value is `None`, an attempt will be made to load it + from the environment using `_ENV_KEY`. if the `_ENV_KEY_OVERRIDE` field is not `None`, an attempt will be + made to load that environment variable in place of the passed-in value. + use_env : bool + If False, all environment-loading logic will be bypassed and the passed-in value will be used as-is. + defaults to True. + """ + + self._source = EnvConfigValueSource.CONSTRUCTOR + + if use_env: + if value is None and self.__class__._ENV_KEY is not None: + value = os.environ.get(self.__class__._ENV_KEY, None) + self._source = EnvConfigValueSource.ENV_DEFAULT + + if self.__class__._ENV_KEY_OVERRIDE is not None and self.__class__._ENV_KEY_OVERRIDE in os.environ: + value = os.environ[self.__class__._ENV_KEY_OVERRIDE] + self._source = EnvConfigValueSource.ENV_OVERRIDE + + if not self.__class__._ALLOW_NONE and value is None: + + message = ("value must not be None, but provided value was None and no environment-based default or " + "override was found.") + + if self.__class__._ENV_KEY is None: + raise ValueError(message) + + raise ValueError( + f"{message} Try passing a value to the constructor, or setting the `{self.__class__._ENV_KEY}` " + "environment variable.") + + else: + if not self.__class__._ALLOW_NONE and value is None: + raise ValueError("value must not be none") + + assert isinstance(value, str) or value is None + + self._value = value + self._use_env = use_env + + @property + def source(self) -> EnvConfigValueSource: + return self._source + + @property + def use_env(self) -> bool: + return self._use_env + + @property + def value(self) -> str | None: + return self._value diff --git a/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py b/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py index a4273ca17b..886c339962 100644 --- a/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py +++ b/tests/examples/gnn_fraud_detection_pipeline/test_graph_sage_stage.py @@ -22,9 +22,9 @@ from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage -# pylint: disable=no-name-in-module - +# pylint: disable=no-name-in-module +@pytest.mark.usefixtures("manual_seed") @pytest.mark.use_python class TestGraphSageStage: @@ -68,4 +68,4 @@ def test_process_message(self, cols = results.inductive_embedding_column_names + ['index'] assert sorted(cols) == sorted(expected_df.columns) ind_emb_df = results.get_meta(cols) - dataset_pandas.assert_compare_df(ind_emb_df.to_pandas(), expected_df) + dataset_pandas.assert_compare_df(ind_emb_df.to_pandas(), expected_df, abs_tol=1, rel_tol=1) diff --git a/tests/llm/services/test_openai_chat_client.py b/tests/llm/services/test_openai_chat_client.py index 577c83c7bb..d1bc0172f3 100644 --- a/tests/llm/services/test_openai_chat_client.py +++ b/tests/llm/services/test_openai_chat_client.py @@ -30,7 +30,7 @@ def test_constructor(mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock] assert isinstance(client, LLMClient) for mock_client in mock_chat_completion: - mock_client.assert_called_once_with(max_retries=max_retries) + mock_client.assert_called_once_with(max_retries=max_retries, organization=None, api_key=None, base_url=None) @pytest.mark.parametrize("use_async", [True, False]) diff --git a/tests/stages/test_filter_detections_stage.py b/tests/stages/test_filter_detections_stage.py new file mode 100644 index 0000000000..4f9cb43cfe --- /dev/null +++ b/tests/stages/test_filter_detections_stage.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import cupy as cp +import pytest + +import morpheus._lib.messages as _messages +from morpheus.common import FilterSource +from morpheus.messages import ControlMessage +from morpheus.messages import MultiResponseMessage +from morpheus.messages import ResponseMemory +from morpheus.messages.message_meta import MessageMeta +from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage + + +def _make_multi_response_message(df, probs): + df_ = df[0:len(probs)] + mem = ResponseMemory(count=len(df_), tensors={'probs': probs}) + + return MultiResponseMessage(meta=MessageMeta(df_), memory=mem) + + +def _make_control_message(df, probs): + df_ = df[0:len(probs)] + cm = ControlMessage() + cm.payload(MessageMeta(df_)) + cm.tensors(_messages.TensorMemory(count=len(df_), tensors={'probs': probs})) + + return cm + + +def test_constructor(config): + fds = FilterDetectionsStage(config) + assert fds.name == "filter" + + # Just ensure that we get a valid non-empty tuple + accepted_types = fds.accepted_types() + assert isinstance(accepted_types, tuple) + assert len(accepted_types) > 0 + + +@pytest.mark.use_cudf +def test_filter_copy(config, filter_probs_df): + fds = FilterDetectionsStage(config, threshold=0.5, filter_source=FilterSource.TENSOR) + + probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]]) + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + mock_control_message = _make_control_message(filter_probs_df, probs) + + # All values are at or below the threshold so nothing should be returned + output_multi_response_message = fds._controller.filter_copy(mock_multi_response_message) + assert output_multi_response_message is None + output_control_message = fds._controller.filter_copy(mock_control_message) + assert output_control_message is None + + # Only one row has a value above the threshold + probs = cp.array([ + [0.2, 0.4, 0.3], + [0.1, 0.5, 0.8], + [0.2, 0.4, 0.3], + ]) + + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + output_multi_response_message = fds._controller.filter_copy(mock_multi_response_message) + assert output_multi_response_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[1:1, :].to_cupy().tolist() + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_copy(mock_control_message) + assert output_control_message.payload().get_data().to_cupy().tolist() == output_multi_response_message.get_meta( + ).to_cupy().tolist() + + # Two adjacent rows have a value above the threashold + probs = cp.array([ + [0.2, 0.4, 0.3], + [0.1, 0.2, 0.3], + [0.1, 0.5, 0.8], + [0.1, 0.9, 0.2], + [0.2, 0.4, 0.3], + ]) + + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + output_multi_response_message = fds._controller.filter_copy(mock_multi_response_message) + assert output_multi_response_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[2:3, :].to_cupy().tolist() + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_copy(mock_control_message) + assert output_control_message.payload().get_data().to_cupy().tolist() == output_multi_response_message.get_meta( + ).to_cupy().tolist() + + # Two non-adjacent rows have a value above the threashold + probs = cp.array([ + [0.2, 0.4, 0.3], + [0.1, 0.2, 0.3], + [0.1, 0.5, 0.8], + [0.4, 0.3, 0.2], + [0.1, 0.9, 0.2], + [0.2, 0.4, 0.3], + ]) + + mask = cp.zeros(len(filter_probs_df), dtype=cp.bool_) + mask[2] = True + mask[4] = True + + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + output_multi_response_message = fds._controller.filter_copy(mock_multi_response_message) + assert output_multi_response_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[ + mask, :].to_cupy().tolist() + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_copy(mock_control_message) + assert output_control_message.payload().get_data().to_cupy().tolist() == output_multi_response_message.get_meta( + ).to_cupy().tolist() + + +@pytest.mark.use_cudf +@pytest.mark.parametrize('do_copy', [True, False]) +@pytest.mark.parametrize('threshold', [0.1, 0.5, 0.8]) +@pytest.mark.parametrize('field_name', ['v1', 'v2', 'v3', 'v4']) +def test_filter_column(config, filter_probs_df, do_copy, threshold, field_name): + fds = FilterDetectionsStage(config, + threshold=threshold, + copy=do_copy, + filter_source=FilterSource.DATAFRAME, + field_name=field_name) + expected_df = filter_probs_df.to_pandas() + expected_df = expected_df[expected_df[field_name] > threshold] + + probs = cp.zeros([len(filter_probs_df), 3], 'float') + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + # All values are at or below the threshold + output_multi_response_message = fds._controller.filter_copy(mock_multi_response_message) + assert output_multi_response_message.get_meta().to_cupy().tolist() == expected_df.to_numpy().tolist() + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_copy(mock_control_message) + assert output_control_message.payload().get_data().to_cupy().tolist() == output_multi_response_message.get_meta( + ).to_cupy().tolist() + + +@pytest.mark.use_cudf +def test_filter_slice(config, filter_probs_df): + fds = FilterDetectionsStage(config, threshold=0.5, filter_source=FilterSource.TENSOR) + + probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]]) + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + + # All values are at or below the threshold + output_multi_response_messages = fds._controller.filter_slice(mock_multi_response_message) + assert len(output_multi_response_messages) == 0 + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_slice(mock_control_message) + assert len(output_control_message) == len(output_multi_response_messages) + + # Only one row has a value above the threshold + probs = cp.array([ + [0.2, 0.4, 0.3], + [0.1, 0.5, 0.8], + [0.2, 0.4, 0.3], + ]) + + mock_multi_response_message: MultiResponseMessage = _make_multi_response_message(filter_probs_df, probs) + + output_multi_response_messages = fds._controller.filter_slice(mock_multi_response_message) + assert len(output_multi_response_messages) == 1 + assert output_multi_response_messages[0].get_meta().to_cupy().tolist() == filter_probs_df.loc[ + 1:1, :].to_cupy().tolist() + + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_slice(mock_control_message) + assert len(output_control_message) == len(output_multi_response_messages) + assert output_control_message[0].payload().get_data().to_cupy().tolist( + ) == output_multi_response_messages[0].get_meta().to_cupy().tolist() + + # Two adjacent rows have a value above the threashold + probs = cp.array([ + [0.2, 0.4, 0.3], + [0.1, 0.2, 0.3], + [0.1, 0.5, 0.8], + [0.1, 0.9, 0.2], + [0.2, 0.4, 0.3], + ]) + + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + + output_multi_response_messages = fds._controller.filter_slice(mock_multi_response_message) + assert len(output_multi_response_messages) == 1 + assert output_multi_response_messages[0].offset == 2 + assert output_multi_response_messages[0].count == 2 + assert output_multi_response_messages[0].get_meta().to_cupy().tolist() == filter_probs_df.loc[ + 2:3, :].to_cupy().tolist() + + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_slice(mock_control_message) + assert len(output_control_message) == len(output_multi_response_messages) + assert output_control_message[0].payload().get_data().to_cupy().tolist( + ) == output_multi_response_messages[0].get_meta().to_cupy().tolist() + + # Two non-adjacent rows have a value above the threashold + probs = cp.array([ + [0.2, 0.4, 0.3], + [0.1, 0.2, 0.3], + [0.1, 0.5, 0.8], + [0.4, 0.3, 0.2], + [0.1, 0.9, 0.2], + [0.2, 0.4, 0.3], + ]) + + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + + output_multi_response_messages = fds._controller.filter_slice(mock_multi_response_message) + assert len(output_multi_response_messages) == 2 + + # pylint: disable=unbalanced-tuple-unpacking + (multi_response_msg1, multi_response_msg2) = output_multi_response_messages + assert multi_response_msg1.offset == 2 + assert multi_response_msg1.count == 1 + + assert multi_response_msg2.offset == 4 + assert multi_response_msg2.count == 1 + + assert multi_response_msg1.get_meta().to_cupy().tolist() == filter_probs_df.loc[2:2, :].to_cupy().tolist() + assert multi_response_msg2.get_meta().to_cupy().tolist() == filter_probs_df.loc[4:4, :].to_cupy().tolist() + + mock_control_message = _make_control_message(filter_probs_df, probs) + output_control_message = fds._controller.filter_slice(mock_control_message) + assert len(output_control_message) == len(output_multi_response_messages) + (control_msg1, control_msg2) = output_control_message # pylint: disable=unbalanced-tuple-unpacking + assert control_msg1.payload().count == multi_response_msg1.count + assert control_msg2.payload().count == multi_response_msg2.count + + assert control_msg1.payload().get_data().to_cupy().tolist() == multi_response_msg1.get_meta().to_cupy().tolist() + assert control_msg2.payload().get_data().to_cupy().tolist() == multi_response_msg2.get_meta().to_cupy().tolist() diff --git a/tests/stages/test_generate_viz_frames_stage.py b/tests/stages/test_generate_viz_frames_stage.py new file mode 100644 index 0000000000..7cc79c2d31 --- /dev/null +++ b/tests/stages/test_generate_viz_frames_stage.py @@ -0,0 +1,77 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +import cupy as cp +import typing_utils + +import cudf + +import morpheus._lib.messages as _messages +from morpheus.config import Config +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.messages import MultiResponseMessage +from morpheus.messages import ResponseMemory +from morpheus.stages.postprocess.generate_viz_frames_stage import GenerateVizFramesStage + + +def _make_multi_response_message(df, probs): + df_ = df[0:len(probs)] + mem = ResponseMemory(count=len(df_), tensors={'probs': probs}) + + return MultiResponseMessage(meta=MessageMeta(df_), memory=mem) + + +def _make_control_message(df, probs): + df_ = df[0:len(probs)] + cm = ControlMessage() + cm.payload(MessageMeta(df_)) + cm.tensors(_messages.TensorMemory(count=len(df_), tensors={'probs': probs})) + + return cm + + +def test_constructor(config: Config): + stage = GenerateVizFramesStage(config) + assert stage.name == "gen_viz" + + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiResponseMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) + + +def test_process_control_message_and_multi_message(config: Config): + stage = GenerateVizFramesStage(config) + + df = cudf.DataFrame({ + "timestamp": [1616380971990, 1616380971991], + "src_ip": ["10.20.16.248", "10.244.0.1"], + "dest_ip": ["10.244.0.59", "10.244.0.25"], + "src_port": ["50410", "50410"], + "dest_port": ["80", "80"], + "data": ["a", "b"] + }) + + probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]]) + mock_multi_response_message = _make_multi_response_message(df, probs) + mock_control_message = _make_control_message(df, probs) + + output_multi_response_message_list = stage._to_vis_df(mock_multi_response_message) + output_control_message_list = stage._to_vis_df(mock_control_message) + for output_multi_response_message, output_control_message in zip(output_multi_response_message_list, + output_control_message_list): + assert output_multi_response_message[1].equals(output_control_message[1]) diff --git a/tests/stages/test_ml_flow_drift_stage.py b/tests/stages/test_ml_flow_drift_stage.py new file mode 100644 index 0000000000..3f41683315 --- /dev/null +++ b/tests/stages/test_ml_flow_drift_stage.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing +from unittest.mock import patch + +import cupy as cp +import pytest +import typing_utils + +import morpheus._lib.messages as _messages +from morpheus.messages import ControlMessage +from morpheus.messages import MultiResponseMessage +from morpheus.messages import ResponseMemory +from morpheus.messages.message_meta import MessageMeta +from morpheus.stages.postprocess.ml_flow_drift_stage import MLFlowDriftStage + + +def _make_multi_response_message(df, probs): + df_ = df[0:len(probs)] + mem = ResponseMemory(count=len(df_), tensors={'probs': probs}) + + return MultiResponseMessage(meta=MessageMeta(df_), count=len(df_), memory=mem) + + +def _make_control_message(df, probs): + df_ = df[0:len(probs)] + cm = ControlMessage() + cm.payload(MessageMeta(df_)) + cm.tensors(_messages.TensorMemory(count=len(df_), tensors={'probs': probs})) + + return cm + + +def test_constructor(config): + with patch("morpheus.stages.postprocess.ml_flow_drift_stage.mlflow.start_run"): + stage = MLFlowDriftStage(config) + assert stage.name == "mlflow_drift" + + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiResponseMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) + + +@pytest.mark.use_cudf +@pytest.mark.use_python +def test_calc_drift(config, filter_probs_df): + with patch("morpheus.stages.postprocess.ml_flow_drift_stage.mlflow.start_run"): + labels = ["a", "b", "c"] + stage = MLFlowDriftStage(config, labels=labels, batch_size=1) + + probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]]) + mock_multi_response_message = _make_multi_response_message(filter_probs_df, probs) + mock_control_message = _make_control_message(filter_probs_df, probs) + + expected_metrics = [{ + 'a': 0.9, 'b': 0.5, 'c': 0.7, 'total': 0.6999999999999998 + }, { + 'a': 0.8, 'b': 0.7, 'c': 0.6, 'total': 0.7000000000000001 + }] + + multi_response_message_metrics = [] + with patch("morpheus.stages.postprocess.ml_flow_drift_stage.mlflow.log_metrics") as mock_log_metrics: + stage._calc_drift(mock_multi_response_message) + for call_arg in mock_log_metrics.call_args_list: + multi_response_message_metrics.append(call_arg[0][0]) + assert multi_response_message_metrics == expected_metrics + + control_message_metrics = [] + with patch("morpheus.stages.postprocess.ml_flow_drift_stage.mlflow.log_metrics") as mock_log_metrics: + stage._calc_drift(mock_control_message) + for call_arg in mock_log_metrics.call_args_list: + control_message_metrics.append(call_arg[0][0]) + assert control_message_metrics == multi_response_message_metrics diff --git a/tests/stages/test_preprocess_ae_stage.py b/tests/stages/test_preprocess_ae_stage.py new file mode 100644 index 0000000000..d702abee54 --- /dev/null +++ b/tests/stages/test_preprocess_ae_stage.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +import cupy as cp +import pytest +import typing_utils + +import cudf + +from morpheus.config import Config +from morpheus.config import ConfigAutoEncoder +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.messages import MultiAEMessage +from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage + + +@pytest.fixture(name='config') +def fixture_config(config: Config): + config.feature_length = 256 + config.ae = ConfigAutoEncoder() + config.ae.feature_columns = ["data"] + yield config + + +def test_constructor(config: Config): + stage = PreprocessAEStage(config) + assert stage.name == "preprocess-ae" + + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiAEMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) + + +def test_process_control_message_and_multi_message(config: Config): + stage = PreprocessAEStage(config) + + df = cudf.DataFrame({"data": ["a", "b", "c"]}) + meta = MessageMeta(df) + + input_multi_ae_message = MultiAEMessage(meta=meta, + mess_offset=0, + mess_count=3, + model=None, + train_scores_mean=0.0, + train_scores_std=1.0) + + output_multi_inference_ae_message = stage.pre_process_batch(input_multi_ae_message, + fea_len=256, + feature_columns=["data"]) + + input_control_message = ControlMessage() + input_control_message.payload(meta) + + output_control_message = stage.pre_process_batch(input_control_message, fea_len=256, feature_columns=["data"]) + + # Check if each tensor in the control message is equal to the corresponding tensor in the inference message + for tensor_key in output_control_message.tensors().tensor_names: + assert cp.array_equal(output_control_message.tensors().get_tensor(tensor_key), + getattr(output_multi_inference_ae_message, tensor_key)) diff --git a/tests/stages/test_preprocess_fil_stage.py b/tests/stages/test_preprocess_fil_stage.py index 638fcaa994..cdbe66dafe 100644 --- a/tests/stages/test_preprocess_fil_stage.py +++ b/tests/stages/test_preprocess_fil_stage.py @@ -13,8 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing + import cupy as cp import pytest +import typing_utils import cudf @@ -40,9 +43,9 @@ def test_constructor(config: Config): assert stage._fea_length == config.feature_length assert stage.features == config.fil.feature_columns - accepted_types = stage.accepted_types() - assert isinstance(accepted_types, tuple) - assert len(accepted_types) > 0 + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) def test_process_control_message(config: Config): diff --git a/tests/stages/test_preprocess_nlp_stage.py b/tests/stages/test_preprocess_nlp_stage.py index 22fc99e04a..9c202a168d 100644 --- a/tests/stages/test_preprocess_nlp_stage.py +++ b/tests/stages/test_preprocess_nlp_stage.py @@ -13,11 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing from unittest.mock import Mock from unittest.mock import patch import cupy as cp import pytest +import typing_utils import cudf @@ -61,9 +63,9 @@ def test_constructor(config: Config): assert stage._do_lower_case is False assert stage._add_special_tokens is False - accepted_types = stage.accepted_types() - assert isinstance(accepted_types, tuple) - assert len(accepted_types) > 0 + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) @patch("morpheus.stages.preprocess.preprocess_nlp_stage.tokenize_text_series") diff --git a/tests/stages/test_timeseries_stage.py b/tests/stages/test_timeseries_stage.py new file mode 100644 index 0000000000..8babd0e752 --- /dev/null +++ b/tests/stages/test_timeseries_stage.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +import cupy as cp +import pandas as pd +import pytest +import typing_utils + +import morpheus._lib.messages as _messages +from morpheus.config import Config +from morpheus.config import ConfigAutoEncoder +from morpheus.messages import ControlMessage +from morpheus.messages import MultiResponseAEMessage +from morpheus.messages import MultiResponseMessage +from morpheus.messages import ResponseMemory +from morpheus.messages.message_meta import MessageMeta +from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage + + +@pytest.fixture(name='config') +def fixture_config(config: Config): + config.feature_length = 256 + config.ae = ConfigAutoEncoder() + config.ae.feature_columns = ["data"] + config.ae.timestamp_column_name = "ts" + yield config + + +def _make_multi_response_ae_message(df, probs): + df_ = df[0:len(probs)] + mem = ResponseMemory(count=len(df_), tensors={'probs': probs}) + + return MultiResponseAEMessage(meta=MessageMeta(df_), count=len(df_), memory=mem, user_id="test_user_id") + + +def _make_control_message(df, probs): + df_ = df[0:len(probs)] + cm = ControlMessage() + cm.payload(MessageMeta(df_)) + cm.tensors(_messages.TensorMemory(count=len(df_), tensors={'probs': probs})) + cm.set_metadata("user_id", "test_user_id") + + return cm + + +def test_constructor(config): + stage = TimeSeriesStage(config) + assert stage.name == "timeseries" + + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiResponseMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) + + +@pytest.mark.use_cudf +@pytest.mark.use_python +def test_call_timeseries_user(config): + stage = TimeSeriesStage(config) + + df = pd.DataFrame({"ts": pd.date_range(start='01-01-2022', periods=5)}) + probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]]) + mock_multi_response_ae_message = _make_multi_response_ae_message(df, probs) + mock_control_message = _make_control_message(df, probs) + + assert stage._call_timeseries_user(mock_multi_response_ae_message)[0].user_id == "test_user_id" + assert stage._call_timeseries_user(mock_control_message)[0].get_metadata("user_id") == "test_user_id" diff --git a/tests/stages/test_validation_stage.py b/tests/stages/test_validation_stage.py new file mode 100644 index 0000000000..8f15799b63 --- /dev/null +++ b/tests/stages/test_validation_stage.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import typing + +import pandas as pd +import typing_utils + +from morpheus.messages import ControlMessage +from morpheus.messages import MultiMessage +from morpheus.messages.message_meta import MessageMeta +from morpheus.stages.postprocess.validation_stage import ValidationStage + + +def _make_multi_message(df): + return MultiMessage(meta=MessageMeta(df)) + + +def _make_control_message(df): + cm = ControlMessage() + cm.payload(MessageMeta(df)) + + return cm + + +def test_constructor(config): + df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}) + stage = ValidationStage(config, val_file_name=df) + assert stage.name == "validation" + + # Just ensure that we get a valid non-empty tuple + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) + + +def test_do_comparison(config): + df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}) + stage = ValidationStage(config, val_file_name=df) + + mm = _make_multi_message(df) + cm = _make_control_message(df) + + stage._append_message(mm) + mm_results = stage.get_results(clear=True) + stage._append_message(cm) + cm_results = stage.get_results(clear=True) + assert mm_results == cm_results diff --git a/tests/test_add_classifications_stage.py b/tests/test_add_classifications_stage.py index 80091f3dc5..e3bbf70c1a 100755 --- a/tests/test_add_classifications_stage.py +++ b/tests/test_add_classifications_stage.py @@ -14,8 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing + import cupy as cp import pytest +import typing_utils import cudf @@ -43,10 +46,9 @@ def test_constructor(config: Config): assert stage._idx2label == {0: 'frogs', 1: 'lizards', 2: 'toads'} assert stage.name == "add-class" - # Just ensure that we get a valid non-empty tuple - accepted_types = stage.accepted_types() - assert isinstance(accepted_types, tuple) - assert len(accepted_types) > 0 + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiResponseMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) def test_constructor_explicit_labels(config: Config): diff --git a/tests/test_add_scores_stage.py b/tests/test_add_scores_stage.py index e454a0e35f..0e347c7d78 100755 --- a/tests/test_add_scores_stage.py +++ b/tests/test_add_scores_stage.py @@ -14,8 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import typing + import cupy as cp import pytest +import typing_utils import cudf @@ -44,10 +47,9 @@ def test_constructor(config: Config): assert stage._idx2label == {0: 'frogs', 1: 'lizards', 2: 'toads'} assert stage.name == "add-scores" - # Just ensure that we get a valid non-empty tuple - accepted_types = stage.accepted_types() - assert isinstance(accepted_types, tuple) - assert len(accepted_types) > 0 + accepted_union = typing.Union[stage.accepted_types()] + assert typing_utils.issubtype(MultiResponseMessage, accepted_union) + assert typing_utils.issubtype(ControlMessage, accepted_union) def test_constructor_explicit_labels(config: Config): diff --git a/tests/test_env_config_value.py b/tests/test_env_config_value.py new file mode 100644 index 0000000000..54fb243556 --- /dev/null +++ b/tests/test_env_config_value.py @@ -0,0 +1,118 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from unittest import mock + +import pytest + +from morpheus.utils.env_config_value import EnvConfigValue +from morpheus.utils.env_config_value import EnvConfigValueSource + + +class EnvDrivenValue(EnvConfigValue): + _ENV_KEY = "DEFAULT" + _ENV_KEY_OVERRIDE = "OVERRIDE" + + +def test_env_driven_value(): + with mock.patch.dict(os.environ, clear=True, values={"DEFAULT": "default.api.com"}): + + config = EnvDrivenValue() + assert config.value == "default.api.com" + assert config.source == EnvConfigValueSource.ENV_DEFAULT + assert config.use_env + + with pytest.raises(ValueError): + config = EnvDrivenValue(use_env=False) + + config = EnvDrivenValue("api.com") + assert config.value == "api.com" + assert config.source == EnvConfigValueSource.CONSTRUCTOR + assert config.use_env + + with mock.patch.dict(os.environ, clear=True, values={"OVERRIDE": "override.api.com"}): + + config = EnvDrivenValue("api.com") + assert config.value == "override.api.com" + assert config.source == EnvConfigValueSource.ENV_OVERRIDE + assert config.use_env + + config = EnvDrivenValue("api.com", use_env=False) + assert config.value == "api.com" + assert config.source == EnvConfigValueSource.CONSTRUCTOR + assert not config.use_env + + +class EnvDriverValueNoOverride(EnvConfigValue): + _ENV_KEY = "DEFAULT" + + +def test_env_driven_value_no_override(): + with mock.patch.dict(os.environ, clear=True, values={"DEFAULT": "default.api.com"}): + + config = EnvDriverValueNoOverride() + assert config.value == "default.api.com" + assert config.source == EnvConfigValueSource.ENV_DEFAULT + assert config.use_env + + with pytest.raises(ValueError): + config = EnvDriverValueNoOverride(use_env=False) + + config = EnvDriverValueNoOverride("api.com") + assert config.value == "api.com" + assert config.source == EnvConfigValueSource.CONSTRUCTOR + assert config.use_env + + with mock.patch.dict(os.environ, clear=True, values={"OVERRIDE": "override.api.com"}): + + config = EnvDriverValueNoOverride("api.com") + assert config.value == "api.com" + assert config.source == EnvConfigValueSource.CONSTRUCTOR + assert config.use_env + + +class EnvDrivenValueNoDefault(EnvConfigValue): + _ENV_KEY_OVERRIDE = "OVERRIDE" + + +def test_env_driven_value_no_default(): + with mock.patch.dict(os.environ, clear=True, values={"DEFAULT": "default.api.com"}): + + with pytest.raises(ValueError): + config = EnvDrivenValueNoDefault() + + config = EnvDrivenValueNoDefault("api.com") + assert config.value == "api.com" + assert config.source == EnvConfigValueSource.CONSTRUCTOR + assert config.use_env + + with mock.patch.dict(os.environ, clear=True, values={"OVERRIDE": "override.api.com"}): + + config = EnvDrivenValueNoDefault("api.com") + assert config.value == "override.api.com" + assert config.source == EnvConfigValueSource.ENV_OVERRIDE + assert config.use_env + + +class EnvOptionalValue(EnvConfigValue): + _ALLOW_NONE = True + + +def test_env_optional_value(): + config = EnvOptionalValue() + assert config.value is None + assert config.source == EnvConfigValueSource.CONSTRUCTOR + assert config.use_env diff --git a/tests/test_filter_detections_stage.py b/tests/test_filter_detections_stage.py deleted file mode 100755 index ba8ed0591f..0000000000 --- a/tests/test_filter_detections_stage.py +++ /dev/null @@ -1,192 +0,0 @@ -#!/usr/bin/env python -# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import cupy as cp -import pytest - -from morpheus.common import FilterSource -from morpheus.messages import MultiResponseMessage -from morpheus.messages import ResponseMemory -from morpheus.messages.message_meta import MessageMeta -from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage - - -def _make_message(df, probs): - df_ = df[0:len(probs)] - mem = ResponseMemory(count=len(df_), tensors={'probs': probs}) - return MultiResponseMessage(meta=MessageMeta(df_), memory=mem) - - -def test_constructor(config): - fds = FilterDetectionsStage(config) - assert fds.name == "filter" - - # Just ensure that we get a valid non-empty tuple - accepted_types = fds.accepted_types() - assert isinstance(accepted_types, tuple) - assert len(accepted_types) > 0 - - fds = FilterDetectionsStage(config, threshold=0.2) - assert fds._controller._threshold == 0.2 - - -@pytest.mark.use_cudf -@pytest.mark.use_python -def test_filter_copy(config, filter_probs_df): - fds = FilterDetectionsStage(config, threshold=0.5, filter_source=FilterSource.TENSOR) - - probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]]) - mock_message = _make_message(filter_probs_df, probs) - - # All values are at or below the threshold so nothing should be returned - output_message = fds._controller.filter_copy(mock_message) - assert output_message is None - - # Only one row has a value above the threshold - probs = cp.array([ - [0.2, 0.4, 0.3], - [0.1, 0.5, 0.8], - [0.2, 0.4, 0.3], - ]) - - mock_message = _make_message(filter_probs_df, probs) - - output_message = fds._controller.filter_copy(mock_message) - assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[1:1, :].to_cupy().tolist() - - # Two adjacent rows have a value above the threashold - probs = cp.array([ - [0.2, 0.4, 0.3], - [0.1, 0.2, 0.3], - [0.1, 0.5, 0.8], - [0.1, 0.9, 0.2], - [0.2, 0.4, 0.3], - ]) - - mock_message = _make_message(filter_probs_df, probs) - - output_message = fds._controller.filter_copy(mock_message) - assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[2:3, :].to_cupy().tolist() - - # Two non-adjacent rows have a value above the threashold - probs = cp.array([ - [0.2, 0.4, 0.3], - [0.1, 0.2, 0.3], - [0.1, 0.5, 0.8], - [0.4, 0.3, 0.2], - [0.1, 0.9, 0.2], - [0.2, 0.4, 0.3], - ]) - - mock_message = _make_message(filter_probs_df, probs) - - output_message = fds._controller.filter_copy(mock_message) - mask = cp.zeros(len(filter_probs_df), dtype=cp.bool_) - mask[2] = True - mask[4] = True - assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[mask, :].to_cupy().tolist() - - -@pytest.mark.use_cudf -@pytest.mark.use_python -@pytest.mark.parametrize('do_copy', [True, False]) -@pytest.mark.parametrize('threshold', [0.1, 0.5, 0.8]) -@pytest.mark.parametrize('field_name', ['v1', 'v2', 'v3', 'v4']) -def test_filter_column(config, filter_probs_df, do_copy, threshold, field_name): - fds = FilterDetectionsStage(config, - threshold=threshold, - copy=do_copy, - filter_source=FilterSource.DATAFRAME, - field_name=field_name) - expected_df = filter_probs_df.to_pandas() - expected_df = expected_df[expected_df[field_name] > threshold] - - probs = cp.zeros([len(filter_probs_df), 3], 'float') - mock_message = _make_message(filter_probs_df, probs) - - # All values are at or below the threshold - output_message = fds._controller.filter_copy(mock_message) - - assert output_message.get_meta().to_cupy().tolist() == expected_df.to_numpy().tolist() - - -@pytest.mark.use_cudf -@pytest.mark.use_python -def test_filter_slice(config, filter_probs_df): - fds = FilterDetectionsStage(config, threshold=0.5, filter_source=FilterSource.TENSOR) - - probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]]) - mock_message = _make_message(filter_probs_df, probs) - - # All values are at or below the threshold - output_messages = fds._controller.filter_slice(mock_message) - assert len(output_messages) == 0 - - # Only one row has a value above the threshold - probs = cp.array([ - [0.2, 0.4, 0.3], - [0.1, 0.5, 0.8], - [0.2, 0.4, 0.3], - ]) - - mock_message = _make_message(filter_probs_df, probs) - - output_messages = fds._controller.filter_slice(mock_message) - assert len(output_messages) == 1 - output_message = output_messages[0] - assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[1:1, :].to_cupy().tolist() - - # Two adjacent rows have a value above the threashold - probs = cp.array([ - [0.2, 0.4, 0.3], - [0.1, 0.2, 0.3], - [0.1, 0.5, 0.8], - [0.1, 0.9, 0.2], - [0.2, 0.4, 0.3], - ]) - - mock_message = _make_message(filter_probs_df, probs) - - output_messages = fds._controller.filter_slice(mock_message) - assert len(output_messages) == 1 - output_message = output_messages[0] - assert output_message.offset == 2 - assert output_message.count == 2 - assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[2:3, :].to_cupy().tolist() - - # Two non-adjacent rows have a value above the threashold - probs = cp.array([ - [0.2, 0.4, 0.3], - [0.1, 0.2, 0.3], - [0.1, 0.5, 0.8], - [0.4, 0.3, 0.2], - [0.1, 0.9, 0.2], - [0.2, 0.4, 0.3], - ]) - - mock_message = _make_message(filter_probs_df, probs) - - output_messages = fds._controller.filter_slice(mock_message) - assert len(output_messages) == 2 - (msg1, msg2) = output_messages # pylint: disable=unbalanced-tuple-unpacking - assert msg1.offset == 2 - assert msg1.count == 1 - - assert msg2.offset == 4 - assert msg2.count == 1 - - assert msg1.get_meta().to_cupy().tolist() == filter_probs_df.loc[2:2, :].to_cupy().tolist() - assert msg2.get_meta().to_cupy().tolist() == filter_probs_df.loc[4:4, :].to_cupy().tolist() diff --git a/tests/test_filter_detections_stage_pipe.py b/tests/test_filter_detections_stage_pipe.py index e90ea13b3f..15e36bd244 100755 --- a/tests/test_filter_detections_stage_pipe.py +++ b/tests/test_filter_detections_stage_pipe.py @@ -24,7 +24,9 @@ from _utils import assert_results from _utils.dataset_manager import DatasetManager from _utils.stages.conv_msg import ConvMsg +from morpheus.common import FilterSource from morpheus.config import Config +from morpheus.messages import ControlMessage from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage from morpheus.messages import MultiResponseMessage @@ -92,6 +94,30 @@ def _test_filter_detections_stage_multi_segment_pipe(config: Config, dataset_pan assert_results(comp_stage.get_results()) +def _test_filter_detections_control_message_stage_multi_segment_pipe(config: Config, + dataset_pandas: DatasetManager, + copy: bool = True): + threshold = 0.75 + + input_df = dataset_pandas["filter_probs.csv"] + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, [cudf.DataFrame(input_df)])) + pipe.add_segment_boundary(MessageMeta) + pipe.add_stage(DeserializeStage(config, message_type=ControlMessage)) + pipe.add_segment_boundary(data_type=ControlMessage) + pipe.add_stage(ConvMsg(config, message_type=ControlMessage)) + pipe.add_segment_boundary(ControlMessage) + pipe.add_stage(FilterDetectionsStage(config, threshold=threshold, copy=copy, filter_source=FilterSource.TENSOR)) + pipe.add_segment_boundary(ControlMessage) + pipe.add_stage(SerializeStage(config)) + pipe.add_segment_boundary(MessageMeta) + comp_stage = pipe.add_stage( + CompareDataFrameStage(config, build_expected(dataset_pandas["filter_probs.csv"], threshold))) + pipe.run() + + assert_results(comp_stage.get_results()) + + @pytest.mark.slow @pytest.mark.parametrize('order', ['F', 'C']) @pytest.mark.parametrize('pipeline_batch_size', [256, 1024, 2048]) @@ -109,3 +135,10 @@ def test_filter_detections_stage_pipe(config: Config, @pytest.mark.parametrize('do_copy', [True, False]) def test_filter_detections_stage_multi_segment_pipe(config: Config, dataset_pandas: DatasetManager, do_copy: bool): return _test_filter_detections_stage_multi_segment_pipe(config, dataset_pandas, do_copy) + + +@pytest.mark.parametrize('do_copy', [True, False]) +def test_filter_detections_control_message_stage_multi_segment_pipe(config: Config, + dataset_pandas: DatasetManager, + do_copy: bool): + return _test_filter_detections_control_message_stage_multi_segment_pipe(config, dataset_pandas, do_copy) diff --git a/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv b/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv index 9ae8600dd2..8c1309ce6b 100644 --- a/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv +++ b/tests/tests_data/examples/gnn_fraud_detection_pipeline/inductive_emb.csv @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:e5f824196fdc9a0e027d65369ae7f73e44fe2986058cc1af46c3c04d62217f74 -size 8641 +oid sha256:aa6fb37a801c5baf58d9512050671e55e21d132c7e56784d49f9910c7d0a3a26 +size 7525