diff --git a/.clang-tidy b/.clang-tidy index f845cdff71..a815b783f2 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -5,7 +5,19 @@ # Disable modernize-use-trailing-return-type (too many hits) # Disable modernize-use-nodiscard (why isnt [[nodiscard]] the default?) # Enable identifier in TODO comments -Checks: -*,readability-identifier-naming,modernize-*,-modernize-use-trailing-return-type,-modernize-use-nodiscard,google-readability-todo +Checks: > + -*, + readability-identifier-naming, + modernize-*, + -modernize-use-trailing-return-type, + -modernize-use-nodiscard, + google-readability-todo + +WarningsAsErrors: > + *, + -clang-diagnostic-unused-command-line-argument, + -Wno-ignored-optimization-argument, + -Qunused-arguments #WarningsAsErrors: '*' HeaderFilterRegex: '.*\/include\/morpheus\/.*' diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 0253b975a8..9195d475d4 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,6 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM rapidsai/devcontainers:23.10-cpp-cuda11.8-mambaforge-ubuntu22.04 AS base +FROM rapidsai/devcontainers:23.12-cpp-cuda12.1-mambaforge-ubuntu22.04 AS base ENV PATH="${PATH}:/workspaces/morpheus/.devcontainer/bin" diff --git a/.devcontainer/README.md b/.devcontainer/README.md index f97716e67b..8a91347e49 100644 --- a/.devcontainer/README.md +++ b/.devcontainer/README.md @@ -1,5 +1,5 @@ +# Morpheus 24.03.00 (7 Apr 2024) + +## 🚨 Breaking Changes + +- Updating `nlohman_json` to 3.11 to match MRC ([#1596](https://github.com/nv-morpheus/Morpheus/pull/1596)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Add retry logic and proxy support to the NeMo LLM Service ([#1544](https://github.com/nv-morpheus/Morpheus/pull/1544)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Upgrade `openai` version to 1.13 and `langchain` to version 0.1.9 ([#1529](https://github.com/nv-morpheus/Morpheus/pull/1529)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Make `start_async()` available to source stages ([#1523](https://github.com/nv-morpheus/Morpheus/pull/1523)) [@efajardo-nv](https://github.com/efajardo-nv) +- RAPIDS 24.02 Upgrade ([#1468](https://github.com/nv-morpheus/Morpheus/pull/1468)) [@cwharris](https://github.com/cwharris) +- Decouple TritonInferenceStage from pipeline mode ([#1402](https://github.com/nv-morpheus/Morpheus/pull/1402)) [@dagardner-nv](https://github.com/dagardner-nv) + +## 🐛 Bug Fixes + +- Serialize datetime objects into the module config ([#1592](https://github.com/nv-morpheus/Morpheus/pull/1592)) [@dagardner-nv](https://github.com/dagardner-nv) +- Remove the defaults channel from `dependencies.yml` ([#1584](https://github.com/nv-morpheus/Morpheus/pull/1584)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Fix `iso_date_regex_pattern` config in `file_batcher` module and allow override ([#1580](https://github.com/nv-morpheus/Morpheus/pull/1580)) [@efajardo-nv](https://github.com/efajardo-nv) +- Update DFP MLflow ModelManager to handle model retrieval using file URI ([#1578](https://github.com/nv-morpheus/Morpheus/pull/1578)) [@efajardo-nv](https://github.com/efajardo-nv) +- Fix 
`configure_logging` in DFP benchmarks ([#1553](https://github.com/nv-morpheus/Morpheus/pull/1553)) [@efajardo-nv](https://github.com/efajardo-nv) +- Catch langchain agent errors ([#1539](https://github.com/nv-morpheus/Morpheus/pull/1539)) [@dagardner-nv](https://github.com/dagardner-nv) +- Adding missing dependency on `pydantic` ([#1535](https://github.com/nv-morpheus/Morpheus/pull/1535)) [@yuchenz427](https://github.com/yuchenz427) +- Fix memory leak in the mutable dataframe checkout/checkin code ([#1534](https://github.com/nv-morpheus/Morpheus/pull/1534)) [@dagardner-nv](https://github.com/dagardner-nv) +- Fix pathlib.Path support for FileSourceStage ([#1531](https://github.com/nv-morpheus/Morpheus/pull/1531)) [@dagardner-nv](https://github.com/dagardner-nv) +- Make `start_async()` available to source stages ([#1523](https://github.com/nv-morpheus/Morpheus/pull/1523)) [@efajardo-nv](https://github.com/efajardo-nv) +- Update CI Containers ([#1521](https://github.com/nv-morpheus/Morpheus/pull/1521)) [@cwharris](https://github.com/cwharris) +- Fix intermittent segfault on interpreter shutdown ([#1513](https://github.com/nv-morpheus/Morpheus/pull/1513)) [@dagardner-nv](https://github.com/dagardner-nv) +- Adopt updated builds of CI runners ([#1503](https://github.com/nv-morpheus/Morpheus/pull/1503)) [@dagardner-nv](https://github.com/dagardner-nv) +- Update mlflow plugin version for deployments fix ([#1499](https://github.com/nv-morpheus/Morpheus/pull/1499)) [@pdmack](https://github.com/pdmack) +- Add runtime environment output to fix building the release container ([#1496](https://github.com/nv-morpheus/Morpheus/pull/1496)) [@cwharris](https://github.com/cwharris) +- Fix logging of sleep time ([#1493](https://github.com/nv-morpheus/Morpheus/pull/1493)) [@dagardner-nv](https://github.com/dagardner-nv) +- Pin pytest to <8 ([#1485](https://github.com/nv-morpheus/Morpheus/pull/1485)) [@dagardner-nv](https://github.com/dagardner-nv) +- Improve pipeline stop logic to ensure join is called exactly once for all stages ([#1479](https://github.com/nv-morpheus/Morpheus/pull/1479)) [@efajardo-nv](https://github.com/efajardo-nv) +- Fix expected JSON config file extension in logger ([#1471](https://github.com/nv-morpheus/Morpheus/pull/1471)) [@efajardo-nv](https://github.com/efajardo-nv) +- Fix Loss Function to Improve Model Convergence for `AutoEncoder` ([#1460](https://github.com/nv-morpheus/Morpheus/pull/1460)) [@hsin-c](https://github.com/hsin-c) +- GNN fraud detection notebook fix ([#1450](https://github.com/nv-morpheus/Morpheus/pull/1450)) [@efajardo-nv](https://github.com/efajardo-nv) +- Eliminate Redundant Fetches in RSS Controller ([#1442](https://github.com/nv-morpheus/Morpheus/pull/1442)) [@bsuryadevara](https://github.com/bsuryadevara) +- Updating the workspace settings to remove deprecated python options ([#1440](https://github.com/nv-morpheus/Morpheus/pull/1440)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Improve camouflage startup issues ([#1436](https://github.com/nv-morpheus/Morpheus/pull/1436)) [@dagardner-nv](https://github.com/dagardner-nv) +- Fixes to modular DFP examples and benchmarks ([#1429](https://github.com/nv-morpheus/Morpheus/pull/1429)) [@efajardo-nv](https://github.com/efajardo-nv) + +## 📖 Documentation + +- Update minimum compute requirements to Volta ([#1594](https://github.com/nv-morpheus/Morpheus/pull/1594)) [@dagardner-nv](https://github.com/dagardner-nv) +- Fix broken link in getting started with Morpheus doc 
([#1494](https://github.com/nv-morpheus/Morpheus/pull/1494)) [@edknv](https://github.com/edknv) +- Update abp-model-card.md ([#1439](https://github.com/nv-morpheus/Morpheus/pull/1439)) [@drobison00](https://github.com/drobison00) +- Update gnn-fsi-model-card.md ([#1438](https://github.com/nv-morpheus/Morpheus/pull/1438)) [@drobison00](https://github.com/drobison00) +- Update phishing-model-card.md ([#1437](https://github.com/nv-morpheus/Morpheus/pull/1437)) [@drobison00](https://github.com/drobison00) +- Document incompatible mlflow models issue ([#1434](https://github.com/nv-morpheus/Morpheus/pull/1434)) [@dagardner-nv](https://github.com/dagardner-nv) + +## 🚀 New Features + +- Adding retry logic to the `TritonInferenceStage` to allow recovering from errors ([#1548](https://github.com/nv-morpheus/Morpheus/pull/1548)) [@cwharris](https://github.com/cwharris) +- Create a base mixin class for ingress & egress stages ([#1473](https://github.com/nv-morpheus/Morpheus/pull/1473)) [@dagardner-nv](https://github.com/dagardner-nv) +- RAPIDS 24.02 Upgrade ([#1468](https://github.com/nv-morpheus/Morpheus/pull/1468)) [@cwharris](https://github.com/cwharris) +- Install headers & morpheus-config.cmake ([#1448](https://github.com/nv-morpheus/Morpheus/pull/1448)) [@dagardner-nv](https://github.com/dagardner-nv) + +## 🛠️ Improvements + +- Updating `nlohman_json` to 3.11 to match MRC ([#1596](https://github.com/nv-morpheus/Morpheus/pull/1596)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- DOCA 2.6 from public repo ([#1588](https://github.com/nv-morpheus/Morpheus/pull/1588)) [@e-ago](https://github.com/e-ago) +- Support `ControlMessage` for `PreProcessNLPStage` `PreProcessFILStage` `AddScoreStageBase` ([#1573](https://github.com/nv-morpheus/Morpheus/pull/1573)) [@yuchenz427](https://github.com/yuchenz427) +- Update MLflow in Production DFP example to use Python 3.10 ([#1572](https://github.com/nv-morpheus/Morpheus/pull/1572)) [@efajardo-nv](https://github.com/efajardo-nv) +- Fix environment yaml paths ([#1551](https://github.com/nv-morpheus/Morpheus/pull/1551)) [@efajardo-nv](https://github.com/efajardo-nv) +- Add retry logic and proxy support to the NeMo LLM Service ([#1544](https://github.com/nv-morpheus/Morpheus/pull/1544)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Update to match new MRC function sig for AsyncioRunnable::on_data ([#1541](https://github.com/nv-morpheus/Morpheus/pull/1541)) [@dagardner-nv](https://github.com/dagardner-nv) +- Expose max_retries parameter to OpenAIChatService & OpenAIChatClient ([#1536](https://github.com/nv-morpheus/Morpheus/pull/1536)) [@dagardner-nv](https://github.com/dagardner-nv) +- Upgrade `openai` version to 1.13 and `langchain` to version 0.1.9 ([#1529](https://github.com/nv-morpheus/Morpheus/pull/1529)) [@mdemoret-nv](https://github.com/mdemoret-nv) +- Update ops-bot.yaml ([#1528](https://github.com/nv-morpheus/Morpheus/pull/1528)) [@AyodeAwe](https://github.com/AyodeAwe) +- Add the ability to attach Tensor objects and timestamps to `ControlMessage` ([#1511](https://github.com/nv-morpheus/Morpheus/pull/1511)) [@drobison00](https://github.com/drobison00) +- Fix or silence warnings emitted during tests ([#1501](https://github.com/nv-morpheus/Morpheus/pull/1501)) [@dagardner-nv](https://github.com/dagardner-nv) +- Support ControlMessage output in the C++ impl of DeserializeStage ([#1478](https://github.com/nv-morpheus/Morpheus/pull/1478)) [@dagardner-nv](https://github.com/dagardner-nv) +- DOCA Source Stage improvements 
([#1475](https://github.com/nv-morpheus/Morpheus/pull/1475)) [@e-ago](https://github.com/e-ago) +- Update copyright headers for 2024 ([#1474](https://github.com/nv-morpheus/Morpheus/pull/1474)) [@efajardo-nv](https://github.com/efajardo-nv) +- Add conda builds to CI ([#1466](https://github.com/nv-morpheus/Morpheus/pull/1466)) [@dagardner-nv](https://github.com/dagardner-nv) +- Grafana log monitoring and error alerting example ([#1463](https://github.com/nv-morpheus/Morpheus/pull/1463)) [@efajardo-nv](https://github.com/efajardo-nv) +- Misc Conda Improvements ([#1462](https://github.com/nv-morpheus/Morpheus/pull/1462)) [@dagardner-nv](https://github.com/dagardner-nv) +- Simplification of the streaming RAG ingest example to improve usability ([#1454](https://github.com/nv-morpheus/Morpheus/pull/1454)) [@drobison00](https://github.com/drobison00) +- Replace GPUtil with pynvml for benchmark reports ([#1451](https://github.com/nv-morpheus/Morpheus/pull/1451)) [@efajardo-nv](https://github.com/efajardo-nv) +- Misc test improvements ([#1447](https://github.com/nv-morpheus/Morpheus/pull/1447)) [@dagardner-nv](https://github.com/dagardner-nv) +- Add a --manual_seed flag to the CLI ([#1445](https://github.com/nv-morpheus/Morpheus/pull/1445)) [@dagardner-nv](https://github.com/dagardner-nv) +- Optionally skip ci based on a label in the pr ([#1444](https://github.com/nv-morpheus/Morpheus/pull/1444)) [@dagardner-nv](https://github.com/dagardner-nv) +- Refactor verification of optional dependencies ([#1443](https://github.com/nv-morpheus/Morpheus/pull/1443)) [@dagardner-nv](https://github.com/dagardner-nv) +- Use dependencies.yaml as source-of-truth for environment files. ([#1441](https://github.com/nv-morpheus/Morpheus/pull/1441)) [@cwharris](https://github.com/cwharris) +- Add mocked test & benchmark for LLM agents pipeline ([#1424](https://github.com/nv-morpheus/Morpheus/pull/1424)) [@dagardner-nv](https://github.com/dagardner-nv) +- Add benchmarks for stand-alone RAG & vdb upload pipelines ([#1421](https://github.com/nv-morpheus/Morpheus/pull/1421)) [@dagardner-nv](https://github.com/dagardner-nv) +- Add benchmark for completion pipeline ([#1414](https://github.com/nv-morpheus/Morpheus/pull/1414)) [@dagardner-nv](https://github.com/dagardner-nv) +- Decouple TritonInferenceStage from pipeline mode ([#1402](https://github.com/nv-morpheus/Morpheus/pull/1402)) [@dagardner-nv](https://github.com/dagardner-nv) + # Morpheus 23.11.01 (7 Dec 2023) ## 🐛 Bug Fixes diff --git a/CMakeLists.txt b/CMakeLists.txt index 7aeb7773fc..4b57d23b1b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2018-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2018-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,10 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. - cmake_minimum_required(VERSION 3.25 FATAL_ERROR) - list(APPEND CMAKE_MESSAGE_CONTEXT "morpheus") # Global options (Keep sorted!) 
@@ -38,23 +36,13 @@ option(MORPHEUS_USE_IWYU "Enable running include-what-you-use as part of the bui
set(MORPHEUS_PY_INSTALL_DIR "${CMAKE_CURRENT_BINARY_DIR}/wheel" CACHE STRING "Location to install the python directory")
-set(MORPHEUS_RAPIDS_VERSION "23.06" CACHE STRING "Sets default versions for RAPIDS libraries.")
+set(MORPHEUS_RAPIDS_VERSION "24.02" CACHE STRING "Sets default versions for RAPIDS libraries.")
set(MORPHEUS_CACHE_DIR "${CMAKE_SOURCE_DIR}/.cache" CACHE PATH "Directory to contain all CPM and CCache data")
mark_as_advanced(MORPHEUS_CACHE_DIR)
-set(CMAKE_CXX_STANDARD 20)
-set(CMAKE_CXX_STANDARD_REQUIRED ON)
-set(CMAKE_CXX_EXTENSIONS ON)
-set(CMAKE_POSITION_INDEPENDENT_CODE TRUE)
-set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
-set(CMAKE_INSTALL_RPATH "$ORIGIN")
-
-# Disable compile commands until after dependencies
-set(CMAKE_EXPORT_COMPILE_COMMANDS OFF)
-
enable_testing()
-if (MORPHEUS_USE_IWYU AND MORPHEUS_USE_CCACHE)
+if(MORPHEUS_USE_IWYU AND MORPHEUS_USE_CCACHE)
message(FATAL_ERROR "MORPHEUS_USE_IWYU and MORPHEUS_USE_CCACHE cannot be set simultaneously")
endif()
@@ -80,66 +68,88 @@ set(MORPHEUS_UTILS_RAPIDS_VERSION ${MORPHEUS_RAPIDS_VERSION} CACHE STRING "" FOR
# Load morpheus utils and update CMake paths
include(morpheus_utils/load)
-# Configure project package manager
morpheus_utils_initialize_package_manager(
-    MORPHEUS_USE_CONDA
-    BUILD_SHARED_LIBS
+  MORPHEUS_USE_CONDA
+  BUILD_SHARED_LIBS
)
-# Configure CUDA architecture
-# NOTE: This MUST occur before any 'project' calls because of rapids_cmake requirements.
-if (DEFINED MORPHEUS_CUDA_ARCHITECTURES)
+# Initialize CUDA
+# This is a two-step process. We need to call morpheus_utils_initialize_cuda_arch which in turn calls
+# rapids_cuda_init_architectures prior to calling project(). This is because rapids_cuda_init_architectures defines a
+# `CMAKE_PROJECT_<PROJECT-NAME>_INCLUDE` hook which is invoked by the project() call. This hook is what allows us to
+# set `CMAKE_CUDA_ARCHITECTURES=rapids` when performing a release build which will be expanded to the current list of
+# supported architectures by our version of rapids.
+# +# After the call to project() we can then call morpheus_utils_enable_cuda() which will set some CUDA+clang settings +# which can only be performed after calling project(), but which must be set prior to calling enable_language(CUDA) +if(DEFINED MORPHEUS_CUDA_ARCHITECTURES) set(CMAKE_CUDA_ARCHITECTURES "${MORPHEUS_CUDA_ARCHITECTURES}") endif() morpheus_utils_initialize_cuda_arch(morpheus) -# Set a default build type if none was specified -rapids_cmake_build_type(Release) # Project definition +# Note intentionally excluding CUDA from the LANGUAGES list allowing us to set some clang specific settings later when +# we call morpheus_utils_enable_cuda() project(morpheus - VERSION 23.11.00 - LANGUAGES C CXX CUDA) + VERSION 24.03.00 + LANGUAGES C CXX +) + +# This sets some clang specific settings for CUDA prior to calling enable_language(CUDA) +morpheus_utils_enable_cuda() rapids_cmake_write_version_file(${CMAKE_BINARY_DIR}/autogenerated/include/morpheus/version.hpp) -# Ccache configuration +# Set a default build type if none was specified +rapids_cmake_build_type(Release) + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CXX_EXTENSIONS ON) +set(CMAKE_POSITION_INDEPENDENT_CODE TRUE) +set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) +set(CMAKE_INSTALL_RPATH "$ORIGIN") + +# Setup cache before dependencies +# Configure CCache if requested include(environment/init_ccache) +# Disable exporting compile commands for dependencies +set(CMAKE_EXPORT_COMPILE_COMMANDS OFF) + +# Create a custom target to allow preparing for style checks +add_custom_target(${PROJECT_NAME}_style_checks + COMMENT "Building dependencies for style checks" +) + # Configure all dependencies include(dependencies) -#################################### -# - Post dependencies setup -------- -morpheus_utils_compiler_set_defaults(MORPHEUS_USE_CLANG_TIDY) - -# Setup IWYU if enabled -include(environment/init_iwyu) +# Enable for all first party code +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # To make it easier for CI to find output files, set the default executable suffix to .x if not set if("${CMAKE_EXECUTABLE_SUFFIX}" STREQUAL "") set(CMAKE_EXECUTABLE_SUFFIX ".x") endif() -# Create a custom target to allow preparing for style checks -add_custom_target(${PROJECT_NAME}_style_checks - COMMENT "Building dependencies for style checks" -) - - -################################## -##### Morpheus Python Setup ###### -################################## +# ################################### +# - Post dependencies setup -------- +morpheus_utils_compiler_set_defaults(MORPHEUS_USE_CLANG_TIDY) -# Re-enable compile commands -set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +# Setup IWYU if enabled +include(environment/init_iwyu) +# ################################# +# #### Morpheus Python Setup ###### +# ################################# morpheus_utils_python_configure() # Include the main morpheus code morpheus_utils_create_python_package(morpheus - PROJECT_DIRECTORY "${CMAKE_SOURCE_DIR}" - SOURCE_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/morpheus" + PROJECT_DIRECTORY "${CMAKE_SOURCE_DIR}" + SOURCE_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/morpheus" ) add_subdirectory(morpheus) @@ -174,9 +184,9 @@ if(MORPHEUS_ENABLE_DEBUG_INFO) morpheus_utils_print_all_targets() morpheus_utils_print_target_properties( - TARGETS - morpheus morpheus._lib.llm - WRITE_TO_FILE + TARGETS + morpheus + WRITE_TO_FILE ) morpheus_utils_print_global_properties( diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index c82422f036..9406ac0c7e 100644 --- a/CONTRIBUTING.md +++ 
b/CONTRIBUTING.md
@@ -1,5 +1,5 @@
# Simple C++ Stage
-> **Note**: The code for this guide can be found in the `examples/developer_guide/3_simple_cpp_stage` directory of the Morpheus repository. To build the C++ examples, pass `-DMORPHEUS_BUILD_EXAMPLES=ON` to CMake when building Morpheus. Users building Morpheus with the provided `scripts/compile.sh` script can do do by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
-> ```bash
-> CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
+## Building the Example
+The code for this guide can be found in the `examples/developer_guide/3_simple_cpp_stage` directory of the Morpheus repository. There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` flag to CMake; users building with the `scripts/compile.sh` script at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
+```bash
+CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
+```
+
+The second method is to build the example as a standalone project. From the root of the Morpheus repo execute:
+```bash
+cd examples/developer_guide/3_simple_cpp_stage
+./compile.sh
+# Optionally install the package into the current python environment
+pip install ./
+```
+
+## Overview
Morpheus offers the choice of writing pipeline stages in either Python or C++. For many use cases, a Python stage is perfectly fine. However, in the event that a Python stage becomes a bottleneck for the pipeline, then writing a C++ implementation for the stage becomes advantageous. The C++ implementations of Morpheus stages and messages utilize the [pybind11](https://pybind11.readthedocs.io/en/stable/index.html) library to provide Python bindings.
So far we have been defining our stages in Python; the option of defining a C++ implementation is only available to stages implemented as classes. Many of the stages included with Morpheus have both a Python and a C++ implementation, and Morpheus will use the C++ implementations by default. You can explicitly disable the use of C++ stage implementations by calling `morpheus.config.CppConfig.set_should_use_cpp(False)`:
@@ -275,7 +287,7 @@ The Python interface itself defines a Python module named `morpheus_example` and
namespace py = pybind11;
// Define the pybind11 module m.
-PYBIND11_MODULE(morpheus_example, m)
+PYBIND11_MODULE(pass_thru_cpp, m)
{
mrc::pymrc::import(m, "morpheus._lib.messages");
@@ -319,7 +331,7 @@ std::shared_ptr> PassThruStageInterfaceProxy
namespace py = pybind11;
// Define the pybind11 module m.
-PYBIND11_MODULE(morpheus_example, m)
+PYBIND11_MODULE(pass_thru_cpp, m)
{
mrc::pymrc::import(m, "morpheus._lib.messages");
@@ -353,10 +365,9 @@ As mentioned in the previous section, our `_build_single` method needs to be upd
```python
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
    if self._build_cpp_node() and issubclass(self._input_type, MultiMessage):
-        from _lib import morpheus_example as morpheus_example_cpp
+        from ._lib import pass_thru_cpp
-        # pylint: disable=c-extension-no-member
-        node = morpheus_example_cpp.PassThruStage(builder, self.unique_name)
+        node = pass_thru_cpp.PassThruStage(builder, self.unique_name)
    else:
        node = builder.make_node(self.unique_name, ops.map(self.on_data))
@@ -408,9 +419,9 @@ class PassThruStage(PassThruTypeMixin, SinglePortStage):
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
    if self._build_cpp_node() and issubclass(self._input_type, MultiMessage):
-        from _lib import morpheus_example as morpheus_example_cpp
+        from ._lib import pass_thru_cpp
-        node = morpheus_example_cpp.PassThruStage(builder, self.unique_name)
+        node = pass_thru_cpp.PassThruStage(builder, self.unique_name)
    else:
        node = builder.make_node(self.unique_name, ops.map(self.on_data))
@@ -420,10 +431,10 @@ class PassThruStage(PassThruTypeMixin, SinglePortStage):
## Testing the Stage
To test the updated stage we will build a simple pipeline using the Morpheus command line tool. In order to illustrate the stage building a C++ node only when the input type is a `MultiMessage`, we will insert the `pass-thru` stage twice in the pipeline. In the first instance the input type will be `MessageMeta` and the stage will fall back to using a Python node, and in the second instance the input type will be a `MultiMessage` and the stage will build a C++ node.
-
+
```bash
-PYTHONPATH="examples/developer_guide/3_simple_cpp_stage" \
-morpheus --log_level=debug --plugin "pass_thru" \
+PYTHONPATH="examples/developer_guide/3_simple_cpp_stage/src" \
+morpheus --log_level=debug --plugin "simple_cpp_stage.pass_thru" \
run pipeline-other \
from-file --filename=examples/data/email_with_addresses.jsonlines \
pass-thru \
@@ -432,5 +443,3 @@ morpheus --log_level=debug --plugin "pass_thru" \
pass-thru \
monitor
```
-
-> **Note**: In the above example we set the `PYTHONPATH` environment variable this is to facilitate the relative import the stage performs of the `_lib` module.
\ No newline at end of file
diff --git a/docs/source/developer_guide/guides/4_source_cpp_stage.md b/docs/source/developer_guide/guides/4_source_cpp_stage.md
index 58c0dbaf88..8bc17f1347 100644
--- a/docs/source/developer_guide/guides/4_source_cpp_stage.md
+++ b/docs/source/developer_guide/guides/4_source_cpp_stage.md
@@ -1,5 +1,5 @@
# Creating a C++ Source Stage
-> **Note**: The code for this guide can be found in the `examples/developer_guide/4_rabbitmq_cpp_stage` directory of the Morpheus repository. To build the C++ examples, pass `-DMORPHEUS_BUILD_EXAMPLES=ON` to CMake when building Morpheus. Users building Morpheus with the provided `scripts/compile.sh` script can do do by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
-> ```bash
-> CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
+## Building the Example
+The code for this guide can be found in the `examples/developer_guide/4_rabbitmq_cpp_stage` directory of the Morpheus repository. There are two ways to build the example.
The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` flag to CMake; users building with the `scripts/compile.sh` script at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
+```bash
+CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
+```
+
+The second method is to build the example as a standalone project. From the root of the Morpheus repo execute:
+```bash
+cd examples/developer_guide/4_rabbitmq_cpp_stage
+./compile.sh
+
+# Optionally install the package into the current python environment
+pip install ./
+```
+## Overview
For this example, we are going to add a C++ implementation for the `RabbitMQSourceStage` we designed in the Python examples. The Python implementation of this stage emits messages of the type `MessageMeta`; as such, our C++ implementation must do the same. For communicating with [RabbitMQ](https://www.rabbitmq.com/) we will be using the [SimpleAmqpClient](https://github.com/alanxz/SimpleAmqpClient) library, and [libcudf](https://docs.rapids.ai/api/libcudf/stable/index.html) for constructing the `DataFrame`.
@@ -199,8 +211,8 @@ RabbitMQSourceStage::RabbitMQSourceStage(const std::string& host,
const std::string& queue_name,
std::chrono::milliseconds poll_interval) :
PythonSource(build()),
-    m_channel{AmqpClient::Channel::Create(host)},
-    m_poll_interval{poll_interval}
+    m_poll_interval{poll_interval},
+    m_channel{AmqpClient::Channel::Create(host)}
{
m_channel->DeclareExchange(exchange, exchange_type);
m_queue_name = m_channel->DeclareQueue(queue_name);
@@ -316,7 +328,7 @@ std::shared_ptr> RabbitMQSourceStageIn
namespace py = pybind11;
// Define the pybind11 module m.
-PYBIND11_MODULE(morpheus_rabbit, m)
+PYBIND11_MODULE(rabbitmq_cpp_stage, m)
{
mrc::pymrc::import(m, "morpheus._lib.messages");
@@ -358,8 +370,8 @@ RabbitMQSourceStage::RabbitMQSourceStage(const std::string& host,
const std::string& queue_name,
std::chrono::milliseconds poll_interval) :
PythonSource(build()),
-    m_channel{AmqpClient::Channel::Create(host)},
-    m_poll_interval{poll_interval}
+    m_poll_interval{poll_interval},
+    m_channel{AmqpClient::Channel::Create(host)}
{
m_channel->DeclareExchange(exchange, exchange_type);
m_queue_name = m_channel->DeclareQueue(queue_name);
@@ -444,7 +456,7 @@ std::shared_ptr> RabbitMQSourceStageIn
namespace py = pybind11;
// Define the pybind11 module m.
-PYBIND11_MODULE(morpheus_rabbit, m)
+PYBIND11_MODULE(rabbitmq_cpp_stage, m)
{
mrc::pymrc::import(m, "morpheus._lib.messages");
@@ -511,7 +523,7 @@ Lastly, our `_build_source` method needs to be updated to build a C++ node when
```python
def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
    if self._build_cpp_node():
-        from _lib import morpheus_rabbit as morpheus_rabbit_cpp
+        from ._lib import rabbitmq_cpp_stage
node = rabbitmq_cpp_stage.RabbitMQSourceStage(builder,
self.unique_name,
diff --git a/docs/source/developer_guide/guides/5_digital_fingerprinting.md b/docs/source/developer_guide/guides/5_digital_fingerprinting.md
index da36f4c24f..cc283c984e 100644
--- a/docs/source/developer_guide/guides/5_digital_fingerprinting.md
+++ b/docs/source/developer_guide/guides/5_digital_fingerprinting.md
@@ -1,5 +1,5 @@
# Example RabbitMQ stages
-This example builds upon the `examples/developer_guide/2_2_rabbitmq` example adding a C++ implementation for the `RabbitMQSourceStage`.
+This example builds upon the `examples/developer_guide/2_2_rabbitmq` example, adding a C++ implementation for the `RabbitMQSourceStage` along with package install scripts. It adds two flags to the `read_simple.py` script: a `--use_cpp` flag, which defaults to `True`, and a `--num_threads` flag, which defaults to the number of cores on the system as returned by `os.cpu_count()`.
+## Installing Pika
+The `RabbitMQSourceStage` and `WriteToRabbitMQStage` stages use the [pika](https://pika.readthedocs.io/en/stable/#) RabbitMQ client for Python. To install this into the current environment, run:
+```bash
+pip install -r examples/developer_guide/4_rabbitmq_cpp_stage/requirements.txt
+```
+
+## Building the Example
+There are two ways to build the example. The first is to build the examples along with Morpheus by passing the `-DMORPHEUS_BUILD_EXAMPLES=ON` flag to CMake; users building with the `scripts/compile.sh` script at the root of the Morpheus repo can do this by setting the `CMAKE_CONFIGURE_EXTRA_ARGS` environment variable:
+```bash
+CMAKE_CONFIGURE_EXTRA_ARGS="-DMORPHEUS_BUILD_EXAMPLES=ON" ./scripts/compile.sh
+```
+
+The second is to build the example as a standalone project. From the root of the Morpheus repo execute:
+```bash
+cd examples/developer_guide/4_rabbitmq_cpp_stage
+./compile.sh
+
+# Optionally install the package into the current python environment
+pip install ./
+```
+
## Testing with a RabbitMQ container
Testing can be performed locally with the RabbitMQ-supplied Docker image from the [RabbitMQ container registry](https://registry.hub.docker.com/_/rabbitmq/):
```bash
@@ -31,7 +52,7 @@ The image can be verified with the web management console by opening http://loca
## Launch the reader
In a second terminal from the root of the Morpheus repo execute:
```bash
-python examples/developer_guide/4_rabbitmq_cpp_stage/read_simple.py
+python examples/developer_guide/4_rabbitmq_cpp_stage/src/read_simple.py
```
This will read from a RabbitMQ exchange named 'logs', and write the results to `/tmp/results.json`.
@@ -41,7 +62,7 @@ If no exchange named 'logs' exists in RabbitMQ it will be created.
## Launch the writer
In a third terminal from the root of the Morpheus repo execute:
```bash
-python examples/developer_guide/4_rabbitmq_cpp_stage/write_simple.py
+python examples/developer_guide/4_rabbitmq_cpp_stage/src/write_simple.py
```
This will read JSON data from the `examples/data/email.jsonlines` file and publish the data into the 'logs' RabbitMQ exchange as a single message.
diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_SimpleAmqpClient.cmake b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_SimpleAmqpClient.cmake
index 7aa248e8e1..172f2a409f 100644
--- a/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_SimpleAmqpClient.cmake
+++ b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_SimpleAmqpClient.cmake
@@ -1,5 +1,5 @@
#=============================================================================
-# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,10 +25,11 @@ function(find_and_configure_SimpleAmqpClient version) GLOBAL_TARGETS SimpleAmqpClient BUILD_EXPORT_SET - ${PROJECT_NAME}-exports + ${PROJECT_NAME}-core-exports INSTALL_EXPORT_SET - ${PROJECT_NAME}-exports + ${PROJECT_NAME}-core-exports CPM_ARGS + PATCH_COMMAND git checkout -- . && git apply --whitespace=fix ${CMAKE_CURRENT_FUNCTION_LIST_DIR}/patches/simpleamqpclient_cpp_version.patch GIT_REPOSITORY https://github.com/alanxz/SimpleAmqpClient GIT_TAG "v${version}" GIT_SHALLOW TRUE diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_rabbitmq.cmake b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_rabbitmq.cmake index a1534f2e46..ecbcc2b0ed 100644 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_rabbitmq.cmake +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/Configure_rabbitmq.cmake @@ -1,5 +1,5 @@ #============================================================================= -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -26,9 +26,9 @@ function(find_and_configure_rabbitmq version) GLOBAL_TARGETS rabbitmq rabbitmq::rabbitmq BUILD_EXPORT_SET - ${PROJECT_NAME}-exports + ${PROJECT_NAME}-core-exports INSTALL_EXPORT_SET - ${PROJECT_NAME}-exports + ${PROJECT_NAME}-core-exports CPM_ARGS GIT_REPOSITORY https://github.com/alanxz/rabbitmq-c GIT_SHALLOW TRUE diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/dependencies.cmake b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/dependencies.cmake index fa24f87b2e..56a692f7b4 100644 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/dependencies.cmake +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/dependencies.cmake @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,6 +15,8 @@
list(APPEND CMAKE_MESSAGE_CONTEXT "dep")
+rapids_find_package(CUDAToolkit REQUIRED)
+
set(RABBITMQ_VERSION "0.12.0" CACHE STRING "Version of RabbitMQ-C to use")
include(Configure_rabbitmq)
diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/patches/simpleamqpclient_cpp_version.patch b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/patches/simpleamqpclient_cpp_version.patch
new file mode 100644
index 0000000000..0a94dee8eb
--- /dev/null
+++ b/examples/developer_guide/4_rabbitmq_cpp_stage/cmake/patches/simpleamqpclient_cpp_version.patch
@@ -0,0 +1,13 @@
+diff --git a/CMakeLists.txt b/CMakeLists.txt
+index efefbfc..47f2040 100644
+--- a/CMakeLists.txt
++++ b/CMakeLists.txt
+@@ -7,7 +7,7 @@ cmake_minimum_required(VERSION 3.5)
+
+ project(SimpleAmqpClient LANGUAGES CXX)
+
+-set(CMAKE_CXX_STANDARD 98)
++set(CMAKE_CXX_STANDARD 20)
+ set(CMAKE_CXX_STANDARD_REQUIRED ON)
+ set(CMAKE_CXX_EXTENSIONS OFF)
+
diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/compile.sh b/examples/developer_guide/4_rabbitmq_cpp_stage/compile.sh
new file mode 100755
index 0000000000..41c4640a49
--- /dev/null
+++ b/examples/developer_guide/4_rabbitmq_cpp_stage/compile.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -x
+set -e
+
+# Optionally set INSTALL_PREFIX to build and install to a specific directory; this also causes the cmake install step to run
+BUILD_DIR=${BUILD_DIR:-"build"}
+
+echo "Running CMake configure..."
+cmake -B ${BUILD_DIR} -GNinja \
+  -DCMAKE_MESSAGE_CONTEXT_SHOW=ON \
+  -DMORPHEUS_PYTHON_INPLACE_BUILD:BOOL=ON \
+  -DMORPHEUS_PYTHON_PERFORM_INSTALL:BOOL=ON `# Ensure all of the libraries are installed` \
+  ${CMAKE_CONFIGURE_EXTRA_ARGS:+${CMAKE_CONFIGURE_EXTRA_ARGS}} .
+
+echo "Running CMake build..."
+cmake --build ${BUILD_DIR} -j "$@"
diff --git a/models/training-tuning-scripts/fraud-detection-models/requirements.yml b/examples/developer_guide/4_rabbitmq_cpp_stage/pyproject.toml
similarity index 51%
rename from models/training-tuning-scripts/fraud-detection-models/requirements.yml
rename to examples/developer_guide/4_rabbitmq_cpp_stage/pyproject.toml
index 5714a666e7..ad85e7c003 100644
--- a/models/training-tuning-scripts/fraud-detection-models/requirements.yml
+++ b/examples/developer_guide/4_rabbitmq_cpp_stage/pyproject.toml
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,21 +13,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-channels: - - rapidsai - - nvidia - - dglteam/label/cu118 - - pytorch - - conda-forge - - defaults -dependencies: - - click>=8 - - cuml=23.06 - - dgl - - jupyterlab - - pytorch-cuda=11.8 - - pytorch=2.0.1 - - scikit-learn=1.2.2 - - tqdm=4 - - torchmetrics - - xgboost +[build-system] +build-backend = "setuptools.build_meta" +requires = ["setuptools", "wheel", "versioneer[toml]==0.29"] + +[tool.versioneer] +# See the docstring in versioneer.py for instructions. Note that you must +# re-run 'versioneer setup' after changing this section, and commit the +# resulting files. +VCS = "git" +style = "pep440" +tag_prefix = "v" +versionfile_build = "src/rabbitmq_cpp_stage/_version.py" +versionfile_source = "src/rabbitmq_cpp_stage/_version.py" + +[tool.yapfignore] +ignore_patterns = [ + "**/_version.py", +] diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/setup.cfg b/examples/developer_guide/4_rabbitmq_cpp_stage/setup.cfg new file mode 100644 index 0000000000..9f6be7db1a --- /dev/null +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/setup.cfg @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[metadata] +name = rabbitmq_cpp_stage +version = attr: versioneer.get_version +description = Morpheus Example - RabbitMQ C++ Stage +author = NVIDIA Corporation +license = Apache +classifiers = + Intended Audience :: Developers + Programming Language :: Python + Programming Language :: Python :: 3.10 + +[options] +zip_safe = False +include_package_data = True +packages = find: +package_dir = + =src +python_requires = >=3.10 +install_requires = + pika ==1.2.0 + +[options.packages.find] +where = src diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/setup.py b/examples/developer_guide/4_rabbitmq_cpp_stage/setup.py new file mode 100644 index 0000000000..0e0fce75d5 --- /dev/null +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/setup.py @@ -0,0 +1,16 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
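+# Package metadata is declared in setup.cfg (including the versioneer-derived
+# version) and versioneer itself is configured in pyproject.toml, so this shim
+# only needs to call setup() with no arguments.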
+ +from setuptools import setup + +setup() diff --git a/models/training-tuning-scripts/abp-models/requirements.yml b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/__init__.py similarity index 68% rename from models/training-tuning-scripts/abp-models/requirements.yml rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/__init__.py index 8bc65d6f54..acc168b4e5 100644 --- a/models/training-tuning-scripts/abp-models/requirements.yml +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/__init__.py @@ -1,11 +1,12 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -13,14 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -channels: - - rapidsai - - nvidia - - conda-forge - - defaults -dependencies: - - cuml=23.06 - - jupyterlab - - python=3.10 - - scikit-learn=1.2.2 - - xgboost +from . import _version + +__version__ = _version.get_versions()['version'] diff --git a/models/validation-inference-scripts/fraud-detection-models/requirements.yml b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/CMakeLists.txt similarity index 59% rename from models/validation-inference-scripts/fraud-detection-models/requirements.yml rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/CMakeLists.txt index c002d618a5..ba768f7a2a 100644 --- a/models/validation-inference-scripts/fraud-detection-models/requirements.yml +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/CMakeLists.txt @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,19 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-channels: - - rapidsai - - nvidia - - conda-forge - - defaults -dependencies: - - click==8.1.3 - - cuml=23.06 - - dgl==1.0.2+cu118 - - numpy==1.23.5 - - pandas==1.5.3 - - scikit_learn==1.2.2 - - torch==2.0.0+cu118 - - torchmetrics==0.11.4 - - tqdm==4.65.0 - - xgboost==1.7.1 +list(APPEND CMAKE_MESSAGE_CONTEXT "rabbitmq_cpp_stage") + +morpheus_add_pybind11_module(rabbitmq_cpp_stage + SOURCE_FILES + "rabbitmq_source.cpp" + INCLUDE_DIRS + $ + ${SimpleAmqpClient_SOURCE_DIR}/src + LINK_TARGETS + morpheus + rabbitmq + SimpleAmqpClient +) + +list(POP_BACK CMAKE_MESSAGE_CONTEXT) diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/__init__.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/__init__.py similarity index 100% rename from examples/developer_guide/4_rabbitmq_cpp_stage/__init__.py rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/__init__.py diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_cpp_stage/__init__.pyi b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_cpp_stage/__init__.pyi new file mode 100644 index 0000000000..93e02914b8 --- /dev/null +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_cpp_stage/__init__.pyi @@ -0,0 +1,15 @@ +from __future__ import annotations +import src.rabbitmq_cpp_stage._lib.rabbitmq_cpp_stage +import typing +import datetime +import morpheus._lib.messages +import mrc.core.segment + +__all__ = [ + "RabbitMQSourceStage" +] + + +class RabbitMQSourceStage(mrc.core.segment.SegmentObject): + def __init__(self, builder: mrc.core.segment.Builder, name: str, host: str, exchange: str, exchange_type: str = 'fanout', queue_name: str = '', poll_interval: datetime.timedelta = datetime.timedelta(microseconds=100000)) -> None: ... + pass diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/_lib/rabbitmq_source.cpp b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_source.cpp similarity index 88% rename from examples/developer_guide/4_rabbitmq_cpp_stage/_lib/rabbitmq_source.cpp rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_source.cpp index 4f0e6db7cc..a8a7d8e0cb 100644 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/_lib/rabbitmq_source.cpp +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_source.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
* SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,16 +17,24 @@ #include "rabbitmq_source.hpp" +#include +#include +#include #include -#include #include -#include // for timedelta->chrono conversions +#include +#include +#include // IWYU pragma: keep #include +#include #include #include #include // for std::this_thread::sleep_for -#include +#include + +// IWYU pragma: no_include +// IWYU pragma: no_include namespace morpheus_rabbit { @@ -36,8 +44,8 @@ RabbitMQSourceStage::RabbitMQSourceStage(const std::string& host, const std::string& queue_name, std::chrono::milliseconds poll_interval) : PythonSource(build()), - m_channel{AmqpClient::Channel::Create(host)}, - m_poll_interval{poll_interval} + m_poll_interval{poll_interval}, + m_channel{AmqpClient::Channel::Create(host)} { m_channel->DeclareExchange(exchange, exchange_type); m_queue_name = m_channel->DeclareQueue(queue_name); @@ -122,7 +130,7 @@ std::shared_ptr> RabbitMQSourceStageIn namespace py = pybind11; // Define the pybind11 module m. -PYBIND11_MODULE(morpheus_rabbit, m) +PYBIND11_MODULE(rabbitmq_cpp_stage, m) { mrc::pymrc::import(m, "morpheus._lib.messages"); diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/_lib/rabbitmq_source.hpp b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_source.hpp similarity index 83% rename from examples/developer_guide/4_rabbitmq_cpp_stage/_lib/rabbitmq_source.hpp rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_source.hpp index 6f838b69d8..c117bcc5e9 100644 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/_lib/rabbitmq_source.hpp +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_lib/rabbitmq_source.hpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,16 +17,22 @@ #pragma once -#include // for AmqpClient::Channel::ptr_t -#include // for cudf::io::table_with_metadata -#include // for MessageMeta -#include // for Segment Builder -#include // for Segment Object -#include // for mrc::pymrc::PythonSource +#include +#include // for cudf::io::table_with_metadata +#include // for MessageMeta +#include // for Segment Builder +#include // for Segment Object +#include // for mrc::pymrc::PythonSource +#include #include // for chrono::milliseconds #include // for shared_ptr #include +#include +#include + +// IWYU pragma: no_include "morpheus/objects/data_table.hpp" +// IWYU pragma: no_include namespace morpheus_rabbit { diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_version.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_version.py new file mode 100644 index 0000000000..a0573c335e --- /dev/null +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/_version.py @@ -0,0 +1,683 @@ + +# This file helps to compute a version number in source trees obtained from +# git-archive tarball (such as those provided by githubs download-from-tag +# feature). Distribution tarballs (built by setup.py sdist) and build +# directories (produced by setup.py build) will contain a much shorter file +# that just contains the computed version number. 
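+# (Illustration, not versioneer's literal output: that shorter file amounts to
+# a hard-coded `version_json` JSON string plus a `get_versions()` that simply
+# returns `json.loads(version_json)`; the exact generated form may differ.)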
+ +# This file is released into the public domain. +# Generated by versioneer-0.29 +# https://github.com/python-versioneer/python-versioneer + +"""Git implementation of _version.py.""" + +import errno +import os +import re +import subprocess +import sys +from typing import Any, Callable, Dict, List, Optional, Tuple +import functools + + +def get_keywords() -> Dict[str, str]: + """Get the keywords needed to look up the version information.""" + # these strings will be replaced by git during git-archive. + # setup.py/versioneer.py will grep for the variable names, so they must + # each be defined on a line of their own. _version.py will just call + # get_keywords(). + git_refnames = "$Format:%d$" + git_full = "$Format:%H$" + git_date = "$Format:%ci$" + keywords = {"refnames": git_refnames, "full": git_full, "date": git_date} + return keywords + + +class VersioneerConfig: + """Container for Versioneer configuration parameters.""" + + VCS: str + style: str + tag_prefix: str + parentdir_prefix: str + versionfile_source: str + verbose: bool + + +def get_config() -> VersioneerConfig: + """Create, populate and return the VersioneerConfig() object.""" + # these strings are filled in when 'setup.py versioneer' creates + # _version.py + cfg = VersioneerConfig() + cfg.VCS = "git" + cfg.style = "pep440" + cfg.tag_prefix = "v" + cfg.parentdir_prefix = "None" + cfg.versionfile_source = "src/rabbitmq_cpp_stage/_version.py" + cfg.verbose = False + return cfg + + +class NotThisMethod(Exception): + """Exception raised if a method is not valid for the current scenario.""" + + +LONG_VERSION_PY: Dict[str, str] = {} +HANDLERS: Dict[str, Dict[str, Callable]] = {} + + +def register_vcs_handler(vcs: str, method: str) -> Callable: # decorator + """Create decorator to mark a method as the handler of a VCS.""" + def decorate(f: Callable) -> Callable: + """Store f in HANDLERS[vcs][method].""" + if vcs not in HANDLERS: + HANDLERS[vcs] = {} + HANDLERS[vcs][method] = f + return f + return decorate + + +def run_command( + commands: List[str], + args: List[str], + cwd: Optional[str] = None, + verbose: bool = False, + hide_stderr: bool = False, + env: Optional[Dict[str, str]] = None, +) -> Tuple[Optional[str], Optional[int]]: + """Call the given command(s).""" + assert isinstance(commands, list) + process = None + + popen_kwargs: Dict[str, Any] = {} + if sys.platform == "win32": + # This hides the console window if pythonw.exe is used + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW + popen_kwargs["startupinfo"] = startupinfo + + for command in commands: + try: + dispcmd = str([command] + args) + # remember shell=False, so use git.cmd on windows, not just git + process = subprocess.Popen([command] + args, cwd=cwd, env=env, + stdout=subprocess.PIPE, + stderr=(subprocess.PIPE if hide_stderr + else None), **popen_kwargs) + break + except OSError as e: + if e.errno == errno.ENOENT: + continue + if verbose: + print("unable to run %s" % dispcmd) + print(e) + return None, None + else: + if verbose: + print("unable to find command, tried %s" % (commands,)) + return None, None + stdout = process.communicate()[0].strip().decode() + if process.returncode != 0: + if verbose: + print("unable to run %s (error)" % dispcmd) + print("stdout was %s" % stdout) + return None, process.returncode + return stdout, process.returncode + + +def versions_from_parentdir( + parentdir_prefix: str, + root: str, + verbose: bool, +) -> Dict[str, Any]: + """Try to determine the version from the parent 
directory name. + + Source tarballs conventionally unpack into a directory that includes both + the project name and a version string. We will also support searching up + two directory levels for an appropriately named parent directory + """ + rootdirs = [] + + for _ in range(3): + dirname = os.path.basename(root) + if dirname.startswith(parentdir_prefix): + return {"version": dirname[len(parentdir_prefix):], + "full-revisionid": None, + "dirty": False, "error": None, "date": None} + rootdirs.append(root) + root = os.path.dirname(root) # up a level + + if verbose: + print("Tried directories %s but none started with prefix %s" % + (str(rootdirs), parentdir_prefix)) + raise NotThisMethod("rootdir doesn't start with parentdir_prefix") + + +@register_vcs_handler("git", "get_keywords") +def git_get_keywords(versionfile_abs: str) -> Dict[str, str]: + """Extract version information from the given file.""" + # the code embedded in _version.py can just fetch the value of these + # keywords. When used from setup.py, we don't want to import _version.py, + # so we do it with a regexp instead. This function is not used from + # _version.py. + keywords: Dict[str, str] = {} + try: + with open(versionfile_abs, "r") as fobj: + for line in fobj: + if line.strip().startswith("git_refnames ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["refnames"] = mo.group(1) + if line.strip().startswith("git_full ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["full"] = mo.group(1) + if line.strip().startswith("git_date ="): + mo = re.search(r'=\s*"(.*)"', line) + if mo: + keywords["date"] = mo.group(1) + except OSError: + pass + return keywords + + +@register_vcs_handler("git", "keywords") +def git_versions_from_keywords( + keywords: Dict[str, str], + tag_prefix: str, + verbose: bool, +) -> Dict[str, Any]: + """Get version information from git keywords.""" + if "refnames" not in keywords: + raise NotThisMethod("Short version file found") + date = keywords.get("date") + if date is not None: + # Use only the last line. Previous lines may contain GPG signature + # information. + date = date.splitlines()[-1] + + # git-2.2.0 added "%cI", which expands to an ISO-8601 -compliant + # datestamp. However we prefer "%ci" (which expands to an "ISO-8601 + # -like" string, which we must then edit to make compliant), because + # it's been around since git-1.5.3, and it's too difficult to + # discover which version we're using, or to work around using an + # older one. + date = date.strip().replace(" ", "T", 1).replace(" ", "", 1) + refnames = keywords["refnames"].strip() + if refnames.startswith("$Format"): + if verbose: + print("keywords are unexpanded, not using") + raise NotThisMethod("unexpanded keywords, not a git-archive tarball") + refs = {r.strip() for r in refnames.strip("()").split(",")} + # starting in git-1.8.3, tags are listed as "tag: foo-1.0" instead of + # just "foo-1.0". If we see a "tag: " prefix, prefer those. + TAG = "tag: " + tags = {r[len(TAG):] for r in refs if r.startswith(TAG)} + if not tags: + # Either we're using git < 1.8.3, or there really are no tags. We use + # a heuristic: assume all version tags have a digit. The old git %d + # expansion behaves like git log --decorate=short and strips out the + # refs/heads/ and refs/tags/ prefixes that would let us distinguish + # between branches and tags. By ignoring refnames without digits, we + # filter out many common branch names like "release" and + # "stabilization", as well as "HEAD" and "master". 
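+        # e.g. refs = {"HEAD", "master", "release", "v24.03.00"} keeps only
+        # {"v24.03.00"}, the sole ref containing a digit.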
+ tags = {r for r in refs if re.search(r'\d', r)} + if verbose: + print("discarding '%s', no digits" % ",".join(refs - tags)) + if verbose: + print("likely tags: %s" % ",".join(sorted(tags))) + for ref in sorted(tags): + # sorting will prefer e.g. "2.0" over "2.0rc1" + if ref.startswith(tag_prefix): + r = ref[len(tag_prefix):] + # Filter out refs that exactly match prefix or that don't start + # with a number once the prefix is stripped (mostly a concern + # when prefix is '') + if not re.match(r'\d', r): + continue + if verbose: + print("picking %s" % r) + return {"version": r, + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": None, + "date": date} + # no suitable tags, so version is "0+unknown", but full hex is still there + if verbose: + print("no suitable tags, using unknown + full revision id") + return {"version": "0+unknown", + "full-revisionid": keywords["full"].strip(), + "dirty": False, "error": "no suitable tags", "date": None} + + +@register_vcs_handler("git", "pieces_from_vcs") +def git_pieces_from_vcs( + tag_prefix: str, + root: str, + verbose: bool, + runner: Callable = run_command +) -> Dict[str, Any]: + """Get version from 'git describe' in the root of the source tree. + + This only gets called if the git-archive 'subst' keywords were *not* + expanded, and _version.py hasn't already been rewritten with a short + version string, meaning we're inside a checked out source tree. + """ + GITS = ["git"] + if sys.platform == "win32": + GITS = ["git.cmd", "git.exe"] + + # GIT_DIR can interfere with correct operation of Versioneer. + # It may be intended to be passed to the Versioneer-versioned project, + # but that should not change where we get our version from. + env = os.environ.copy() + env.pop("GIT_DIR", None) + runner = functools.partial(runner, env=env) + + _, rc = runner(GITS, ["rev-parse", "--git-dir"], cwd=root, + hide_stderr=not verbose) + if rc != 0: + if verbose: + print("Directory %s not under git control" % root) + raise NotThisMethod("'git rev-parse --git-dir' returned error") + + # if there is a tag matching tag_prefix, this yields TAG-NUM-gHEX[-dirty] + # if there isn't one, this yields HEX[-dirty] (no NUM) + describe_out, rc = runner(GITS, [ + "describe", "--tags", "--dirty", "--always", "--long", + "--match", f"{tag_prefix}[[:digit:]]*" + ], cwd=root) + # --long was added in git-1.5.5 + if describe_out is None: + raise NotThisMethod("'git describe' failed") + describe_out = describe_out.strip() + full_out, rc = runner(GITS, ["rev-parse", "HEAD"], cwd=root) + if full_out is None: + raise NotThisMethod("'git rev-parse' failed") + full_out = full_out.strip() + + pieces: Dict[str, Any] = {} + pieces["long"] = full_out + pieces["short"] = full_out[:7] # maybe improved later + pieces["error"] = None + + branch_name, rc = runner(GITS, ["rev-parse", "--abbrev-ref", "HEAD"], + cwd=root) + # --abbrev-ref was added in git-1.6.3 + if rc != 0 or branch_name is None: + raise NotThisMethod("'git rev-parse --abbrev-ref' returned error") + branch_name = branch_name.strip() + + if branch_name == "HEAD": + # If we aren't exactly on a branch, pick a branch which represents + # the current commit. If all else fails, we are on a branchless + # commit. 
+ branches, rc = runner(GITS, ["branch", "--contains"], cwd=root) + # --contains was added in git-1.5.4 + if rc != 0 or branches is None: + raise NotThisMethod("'git branch --contains' returned error") + branches = branches.split("\n") + + # Remove the first line if we're running detached + if "(" in branches[0]: + branches.pop(0) + + # Strip off the leading "* " from the list of branches. + branches = [branch[2:] for branch in branches] + if "master" in branches: + branch_name = "master" + elif not branches: + branch_name = None + else: + # Pick the first branch that is returned. Good or bad. + branch_name = branches[0] + + pieces["branch"] = branch_name + + # parse describe_out. It will be like TAG-NUM-gHEX[-dirty] or HEX[-dirty] + # TAG might have hyphens. + git_describe = describe_out + + # look for -dirty suffix + dirty = git_describe.endswith("-dirty") + pieces["dirty"] = dirty + if dirty: + git_describe = git_describe[:git_describe.rindex("-dirty")] + + # now we have TAG-NUM-gHEX or HEX + + if "-" in git_describe: + # TAG-NUM-gHEX + mo = re.search(r'^(.+)-(\d+)-g([0-9a-f]+)$', git_describe) + if not mo: + # unparsable. Maybe git-describe is misbehaving? + pieces["error"] = ("unable to parse git-describe output: '%s'" + % describe_out) + return pieces + + # tag + full_tag = mo.group(1) + if not full_tag.startswith(tag_prefix): + if verbose: + fmt = "tag '%s' doesn't start with prefix '%s'" + print(fmt % (full_tag, tag_prefix)) + pieces["error"] = ("tag '%s' doesn't start with prefix '%s'" + % (full_tag, tag_prefix)) + return pieces + pieces["closest-tag"] = full_tag[len(tag_prefix):] + + # distance: number of commits since tag + pieces["distance"] = int(mo.group(2)) + + # commit: short hex revision ID + pieces["short"] = mo.group(3) + + else: + # HEX: no tags + pieces["closest-tag"] = None + out, rc = runner(GITS, ["rev-list", "HEAD", "--left-right"], cwd=root) + pieces["distance"] = len(out.split()) # total number of commits + + # commit date: see ISO-8601 comment in git_versions_from_keywords() + date = runner(GITS, ["show", "-s", "--format=%ci", "HEAD"], cwd=root)[0].strip() + # Use only the last line. Previous lines may contain GPG signature + # information. + date = date.splitlines()[-1] + pieces["date"] = date.strip().replace(" ", "T", 1).replace(" ", "", 1) + + return pieces + + +def plus_or_dot(pieces: Dict[str, Any]) -> str: + """Return a + if we don't already have one, else return a .""" + if "+" in pieces.get("closest-tag", ""): + return "." + return "+" + + +def render_pep440(pieces: Dict[str, Any]) -> str: + """Build up version string, with post-release "local version identifier". + + Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you + get a tagged build and then dirty it, you'll get TAG+0.gHEX.dirty + + Exceptions: + 1: no tags. git_describe was just HEX. 0+untagged.DISTANCE.gHEX[.dirty] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += plus_or_dot(pieces) + rendered += "%d.g%s" % (pieces["distance"], pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + else: + # exception #1 + rendered = "0+untagged.%d.g%s" % (pieces["distance"], + pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + return rendered + + +def render_pep440_branch(pieces: Dict[str, Any]) -> str: + """TAG[[.dev0]+DISTANCE.gHEX[.dirty]] . + + The ".dev0" means not master branch. Note that .dev0 sorts backwards + (a feature branch will appear "older" than the master branch). 
+ + Exceptions: + 1: no tags. 0[.dev0]+untagged.DISTANCE.gHEX[.dirty] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + if pieces["branch"] != "master": + rendered += ".dev0" + rendered += plus_or_dot(pieces) + rendered += "%d.g%s" % (pieces["distance"], pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + else: + # exception #1 + rendered = "0" + if pieces["branch"] != "master": + rendered += ".dev0" + rendered += "+untagged.%d.g%s" % (pieces["distance"], + pieces["short"]) + if pieces["dirty"]: + rendered += ".dirty" + return rendered + + +def pep440_split_post(ver: str) -> Tuple[str, Optional[int]]: + """Split pep440 version string at the post-release segment. + + Returns the release segments before the post-release and the + post-release version number (or -1 if no post-release segment is present). + """ + vc = str.split(ver, ".post") + return vc[0], int(vc[1] or 0) if len(vc) == 2 else None + + +def render_pep440_pre(pieces: Dict[str, Any]) -> str: + """TAG[.postN.devDISTANCE] -- No -dirty. + + Exceptions: + 1: no tags. 0.post0.devDISTANCE + """ + if pieces["closest-tag"]: + if pieces["distance"]: + # update the post release segment + tag_version, post_version = pep440_split_post(pieces["closest-tag"]) + rendered = tag_version + if post_version is not None: + rendered += ".post%d.dev%d" % (post_version + 1, pieces["distance"]) + else: + rendered += ".post0.dev%d" % (pieces["distance"]) + else: + # no commits, use the tag as the version + rendered = pieces["closest-tag"] + else: + # exception #1 + rendered = "0.post0.dev%d" % pieces["distance"] + return rendered + + +def render_pep440_post(pieces: Dict[str, Any]) -> str: + """TAG[.postDISTANCE[.dev0]+gHEX] . + + The ".dev0" means dirty. Note that .dev0 sorts backwards + (a dirty tree will appear "older" than the corresponding clean one), + but you shouldn't be releasing software with -dirty anyways. + + Exceptions: + 1: no tags. 0.postDISTANCE[.dev0] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += plus_or_dot(pieces) + rendered += "g%s" % pieces["short"] + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + rendered += "+g%s" % pieces["short"] + return rendered + + +def render_pep440_post_branch(pieces: Dict[str, Any]) -> str: + """TAG[.postDISTANCE[.dev0]+gHEX[.dirty]] . + + The ".dev0" means not master branch. + + Exceptions: + 1: no tags. 0.postDISTANCE[.dev0]+gHEX[.dirty] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["branch"] != "master": + rendered += ".dev0" + rendered += plus_or_dot(pieces) + rendered += "g%s" % pieces["short"] + if pieces["dirty"]: + rendered += ".dirty" + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["branch"] != "master": + rendered += ".dev0" + rendered += "+g%s" % pieces["short"] + if pieces["dirty"]: + rendered += ".dirty" + return rendered + + +def render_pep440_old(pieces: Dict[str, Any]) -> str: + """TAG[.postDISTANCE[.dev0]] . + + The ".dev0" means dirty. + + Exceptions: + 1: no tags. 
0.postDISTANCE[.dev0] + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"] or pieces["dirty"]: + rendered += ".post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + else: + # exception #1 + rendered = "0.post%d" % pieces["distance"] + if pieces["dirty"]: + rendered += ".dev0" + return rendered + + +def render_git_describe(pieces: Dict[str, Any]) -> str: + """TAG[-DISTANCE-gHEX][-dirty]. + + Like 'git describe --tags --dirty --always'. + + Exceptions: + 1: no tags. HEX[-dirty] (note: no 'g' prefix) + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + if pieces["distance"]: + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render_git_describe_long(pieces: Dict[str, Any]) -> str: + """TAG-DISTANCE-gHEX[-dirty]. + + Like 'git describe --tags --dirty --always -long'. + The distance/hash is unconditional. + + Exceptions: + 1: no tags. HEX[-dirty] (note: no 'g' prefix) + """ + if pieces["closest-tag"]: + rendered = pieces["closest-tag"] + rendered += "-%d-g%s" % (pieces["distance"], pieces["short"]) + else: + # exception #1 + rendered = pieces["short"] + if pieces["dirty"]: + rendered += "-dirty" + return rendered + + +def render(pieces: Dict[str, Any], style: str) -> Dict[str, Any]: + """Render the given version pieces into the requested style.""" + if pieces["error"]: + return {"version": "unknown", + "full-revisionid": pieces.get("long"), + "dirty": None, + "error": pieces["error"], + "date": None} + + if not style or style == "default": + style = "pep440" # the default + + if style == "pep440": + rendered = render_pep440(pieces) + elif style == "pep440-branch": + rendered = render_pep440_branch(pieces) + elif style == "pep440-pre": + rendered = render_pep440_pre(pieces) + elif style == "pep440-post": + rendered = render_pep440_post(pieces) + elif style == "pep440-post-branch": + rendered = render_pep440_post_branch(pieces) + elif style == "pep440-old": + rendered = render_pep440_old(pieces) + elif style == "git-describe": + rendered = render_git_describe(pieces) + elif style == "git-describe-long": + rendered = render_git_describe_long(pieces) + else: + raise ValueError("unknown style '%s'" % style) + + return {"version": rendered, "full-revisionid": pieces["long"], + "dirty": pieces["dirty"], "error": None, + "date": pieces.get("date")} + + +def get_versions() -> Dict[str, Any]: + """Get version information or return default if unable to do so.""" + # I am in _version.py, which lives at ROOT/VERSIONFILE_SOURCE. If we have + # __file__, we can work backwards from there to the root. Some + # py2exe/bbfreeze/non-CPython implementations don't do __file__, in which + # case we can only use expanded keywords. + + cfg = get_config() + verbose = cfg.verbose + + try: + return git_versions_from_keywords(get_keywords(), cfg.tag_prefix, + verbose) + except NotThisMethod: + pass + + try: + root = os.path.realpath(__file__) + # versionfile_source is the relative path from the top of the source + # tree (where the .git directory might live) to this file. Invert + # this to find the root from __file__. 
+ for _ in cfg.versionfile_source.split('/'): + root = os.path.dirname(root) + except NameError: + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to find root of source tree", + "date": None} + + try: + pieces = git_pieces_from_vcs(cfg.tag_prefix, root, verbose) + return render(pieces, cfg.style) + except NotThisMethod: + pass + + try: + if cfg.parentdir_prefix: + return versions_from_parentdir(cfg.parentdir_prefix, root, verbose) + except NotThisMethod: + pass + + return {"version": "0+unknown", "full-revisionid": None, + "dirty": None, + "error": "unable to compute version", "date": None} diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/rabbitmq_source_stage.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py similarity index 85% rename from examples/developer_guide/4_rabbitmq_cpp_stage/rabbitmq_source_stage.py rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py index 9557e4f6da..4516f7d87b 100755 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/rabbitmq_source_stage.py +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -87,16 +87,15 @@ def compute_schema(self, schema: StageSchema): def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: if self._build_cpp_node(): - # pylint: disable=c-extension-no-member,no-name-in-module - from _lib import morpheus_rabbit as morpheus_rabbit_cpp - - node = morpheus_rabbit_cpp.RabbitMQSourceStage(builder, - self.unique_name, - self._host, - self._exchange, - self._exchange_type, - self._queue_name, - self._poll_interval.to_pytimedelta()) + from ._lib import rabbitmq_cpp_stage + + node = rabbitmq_cpp_stage.RabbitMQSourceStage(builder, + self.unique_name, + self._host, + self._exchange, + self._exchange_type, + self._queue_name, + self._poll_interval.to_pytimedelta()) else: self.connect() node = builder.make_source(self.unique_name, self.source_generator) diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/write_to_rabbitmq_stage.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/write_to_rabbitmq_stage.py similarity index 97% rename from examples/developer_guide/4_rabbitmq_cpp_stage/write_to_rabbitmq_stage.py rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/write_to_rabbitmq_stage.py index 3516580bda..401d8b785e 100644 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/write_to_rabbitmq_stage.py +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/write_to_rabbitmq_stage.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/read_simple.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/read_simple.py similarity index 94% rename from examples/developer_guide/4_rabbitmq_cpp_stage/read_simple.py rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/read_simple.py index 7581cf38b1..1edec18e75 100755 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/read_simple.py +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/read_simple.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ import os import click -from rabbitmq_source_stage import RabbitMQSourceStage +from rabbitmq_cpp_stage.rabbitmq_source_stage import RabbitMQSourceStage from morpheus.common import FileTypes from morpheus.config import Config diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/write_simple.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/write_simple.py similarity index 92% rename from examples/developer_guide/4_rabbitmq_cpp_stage/write_simple.py rename to examples/developer_guide/4_rabbitmq_cpp_stage/src/write_simple.py index 239f205350..9cac7d7331 100755 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/write_simple.py +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/write_simple.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ import logging import os -from write_to_rabbitmq_stage import WriteToRabbitMQStage +from rabbitmq_cpp_stage.write_to_rabbitmq_stage import WriteToRabbitMQStage from morpheus.config import Config from morpheus.pipeline import LinearPipeline diff --git a/examples/developer_guide/7_python_modules/my_compound_module_consumer_stage.py b/examples/developer_guide/7_python_modules/my_compound_module_consumer_stage.py index c2daf0b358..ed9e575240 100644 --- a/examples/developer_guide/7_python_modules/my_compound_module_consumer_stage.py +++ b/examples/developer_guide/7_python_modules/my_compound_module_consumer_stage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/developer_guide/7_python_modules/my_test_compound_module.py b/examples/developer_guide/7_python_modules/my_test_compound_module.py index 89ead0159b..a9a5bf66b5 100644 --- a/examples/developer_guide/7_python_modules/my_test_compound_module.py +++ b/examples/developer_guide/7_python_modules/my_test_compound_module.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
diff --git a/examples/developer_guide/7_python_modules/my_test_module.py b/examples/developer_guide/7_python_modules/my_test_module.py index e52acd5d4c..36c2e83f6a 100644 --- a/examples/developer_guide/7_python_modules/my_test_module.py +++ b/examples/developer_guide/7_python_modules/my_test_module.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/developer_guide/7_python_modules/my_test_module_consumer.py b/examples/developer_guide/7_python_modules/my_test_module_consumer.py index fd8126c4bb..b3dca2bbf4 100644 --- a/examples/developer_guide/7_python_modules/my_test_module_consumer.py +++ b/examples/developer_guide/7_python_modules/my_test_module_consumer.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/developer_guide/7_python_modules/my_test_module_consumer_stage.py b/examples/developer_guide/7_python_modules/my_test_module_consumer_stage.py index b2897628c3..998026f5ed 100644 --- a/examples/developer_guide/7_python_modules/my_test_module_consumer_stage.py +++ b/examples/developer_guide/7_python_modules/my_test_module_consumer_stage.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/developer_guide/7_python_modules/run.py b/examples/developer_guide/7_python_modules/run.py index a9df420367..7defcba876 100644 --- a/examples/developer_guide/7_python_modules/run.py +++ b/examples/developer_guide/7_python_modules/run.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/examples/developer_guide/CMakeLists.txt b/examples/developer_guide/CMakeLists.txt new file mode 100644 index 0000000000..16220563c7 --- /dev/null +++ b/examples/developer_guide/CMakeLists.txt @@ -0,0 +1,19 @@ +# ============================================================================= +# Copyright (c) 2020-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. 
+# ============================================================================= +list(APPEND CMAKE_MESSAGE_CONTEXT "developer_guide") + +add_subdirectory(3_simple_cpp_stage) +add_subdirectory(4_rabbitmq_cpp_stage) + +list(POP_BACK CMAKE_MESSAGE_CONTEXT) diff --git a/examples/digital_fingerprinting/README.md b/examples/digital_fingerprinting/README.md index 67283cfe21..d9296eb5f6 100644 --- a/examples/digital_fingerprinting/README.md +++ b/examples/digital_fingerprinting/README.md @@ -1,5 +1,5 @@ -# Grafana DFP Dashboard Example +# Using Grafana with Morpheus DFP Pipeline -This example demonstrates how to use [Grafana](https://grafana.com/grafana/) to visualize the inference results from the [Azure DFP pipeline example](../production/README.md). +This example builds on the [Azure DFP pipeline example](../production/README.md) to demonstrate how [Grafana](https://grafana.com/grafana/) can be used for log monitoring, error alerting, and inference results visualization. ## Grafana Configuration -### CSV data source plugin +The data sources and dashboards in this example are managed using config files. [Grafana's provisioning system](https://grafana.com/docs/grafana/latest/administration/provisioning/) then uses these files to add the data sources and dashboards to Grafana upon startup. -The [CSV data source plugin](https://grafana.com/grafana/plugins/marcusolsson-csv-datasource/) is installed to Grafana to read the Azure inference results CSV file. This example assumes we are using the CSV file generated from running the Python script for [Azure DFP pipeline example](../production/README.md). +### Data Sources -If using the [notebook version](../production/morpheus/notebooks/dfp_azure_inference.ipynb) to run inference, you'll need to update the `url` in [datasources.yaml](./datasources/datasources.yaml) as follows: -``` -url: /workspace/notebooks/dfp_detections_azure.csv -``` +Grafana includes built-in support for many data sources. There are also several data sources available that can be installed as plugins. More information about how to manage Grafana data sources can be found [here](https://grafana.com/docs/grafana/latest/datasources/). + +The following data sources for this example are configured in [datasources.yaml](./datasources/datasources.yaml): + +#### Loki data source + +[Loki](https://grafana.com/docs/loki/latest/) is Grafana's log aggregation system. The Loki service is started automatically when the Grafana service starts up. The [Python script for running the DFP pipeline](./run.py) has been updated to configure a logging handler that sends the Morpheus logs to the Loki service. + +#### CSV data source plugin + +The [CSV data source plugin](https://grafana.com/grafana/plugins/marcusolsson-csv-datasource/) is installed to Grafana to read the Azure inference results CSV file. This example assumes we are using the CSV file generated from running the Python script for [Azure DFP pipeline example](../production/README.md). Please note that the use of the CSV plugin is for demonstration purposes only. Grafana includes support for many data sources more suitable for production deployments. See [here](https://grafana.com/docs/grafana/latest/datasources/) for more information. -### Updates to grafana.ini +#### Updates to grafana.ini The following is added to the default `grafana.ini` to enable local mode for CSV data source plugin. This allows the CSV data source plugin to access files on local file system. 
@@ -40,14 +47,24 @@ The following is added to the default `grafana.ini` to enable local mode for CSV
 allow_local_mode = true
 ```
 
-## Run Azure Production DFP Training and Inference Examples
+## Add Loki logging handler to DFP pipeline
 
-### Start Morpheus DFP pipeline container
+The [pipeline run script](./run.py) for the Azure DFP example has been updated with the following to add the Loki logging handler, which publishes the Morpheus logs to our Loki service:
 
-The following steps are taken from [Azure DFP pipeline example](../production/README.md). Run the following commands to start the Morpheus container:
+```
+loki_handler = logging_loki.LokiHandler(
+    url=f"{loki_url}/loki/api/v1/push",
+    tags={"app": "morpheus"},
+    version="1",
+)
 
-Build the Morpheus container:
+configure_logging(loki_handler, log_level=log_level)
+```
+
+More information about Loki Python logging can be found [here](https://pypi.org/project/python-logging-loki/).
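To confirm that log lines actually reach Loki before starting a long pipeline run, a short script along these lines can be used (a minimal sketch, assuming the Loki service is reachable at `http://localhost:3100` from wherever it runs):

```python
# Minimal smoke test for the Loki logging path.
# Assumes Loki is reachable at http://localhost:3100; adjust the URL as needed.
import logging

import logging_loki

handler = logging_loki.LokiHandler(
    url="http://localhost:3100/loki/api/v1/push",
    tags={"app": "morpheus"},
    version="1",
)

logger = logging.getLogger("loki_smoke_test")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# This message should appear in the DFP Logs dashboard under {app="morpheus"}.
logger.info("Loki smoke test message")
```

If the message shows up in the `DFP Logs` dashboard described below, the handler configuration is working end to end.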
+## Build the Morpheus container:
+From the root of the Morpheus repo:
 ```bash
 ./docker/build_container_release.sh
 ```
@@ -60,45 +77,96 @@ export MORPHEUS_CONTAINER_VERSION="$(git describe --tags --abbrev=0)-runtime"
 docker compose build
 ```
 
-Create `bash` shell in `morpheus_pipeline` container:
+## Start Grafana and Loki services:
+To start Grafana and Loki, run the following command on the host in `examples/digital_fingerprinting/production`:
 
 ```bash
-docker compose run morpheus_pipeline bash
+docker compose up grafana
 ```
 
-### Run Azure training pipeline
+## Run Azure DFP Training
 
-Run the following in the container to train Azure models.
+Create a `bash` shell in the `morpheus_pipeline` container:
 
 ```bash
-python dfp_azure_pipeline.py --log_level INFO --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json"
+docker compose run --rm morpheus_pipeline bash
 ```
 
-### Run Azure inference pipeline:
-
-Run the inference pipeline with `filter_threshold=0.0`. This will disable the filtering of the inference results.
+Set the `PYTHONPATH` environment variable to allow import of the production DFP Morpheus stages:
+```
+export PYTHONPATH=/workspace/examples/digital_fingerprinting/production/morpheus
+```
 
+Run the following in the container to train the Azure models.
 ```bash
-python dfp_azure_pipeline.py --log_level INFO --train_users none --start_time "2022-08-30" --input_file="../../../data/dfp/azure-inference-data/*.json" --filter_threshold=0.0
+cd /workspace/examples/digital_fingerprinting/production/grafana
+python run.py --log_level DEBUG --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json"
 ```
 
-The inference results will be saved to `dfp_detection_azure.csv` in the directory where script was run.
+## View DFP Logs Dashboard in Grafana
+
+While the training pipeline is running, you can view Morpheus logs live in a Grafana dashboard at http://localhost:3000/dashboards.
+
+Click on `DFP Logs` in the `General` folder. You may need to expand the `General` folder to see the link.
+
+![DFP Logs Dashboard](./img/dfp_logs_dashboard.png)
+
+This dashboard was provisioned using config files but can also be manually created with the following steps:
+1. Click `Dashboards` in the left-side menu.
+2. Click `New` and select `New Dashboard`.
+3. On the empty dashboard, click `+ Add visualization`.
+4. In the dialog box that opens, select the `Loki` data source.
+5. In the `Edit Panel` view, change from `Time Series` visualization to `Logs`.
+6. Add label filter: `app = morpheus`.
+7. Change Order to `Oldest first`.
+8. Click `Apply` to see your changes applied to the dashboard. Then click the save icon in the dashboard header.
 
-## Run Grafana Docker Image
+## Set up Error Alerting
 
-To start Grafana, run the following command on host in `examples/digital_fingerprinting/production`:
+We demonstrate here with a simple example how Grafana Alerting can notify us of a pipeline error moments after it occurs. This is especially useful with long-running pipelines.
+1. Click `Alert Rules` under `Alerting` in the left-side menu.
+2. Click `New Alert Rule`.
+3. Enter the alert rule name: `DFP Error Alert Rule`.
+4. In the `Define query and alert condition` section, select the `Loki` data source.
+5. Switch to `Code` view by clicking the `Code` button on the right.
+6. Enter the following Loki query, which counts the number of log lines in the last minute that have an error label (`severity=error`):
 ```
-docker compose up grafana
+count_over_time({severity="error"}[1m])
+```
+7. Under `Expressions`, keep the default configurations for `Reduce` and `Threshold`. The alert condition threshold will be an error count > 0.
+8. In the `Set evaluation behavior` section, click `+ New folder`, enter `morpheus`, then click the `Create` button.
+9. Click `+ New evaluation group`, enter `dfp` for `Evaluation group name` and `1m` for `Evaluation interval`, then click the `Create` button.
+10. Enter `0s` for `Pending period`. This configures alerts to be fired as soon as the alert condition is met.
+11. Test your alert rule by running the following in your `morpheus_pipeline` container. This will cause an error because the `--input_file` glob will no longer match any of our training data files.
+```
+python run.py --log_level DEBUG --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json"
+```
+12. Click the `Preview` button to test-run the alert rule. You should now see how our alert query picks up the error log, processes it through our reduce/threshold expressions, and satisfies our alert condition. This is indicated by the `Firing` label in the `Threshold` section.
+
+![DFP Error Alert Setup](./img/dfp_error_alert_setup.png)
+
+13. Finally, click `Save rule and exit` at the top right of the page.
+
+By default, all alerts will be sent through the `grafana-default-email` contact point. You can add email addresses to this contact point by clicking on `Contact points` under `Alerting` in the left-side menu. You would also have to configure SMTP in the `[smtp]` section of your `grafana.ini`. More information about Grafana Alerting contact points can be found [here](https://grafana.com/docs/grafana/latest/alerting/fundamentals/contact-points/).
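The alert query can also be exercised outside of Grafana against Loki's HTTP API. Below is a minimal sketch that runs the same LogQL expression as an instant query; it assumes Loki is reachable at `http://localhost:3100` and that the `requests` package is installed:

```python
# Run the alert rule's LogQL expression as an instant query against Loki's
# HTTP API. Assumes Loki is reachable at http://localhost:3100.
import requests

response = requests.get(
    "http://localhost:3100/loki/api/v1/query",
    params={"query": 'count_over_time({severity="error"}[1m])'},
    timeout=10,
)
response.raise_for_status()

# Each entry pairs a label set with a [timestamp, value] sample; a value
# greater than zero is what pushes the alert rule into the Firing state.
for entry in response.json()["data"]["result"]:
    print(entry["metric"], entry["value"])
```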
+## Run Azure DFP Inference:
+
+Run the inference pipeline with `filter_threshold=0.0`. This will disable the filtering of the inference results.
+
+```bash
+python run.py --log_level DEBUG --train_users none --start_time "2022-08-30" --input_file="../../../data/dfp/azure-inference-data/*.json" --filter_threshold=0.0
+```
+
+The inference results will be saved to `dfp_detections_azure.csv` in the directory where the script was run.
 
-## View DFP Dashboard
+## View DFP Detections Dashboard in Grafana
 
-Our Grafana DFP dashboard can now be accessed via web browser at http://localhost:3000/dashboards.
+When the inference pipeline completes, you can view visualizations of the inference results at http://localhost:3000/dashboards.
 
-Click on `DFP_Dashboard` in the `General` folder. You may need to expand the `General` folder to see the link.
+Click on `DFP Detections` in the `General` folder. You may need to expand the `General` folder to see the link.
 
-![DFP Dashboard](./img/screenshot.png)
+![DFP Detections Dashboard](./img/dfp_detections_dashboard.png)
 
 The dashboard has the following visualization panels:
diff --git a/examples/digital_fingerprinting/production/grafana/config/dashboards.yaml b/examples/digital_fingerprinting/production/grafana/config/dashboards.yaml
index d82167ce4d..ac6563bfee 100644
--- a/examples/digital_fingerprinting/production/grafana/config/dashboards.yaml
+++ b/examples/digital_fingerprinting/production/grafana/config/dashboards.yaml
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/examples/digital_fingerprinting/production/grafana/config/grafana.ini b/examples/digital_fingerprinting/production/grafana/config/grafana.ini
index 6b30172ff5..97df7a8bba 100644
--- a/examples/digital_fingerprinting/production/grafana/config/grafana.ini
+++ b/examples/digital_fingerprinting/production/grafana/config/grafana.ini
@@ -379,7 +379,7 @@
 ;token_rotation_interval_minutes = 10
 
 # Set to true to disable (hide) the login form, useful if you use OAuth, defaults to false
-disable_login_form = true
+;disable_login_form = true
 
 # Set to true to disable the sign out link in the side menu. Useful if you use auth.proxy or auth.jwt, defaults to false
 ;disable_signout_menu = false
diff --git a/examples/digital_fingerprinting/production/grafana/config/loki-config.yml b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml
new file mode 100644
index 0000000000..77cfa39956
--- /dev/null
+++ b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml
@@ -0,0 +1,65 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+auth_enabled: false
+
+server:
+  http_listen_port: 3100
+  grpc_listen_port: 9096
+
+common:
+  instance_addr: 127.0.0.1
+  path_prefix: /tmp/loki
+  storage:
+    filesystem:
+      chunks_directory: /tmp/loki/chunks
+      rules_directory: /tmp/loki/rules
+  replication_factor: 1
+  ring:
+    kvstore:
+      store: inmemory
+
+query_range:
+  results_cache:
+    cache:
+      embedded_cache:
+        enabled: true
+        max_size_mb: 100
+
+schema_config:
+  configs:
+    - from: 2020-10-24
+      store: boltdb-shipper
+      object_store: filesystem
+      schema: v11
+      index:
+        prefix: index_
+        period: 24h
+
+ruler:
+  alertmanager_url: http://localhost:9093
+
+# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
+# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
+#
+# Statistics help us better understand how Loki is used, and they show us performance
+# levels for most users. This helps us prioritize features and documentation.
+# For more information on what's sent, look at +# https://github.com/grafana/loki/blob/main/pkg/usagestats/stats.go +# Refer to the buildReport method to see what goes into a report. +# +# If you would like to disable reporting, uncomment the following lines: +#analytics: +# reporting_enabled: false diff --git a/examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json similarity index 99% rename from examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json rename to examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json index f80780a381..9167ecaca5 100644 --- a/examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json +++ b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json @@ -557,7 +557,7 @@ }, "timepicker": {}, "timezone": "", - "title": "DFP_Dashboard", + "title": "DFP Detections", "uid": "f810d98f-bf31-42d4-98aa-9eb3fa187184", "version": 1, "weekStart": "" diff --git a/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json new file mode 100644 index 0000000000..c4ed0448c9 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json @@ -0,0 +1,78 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "gridPos": { + "h": 18, + "w": 23, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": false, + "sortOrder": "Ascending", + "wrapLogMessage": false + }, + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "editorMode": "builder", + "expr": "{app=\"morpheus\"} |= ``", + "queryType": "range", + "refId": "A" + } + ], + "type": "logs" + } + ], + "refresh": "5s", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "DFP Logs", + "uid": "dfb4fe34-daae-4894-9ff0-b8f89b7d256e", + "version": 1, + "weekStart": "" +} \ No newline at end of file diff --git a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml index 6489932465..edcebabb8a 100644 --- a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml +++ b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,6 +16,12 @@ apiVersion: 1 datasources: + - name: Loki + type: loki + access: proxy + url: http://loki:3100 + jsonData: + maxLines: 1000 - name: csv-datasource uid: 1257c93b-f998-438c-a784-7e90fb94fb36 type: marcusolsson-csv-datasource diff --git a/examples/digital_fingerprinting/production/grafana/img/screenshot.png b/examples/digital_fingerprinting/production/grafana/img/dfp_detections_dashboard.png similarity index 100% rename from examples/digital_fingerprinting/production/grafana/img/screenshot.png rename to examples/digital_fingerprinting/production/grafana/img/dfp_detections_dashboard.png diff --git a/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png b/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png new file mode 100644 index 0000000000..516249f41a --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9ada0a466cfeb34363623cf78d478e17737a1d439b43bec989c7b026749a0fe2 +size 474978 diff --git a/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png b/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png new file mode 100644 index 0000000000..8cec30b668 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:753f199658371c462e6cbdbc324ee08acdafcf5453d0dae9b4042d133dfbabe0 +size 581211 diff --git a/examples/digital_fingerprinting/production/grafana/run.py b/examples/digital_fingerprinting/production/grafana/run.py new file mode 100644 index 0000000000..1f10cd4f67 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/run.py @@ -0,0 +1,476 @@ +# Copyright (c) 2022-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""DFP training & inference pipelines for Azure Active Directory logs.""" + +import functools +import logging +import logging.handlers +import os +import typing +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +import click +import logging_loki +import mlflow +import pandas as pd +from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage +from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage +from dfp.stages.dfp_inference_stage import DFPInferenceStage +from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage +from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage +from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage +from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage +from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage +from dfp.stages.dfp_training import DFPTraining +from dfp.stages.multi_file_source import MultiFileSource +from dfp.utils.regex_utils import iso_date_regex + +from morpheus.cli.utils import get_log_levels +from morpheus.cli.utils import get_package_relative_file +from morpheus.cli.utils import load_labels_file +from morpheus.cli.utils import parse_log_level +from morpheus.common import FileTypes +from morpheus.common import FilterSource +from morpheus.config import Config +from morpheus.config import ConfigAutoEncoder +from morpheus.config import CppConfig +from morpheus.pipeline import LinearPipeline +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.output.write_to_file_stage import WriteToFileStage +from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage +from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.utils.column_info import ColumnInfo +from morpheus.utils.column_info import DataFrameInputSchema +from morpheus.utils.column_info import DateTimeColumn +from morpheus.utils.column_info import DistinctIncrementColumn +from morpheus.utils.column_info import IncrementColumn +from morpheus.utils.column_info import RenameColumn +from morpheus.utils.column_info import StringCatColumn +from morpheus.utils.file_utils import date_extractor +from morpheus.utils.logger import configure_logging + + +def _file_type_name_to_enum(file_type: str) -> FileTypes: + """Converts a file type name to a FileTypes enum.""" + if (file_type == "JSON"): + return FileTypes.JSON + if (file_type == "CSV"): + return FileTypes.CSV + if (file_type == "PARQUET"): + return FileTypes.PARQUET + + return FileTypes.Auto + + +@click.command() +@click.option( + "--train_users", + type=click.Choice(["all", "generic", "individual", "none"], case_sensitive=False), + help=("Indicates whether or not to train per user or a generic model for all users. " + "Selecting none runs the inference pipeline."), +) +@click.option( + "--skip_user", + multiple=True, + type=str, + help="User IDs to skip. Mutually exclusive with only_user", +) +@click.option( + "--only_user", + multiple=True, + type=str, + help="Only users specified by this option will be included. 
Mutually exclusive with skip_user",
+)
+@click.option(
+    "--start_time",
+    type=click.DateTime(
+        formats=['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%d %H:%M:%S%z']),
+    default=None,
+    help="The start of the time window, if undefined start_time will be `now()-duration`",
+)
+@click.option(
+    "--duration",
+    type=str,
+    default="60d",
+    help="The duration to run starting from start_time",
+)
+@click.option(
+    "--cache_dir",
+    type=str,
+    default="./.cache/dfp",
+    show_envvar=True,
+    help="The location to cache data such as S3 downloads and pre-processed data",
+)
+@click.option("--log_level",
+              default=logging.getLevelName(Config().log_level),
+              type=click.Choice(get_log_levels(), case_sensitive=False),
+              callback=parse_log_level,
+              help="Specify the logging level to use.")
+@click.option("--sample_rate_s",
+              type=int,
+              default=0,
+              show_envvar=True,
+              help="Minimum time step, in milliseconds, between object logs.")
+@click.option("--filter_threshold",
+              type=float,
+              default=2.0,
+              show_envvar=True,
+              help="Filter out inference results below this threshold")
+@click.option(
+    "--input_file",
+    "-f",
+    type=str,
+    multiple=True,
+    help=("List of files to process. Can specify multiple arguments for multiple files. "
+          "Also accepts glob (*) wildcards and schema prefixes such as `s3://`. "
+          "For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. "
+          "Refer to fsspec documentation for a list of possible options."),
+)
+@click.option("--file_type_override",
+              "-t",
+              type=click.Choice(["AUTO", "JSON", "CSV", "PARQUET"], case_sensitive=False),
+              default="JSON",
+              help="Override the detected file type. Values can be 'AUTO', 'JSON', 'CSV', or 'PARQUET'.",
+              callback=lambda _,
+              __,
+              value: None if value is None else _file_type_name_to_enum(value))
+@click.option('--watch_inputs',
+              type=bool,
+              is_flag=True,
+              default=False,
+              help=("Instructs the pipeline to continuously check the paths specified by `--input_file` for new files. "
+                    "This assumes that at least one of the paths contains a wildcard."))
+@click.option("--watch_interval",
+              type=float,
+              default=1.0,
+              help=("Amount of time, in seconds, to wait between checks for new files. "
+                    "Only used if --watch_inputs is set."))
+@click.option('--tracking_uri',
+              type=str,
+              default="http://mlflow:5000",
+              help=("The MLflow tracking URI to connect to the tracking backend."))
+@click.option('--mlflow_experiment_name_template',
+              type=str,
+              default="dfp/azure/training/{reg_model_name}",
+              help="The MLflow experiment name template to use when logging experiments. ")
+@click.option('--mlflow_model_name_template',
+              type=str,
+              default="DFP-azure-{user_id}",
+              help="The MLflow model name template to use when logging models. 
") +@click.option('--use_postproc_schema', is_flag=True, help='Assume that input data has already been preprocessed.') +@click.option('--inference_detection_file_name', type=str, default="dfp_detections_azure.csv") +@click.option('--loki_url', + type=str, + default="http://loki:3100", + help=("Loki URL for error logging and alerting in Grafana.")) +def run_pipeline(train_users, + skip_user: typing.Tuple[str], + only_user: typing.Tuple[str], + start_time: datetime, + duration, + cache_dir, + log_level, + sample_rate_s, + filter_threshold, + mlflow_experiment_name_template, + mlflow_model_name_template, + file_type_override, + use_postproc_schema, + inference_detection_file_name, + loki_url, + **kwargs): + """Runs the DFP pipeline.""" + # To include the generic, we must be training all or generic + include_generic = train_users in ("all", "generic") + + # To include individual, we must be either training or inferring + include_individual = train_users != "generic" + + # None indicates we aren't training anything + is_training = train_users != "none" + + skip_users = list(skip_user) + only_users = list(only_user) + + duration = timedelta(seconds=pd.Timedelta(duration).total_seconds()) + if start_time is None: + end_time = datetime.now(tz=timezone.utc) + start_time = end_time - duration + else: + if start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=timezone.utc) + + end_time = start_time + duration + + # Enable the Morpheus logger + loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", + ) + configure_logging(loki_handler, log_level=log_level) + logging.getLogger("mlflow").setLevel(log_level) + + if (len(skip_users) > 0 and len(only_users) > 0): + logging.error("Option --skip_user and --only_user are mutually exclusive. 
Exiting") + + logger = logging.getLogger(f"morpheus.{__name__}") + + logger.info("Running training pipeline with the following options: ") + logger.info("Train generic_user: %s", include_generic) + logger.info("Skipping users: %s", skip_users) + logger.info("Start Time: %s", start_time) + logger.info("Duration: %s", duration) + logger.info("Cache Dir: %s", cache_dir) + + if ("tracking_uri" in kwargs): + # Initialize ML Flow + mlflow.set_tracking_uri(kwargs["tracking_uri"]) + logger.info("Tracking URI: %s", mlflow.get_tracking_uri()) + + config = Config() + + CppConfig.set_should_use_cpp(False) + + config.num_threads = os.cpu_count() + + config.ae = ConfigAutoEncoder() + + config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt")) + config.ae.userid_column_name = "username" + config.ae.timestamp_column_name = "timestamp" + + # Specify the column names to ensure all data is uniform + if (use_postproc_schema): + + source_column_info = [ + ColumnInfo(name="autonomousSystemNumber", dtype=str), + ColumnInfo(name="location_geoCoordinates_latitude", dtype=float), + ColumnInfo(name="location_geoCoordinates_longitude", dtype=float), + ColumnInfo(name="resourceDisplayName", dtype=str), + ColumnInfo(name="travel_speed_kmph", dtype=float), + DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"), + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="userPrincipalName"), + RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="deviceDetail_browser"), + RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="deviceDetail_displayName"), + RenameColumn(name="deviceDetailoperatingSystem", dtype=str, input_name="deviceDetail_operatingSystem"), + + # RenameColumn(name="location_country", dtype=str, input_name="location_countryOrRegion"), + ColumnInfo(name="location_city_state_country", dtype=str), + ColumnInfo(name="location_state_country", dtype=str), + ColumnInfo(name="location_country", dtype=str), + + # Non-features + ColumnInfo(name="is_corp_vpn", dtype=bool), + ColumnInfo(name="distance_km", dtype=float), + ColumnInfo(name="ts_delta_hour", dtype=float), + ] + source_schema = DataFrameInputSchema(column_info=source_column_info) + + preprocess_column_info = [ + ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=config.ae.userid_column_name, dtype=str), + + # Resource access + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="resourceDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + + # Device detail + ColumnInfo(name="deviceDetailbrowser", dtype=str), + ColumnInfo(name="deviceDetaildisplayName", dtype=str), + ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), + + # Location information + ColumnInfo(name="autonomousSystemNumber", dtype=str), + ColumnInfo(name="location_geoCoordinates_latitude", dtype=float), + ColumnInfo(name="location_geoCoordinates_longitude", dtype=float), + ColumnInfo(name="location_city_state_country", dtype=str), + ColumnInfo(name="location_state_country", dtype=str), + ColumnInfo(name="location_country", dtype=str), + + # Derived information + ColumnInfo(name="travel_speed_kmph", dtype=float), + + # Non-features + ColumnInfo(name="is_corp_vpn", dtype=bool), + ColumnInfo(name="distance_km", dtype=float), + ColumnInfo(name="ts_delta_hour", dtype=float), + ] + + preprocess_schema = 
DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + exclude_from_training = [ + config.ae.userid_column_name, + config.ae.timestamp_column_name, + "is_corp_vpn", + "distance_km", + "ts_delta_hour", + ] + + config.ae.feature_columns = [ + name for (name, dtype) in preprocess_schema.output_columns if name not in exclude_from_training + ] + else: + source_column_info = [ + DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"), + RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="properties.userPrincipalName"), + RenameColumn(name="appDisplayName", dtype=str, input_name="properties.appDisplayName"), + ColumnInfo(name="category", dtype=str), + RenameColumn(name="clientAppUsed", dtype=str, input_name="properties.clientAppUsed"), + RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="properties.deviceDetail.browser"), + RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="properties.deviceDetail.displayName"), + RenameColumn(name="deviceDetailoperatingSystem", + dtype=str, + input_name="properties.deviceDetail.operatingSystem"), + StringCatColumn(name="location", + dtype=str, + input_columns=[ + "properties.location.city", + "properties.location.countryOrRegion", + ], + sep=", "), + RenameColumn(name="statusfailureReason", dtype=str, input_name="properties.status.failureReason"), + ] + + source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info) + + # Preprocessing schema + preprocess_column_info = [ + ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=config.ae.userid_column_name, dtype=str), + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + ColumnInfo(name="deviceDetailbrowser", dtype=str), + ColumnInfo(name="deviceDetaildisplayName", dtype=str), + ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), + ColumnInfo(name="statusfailureReason", dtype=str), + + # Derived columns + IncrementColumn(name="logcount", + dtype=int, + input_name=config.ae.timestamp_column_name, + groupby_column=config.ae.userid_column_name), + DistinctIncrementColumn(name="locincrement", + dtype=int, + input_name="location", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name), + DistinctIncrementColumn(name="appincrement", + dtype=int, + input_name="appDisplayName", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name) + ] + + preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + pipeline.set_source( + MultiFileSource(config, + filenames=list(kwargs["input_file"]), + watch=kwargs["watch_inputs"], + watch_interval=kwargs["watch_interval"])) + + # Batch files into batches by time. Use the default ISO date extractor from the filename + pipeline.add_stage( + DFPFileBatcherStage(config, + period="D", + sampling_rate_s=sample_rate_s, + date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex), + start_time=start_time, + end_time=end_time)) + + parser_kwargs = None + if (file_type_override == FileTypes.JSON): + parser_kwargs = {"lines": False, "orient": "records"} + # Output is a list of fsspec files. Convert to DataFrames. 
This caches downloaded data + pipeline.add_stage( + DFPFileToDataFrameStage( + config, + schema=source_schema, + file_type=file_type_override, + parser_kwargs=parser_kwargs, # TODO(Devin) probably should be configurable too + cache_dir=cache_dir)) + + pipeline.add_stage(MonitorStage(config, description="Input data rate")) + + # This will split users or just use one single user + pipeline.add_stage( + DFPSplitUsersStage(config, + include_generic=include_generic, + include_individual=include_individual, + skip_users=skip_users, + only_users=only_users)) + + # Next, have a stage that will create rolling windows + pipeline.add_stage( + DFPRollingWindowStage( + config, + min_history=300 if is_training else 1, + min_increment=300 if is_training else 0, + # For inference, we only ever want 1 day max + max_history="60d" if is_training else "1d", + cache_dir=cache_dir)) + + # Output is UserMessageMeta -- Cached frame set + pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema)) + + model_name_formatter = mlflow_model_name_template + experiment_name_formatter = mlflow_experiment_name_template + + if (is_training): + # Finally, perform training which will output a model + pipeline.add_stage(DFPTraining(config, epochs=100, validation_size=0.15)) + + pipeline.add_stage(MonitorStage(config, description="Training rate", smoothing=0.001)) + + # Write that model to MLFlow + pipeline.add_stage( + DFPMLFlowModelWriterStage(config, + model_name_formatter=model_name_formatter, + experiment_name_formatter=experiment_name_formatter)) + else: + # Perform inference on the preprocessed data + pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=model_name_formatter)) + + pipeline.add_stage(MonitorStage(config, description="Inference rate", smoothing=0.001)) + + # Filter for only the anomalous logs + pipeline.add_stage( + FilterDetectionsStage(config, + threshold=filter_threshold, + filter_source=FilterSource.DATAFRAME, + field_name='mean_abs_z')) + pipeline.add_stage(DFPPostprocessingStage(config)) + + # Exclude the columns we don't want in our output + pipeline.add_stage(SerializeStage(config, exclude=['batch_count', 'origin_hash', '_row_hash', '_batch_id'])) + + # Write all anomalies to a CSV file + pipeline.add_stage(WriteToFileStage(config, filename=inference_detection_file_name, overwrite=True)) + + # Run the pipeline + pipeline.run() + + +if __name__ == "__main__": + # pylint: disable=no-value-for-parameter + run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp") diff --git a/examples/digital_fingerprinting/production/mlflow/Dockerfile b/examples/digital_fingerprinting/production/mlflow/Dockerfile index 0afdfc44c1..df6e7b4edb 100644 --- a/examples/digital_fingerprinting/production/mlflow/Dockerfile +++ b/examples/digital_fingerprinting/production/mlflow/Dockerfile @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
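In the inference branch above, `FilterDetectionsStage` drops every row whose anomaly score does not clear the threshold before results are serialized. A rough pandas equivalent with invented scores (the exact comparison semantics live in the stage itself; the threshold value here is hypothetical):

```python
# Sketch: keep only rows whose mean absolute z-score exceeds the threshold,
# mirroring FilterDetectionsStage(field_name='mean_abs_z'). Data is made up.
import pandas as pd

filter_threshold = 2.0  # stand-in for the pipeline's filter_threshold option

scores = pd.DataFrame({
    "username": ["alice", "bob", "carol"],
    "mean_abs_z": [0.4, 3.1, 2.5],
})

anomalies = scores[scores["mean_abs_z"] > filter_threshold]
print(anomalies)  # only bob and carol score high enough to be written out
```

Only the surviving rows reach `SerializeStage` and the CSV produced by `WriteToFileStage`.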
-FROM python:3.8-slim-buster +FROM python:3.10-slim-buster # Install curl for health check RUN apt update && \ diff --git a/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md b/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md index cad1fe96d6..a9c09197d2 100644 --- a/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md +++ b/examples/digital_fingerprinting/production/morpheus/benchmarks/README.md @@ -1,5 +1,5 @@