diff --git a/ci/vale/styles/config/vocabularies/morpheus/accept.txt b/ci/vale/styles/config/vocabularies/morpheus/accept.txt
index 285a85c7d8..1b89df3762 100644
--- a/ci/vale/styles/config/vocabularies/morpheus/accept.txt
+++ b/ci/vale/styles/config/vocabularies/morpheus/accept.txt
@@ -46,6 +46,7 @@ LLM(s?)
 # https://github.com/logpai/loghub/
 Loghub
 Milvus
+PyPI
 [Mm]ixin
 MLflow
 Morpheus
@@ -71,6 +72,7 @@ pytest
 [Ss]ubcard(s?)
 [Ss]ubgraph(s?)
 [Ss]ubword(s?)
+[Ss]uperset(s?)
 [Tt]imestamp(s?)
 [Tt]okenization
 [Tt]okenizer(s?)
diff --git a/docs/CMakeLists.txt b/docs/CMakeLists.txt
index 41aa33a535..5e218fc989 100644
--- a/docs/CMakeLists.txt
+++ b/docs/CMakeLists.txt
@@ -30,7 +30,7 @@ add_custom_target(${PROJECT_NAME}_docs
     BUILD_DIR=${CMAKE_CURRENT_BINARY_DIR} ${SPHINX_EXECUTABLE} ${SPHINX_HTML_ARGS} ${SPHINX_SOURCE} ${SPHINX_BUILD}
   WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
   COMMENT "Generating documentation with Sphinx"
-  DEPENDS morpheus-package-outputs morpheus_llm-package-outputs
+  DEPENDS morpheus-package-outputs morpheus_llm-package-outputs morpheus_dfp-package-outputs
 )

 add_custom_target(${PROJECT_NAME}_docs_linkcheck
@@ -38,7 +38,7 @@ add_custom_target(${PROJECT_NAME}_docs_linkcheck
     BUILD_DIR=${CMAKE_CURRENT_BINARY_DIR} ${SPHINX_EXECUTABLE} ${SPHINX_LINKCHECK_ARGS} ${SPHINX_SOURCE} ${SPHINX_LINKCHECK_OUT}
   WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
   COMMENT "Checking documentation links with Sphinx"
-  DEPENDS morpheus-package-outputs morpheus_llm-package-outputs
+  DEPENDS morpheus-package-outputs morpheus_llm-package-outputs morpheus_dfp-package-outputs
 )

 list(POP_BACK CMAKE_MESSAGE_CONTEXT)
diff --git a/docs/source/conda_packages.md b/docs/source/conda_packages.md
new file mode 100644
index 0000000000..1ea5f65828
--- /dev/null
+++ b/docs/source/conda_packages.md
@@ -0,0 +1,128 @@
+
+
+# Morpheus Conda Packages
+The Morpheus stages are the building blocks for creating pipelines. The stages are organized into libraries by use case. The current libraries are:
+- `morpheus-core`
+- `morpheus-dfp`
+- `morpheus-llm`
+
+The libraries are hosted as Conda packages on the [`nvidia`](https://anaconda.org/nvidia/) channel.
+
+The split into multiple libraries allows for a more modular approach to using the Morpheus stages. For example, if you are building an application for Digital Fingerprinting, you can install just the `morpheus-dfp` library. This reduces the size of the installed package. It also limits the dependencies, eliminating unnecessary version conflicts.
+
+
+## Morpheus Core
+The `morpheus-core` library contains the core stages that are common across all use cases. The Morpheus core library is built from the source code in the `python/morpheus` directory of the Morpheus repository. The core library is installed as a dependency when you install any of the other Morpheus libraries.
+To set up a Conda environment with the [`morpheus-core`](https://anaconda.org/nvidia/morpheus-core) library, you can run the following commands:
+### Create a Conda environment
+```bash
+export CONDA_ENV_NAME=morpheus
+conda create -n ${CONDA_ENV_NAME} python=3.10
+conda activate ${CONDA_ENV_NAME}
+```
+### Add Conda channels
+These channels are required for installing the runtime dependencies:
+```bash
+conda config --env --add channels conda-forge &&\
+  conda config --env --add channels nvidia &&\
+  conda config --env --add channels rapidsai &&\
+  conda config --env --add channels pytorch
+```
+### Install the `morpheus-core` library
+```bash
+conda install -c nvidia morpheus-core
+```
+The `morpheus-core` Conda package installs the `morpheus` Python package. It also pulls down all the necessary Conda runtime dependencies for the core stages, including [`mrc`](https://anaconda.org/nvidia/mrc) and [`libmrc`](https://anaconda.org/nvidia/libmrc).
+### Install additional PyPI dependencies
+Some of the stages in the core library require additional dependencies that are hosted on PyPI. These dependencies are included as a requirements file in the `morpheus` Python package. The requirements file can be located and installed by running the following commands:
+```bash
+MORPHEUS_CORE_PKG_DIR=$(dirname $(python -c "import morpheus; print(morpheus.__file__)"))
+pip install -r ${MORPHEUS_CORE_PKG_DIR}/requirements_morpheus_core.txt
+```
+
+## Morpheus DFP
+Digital Fingerprinting (DFP) is a technique used to identify anomalous behavior and uncover potential threats in the environment. The `morpheus-dfp` library contains stages for DFP. It is built from the source code in the `python/morpheus_dfp` directory of the Morpheus repository. To set up a Conda environment with the [`morpheus-dfp`](https://anaconda.org/nvidia/morpheus-dfp) library, you can run the following commands:
+### Create a Conda environment
+```bash
+export CONDA_ENV_NAME=morpheus-dfp
+conda create -n ${CONDA_ENV_NAME} python=3.10
+conda activate ${CONDA_ENV_NAME}
+```
+### Add Conda channels
+These channels are required for installing the runtime dependencies:
+```bash
+conda config --env --add channels conda-forge &&\
+  conda config --env --add channels nvidia &&\
+  conda config --env --add channels rapidsai &&\
+  conda config --env --add channels pytorch
+```
+### Install the `morpheus-dfp` library
+```bash
+conda install -c nvidia morpheus-dfp
+```
+The `morpheus-dfp` Conda package installs the `morpheus_dfp` Python package. It also pulls down all the necessary Conda runtime dependencies, including [`morpheus-core`](https://anaconda.org/nvidia/morpheus-core).
+### Install additional PyPI dependencies
+Some of the DFP stages in the library require additional dependencies that are hosted on PyPI. These dependencies are included as a requirements file in the `morpheus_dfp` Python package, and can be installed by running the following commands:
+```bash
+MORPHEUS_DFP_PKG_DIR=$(dirname $(python -c "import morpheus_dfp; print(morpheus_dfp.__file__)"))
+pip install -r ${MORPHEUS_DFP_PKG_DIR}/requirements_morpheus_dfp.txt
+```
+
+## Morpheus LLM
+The `morpheus-llm` library contains stages for Large Language Models (LLM) and Vector Databases. These stages are used for setting up Retrieval Augmented Generation (RAG) pipelines. The `morpheus-llm` library is built from the source code in the `python/morpheus_llm` directory of the Morpheus repository.
+To set up a Conda environment with the [`morpheus-llm`](https://anaconda.org/nvidia/morpheus-llm) library, you can run the following commands:
+### Create a Conda environment
+```bash
+export CONDA_ENV_NAME=morpheus-llm
+conda create -n ${CONDA_ENV_NAME} python=3.10
+conda activate ${CONDA_ENV_NAME}
+```
+### Add Conda channels
+These channels are required for installing the runtime dependencies:
+```bash
+conda config --env --add channels conda-forge &&\
+  conda config --env --add channels nvidia &&\
+  conda config --env --add channels rapidsai &&\
+  conda config --env --add channels pytorch
+```
+### Install the `morpheus-llm` library
+```bash
+conda install -c nvidia morpheus-llm
+```
+The `morpheus-llm` Conda package installs the `morpheus_llm` Python package. It also pulls down all the necessary Conda packages, including [`morpheus-core`](https://anaconda.org/nvidia/morpheus-core).
+### Install additional PyPI dependencies
+Some of the stages in the library require additional dependencies that are hosted on PyPI. These dependencies are included as a requirements file in the `morpheus_llm` Python package, and can be installed by running the following commands:
+```bash
+MORPHEUS_LLM_PKG_DIR=$(dirname $(python -c "import morpheus_llm; print(morpheus_llm.__file__)"))
+pip install -r ${MORPHEUS_LLM_PKG_DIR}/requirements_morpheus_llm.txt
+```
+
+## Miscellaneous
+### Morpheus Examples
+The Morpheus examples are not included in the Morpheus Conda packages. To use them, you need to clone the Morpheus repository and run the examples from source. For details, refer to the [Morpheus Examples](./examples.md).
+
+### Namespace Updates
+If you were using a Morpheus release prior to 24.10, you may need to update the namespace for the DFP, LLM, and vector database stages.
+
+A script, `scripts/morpheus_namespace_update.py`, has been provided to help with this and can be run as follows:
+```bash
+python scripts/morpheus_namespace_update.py --directory <directory> --dfp
+```
+```bash
+python scripts/morpheus_namespace_update.py --directory <directory> --llm
+```
diff --git a/docs/source/developer_guide/contributing.md b/docs/source/developer_guide/contributing.md
index aa52cccda7..2b69eb7a53 100644
--- a/docs/source/developer_guide/contributing.md
+++ b/docs/source/developer_guide/contributing.md
@@ -151,7 +151,35 @@ This workflow utilizes a Docker container to set up most dependencies ensuring a

 ### Build in a Conda Environment

-If a Conda environment on the host machine is preferred over Docker, it is relatively easy to install the necessary dependencies (In reality, the Docker workflow creates a Conda environment inside the container).
+If a [Conda](https://docs.conda.io/projects/conda/en/latest/) environment on the host machine is preferred over Docker, it is relatively easy to install the necessary dependencies (In reality, the Docker workflow creates a Conda environment inside the container).
+
+#### Conda Environment YAML Files
+Morpheus provides multiple Conda environment files to support different workflows. Morpheus utilizes [rapids-dependency-file-generator](https://pypi.org/project/rapids-dependency-file-generator/) to manage these multiple environment files. All of Morpheus' Conda and [pip](https://pip.pypa.io/en/stable/) dependencies, along with the different environments, are defined in the `dependencies.yaml` file.
+
+The following are the available Conda environment files, all of which are located in the `conda/environments` directory, with the following naming convention: `<environment>_<cuda_version>_arch-<architecture>.yaml`.
+
+| Environment | File | Description |
+| --- | --- | --- |
+| `all` | `all_cuda-125_arch-x86_64.yaml` | All dependencies required to build, run and test Morpheus, along with all of the examples. This is a superset of the `dev`, `runtime` and `examples` environments. |
+| `dev` | `dev_cuda-125_arch-x86_64.yaml` | Dependencies required to build, run and test Morpheus. This is a superset of the `runtime` environment. |
+| `examples` | `examples_cuda-125_arch-x86_64.yaml` | Dependencies required to run all examples. This is a superset of the `runtime` environment. |
+| `model-utils` | `model-utils_cuda-125_arch-x86_64.yaml` | Dependencies required to train models independent of Morpheus. |
+| `runtime` | `runtime_cuda-125_arch-x86_64.yaml` | Minimal set of dependencies strictly required to run Morpheus. |
+
+##### Updating Morpheus Dependencies
+Changes to Morpheus dependencies can be made in the `dependencies.yaml` file; then run `rapids-dependency-file-generator` to update the individual environment files in the `conda/environments` directory.
+
+Install `rapids-dependency-file-generator` into the base Conda environment:
+```bash
+conda run -n base --live-stream pip install rapids-dependency-file-generator
+```
+
+Then, to regenerate the individual environment files, run:
+```bash
+conda run -n base --live-stream rapids-dependency-file-generator
+```
+
+When ready, commit both the changes to the `dependencies.yaml` file and the updated environment files into the repo.

 #### Prerequisites
@@ -170,19 +198,21 @@ If a Conda environment on the host machine is preferred over Docker, it is relat
    ```bash
    git submodule update --init --recursive
    ```
-1. Create the Morpheus Conda environment
+1. Create the Morpheus Conda environment using either the `dev` or `all` environment file. Refer to the [Conda Environment YAML Files](#conda-environment-yaml-files) section for more information.
    ```bash
    conda env create --solver=libmamba -n morpheus --file conda/environments/dev_cuda-125_arch-x86_64.yaml
-   conda activate morpheus
    ```
+   or
+   ```bash
+   conda env create --solver=libmamba -n morpheus --file conda/environments/all_cuda-125_arch-x86_64.yaml
-   This creates a new environment named `morpheus`, and activates that environment.
+   ```
-   > **Note**: The `dev_cuda-121_arch-x86_64.yaml` Conda environment file specifies all of the dependencies required to build Morpheus and run Morpheus. However many of the examples, and optional packages such as `morpheus_llm` require additional dependencies. Alternately the following command can be used to create the Conda environment:
+   This creates a new environment named `morpheus`. Activate the environment with:
    ```bash
-   conda env create --solver=libmamba -n morpheus --file conda/environments/all_cuda-121_arch-x86_64.yaml
    conda activate morpheus
    ```
+
 1. Build Morpheus
    ```bash
    ./scripts/compile.sh
diff --git a/docs/source/getting_started.md b/docs/source/getting_started.md
index b4d2b04cab..7b0b43217a 100644
--- a/docs/source/getting_started.md
+++ b/docs/source/getting_started.md
@@ -19,6 +19,7 @@ limitations under the License.
-There are three ways to get started with Morpheus:
+There are four ways to get started with Morpheus:
 - [Using pre-built Docker containers](#using-pre-built-docker-containers)
+- [Using the Morpheus Conda packages](#using-morpheus-conda-packages)
 - [Building the Morpheus Docker container](#building-the-morpheus-container)
 - [Building Morpheus from source](./developer_guide/contributing.md#building-from-source)
@@ -78,6 +79,12 @@ Once launched, users wishing to launch Triton using the included Morpheus models
 Skip ahead to the [Acquiring the Morpheus Models Container](#acquiring-the-morpheus-models-container) section.

+## Using Morpheus Conda Packages
+The Morpheus stages are available as libraries that are hosted on the [`nvidia`](https://anaconda.org/nvidia) Conda channel. The Morpheus Conda packages are:
+[`morpheus-core`](https://anaconda.org/nvidia/morpheus-core), [`morpheus-dfp`](https://anaconda.org/nvidia/morpheus-dfp) and [`morpheus-llm`](https://anaconda.org/nvidia/morpheus-llm).
+
+For details on these libraries and how to use them, refer to the [Morpheus Conda Packages](./conda_packages.md) guide.
+
 ## Building the Morpheus Container

 ### Clone the Repository
diff --git a/docs/source/index.rst b/docs/source/index.rst
index fae48cc6b0..dce4a88bfd 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -54,6 +54,7 @@ Getting Started
 Using Morpheus
 ^^^^^^^^^^^^^^
 * :doc:`getting_started` - Using pre-built Docker containers, building Docker containers from source, and fetching models and datasets
+* :doc:`Morpheus Conda Packages <conda_packages>` - Using Morpheus libraries via the pre-built Conda packages
 * :doc:`basics/overview` - Brief overview of the command line interface
 * :doc:`basics/building_a_pipeline` - Introduction to building a pipeline using the command line interface
 * :doc:`Morpheus Examples <examples>` - Example pipelines using both the Python API and command line interface
@@ -76,6 +77,7 @@ Deploying Morpheus
   :hidden:

   getting_started
+  conda_packages
   basics/overview
   basics/building_a_pipeline
   models_and_datasets
diff --git a/docs/source/py_api.rst b/docs/source/py_api.rst
index e4aa991db2..e37d340216 100644
--- a/docs/source/py_api.rst
+++ b/docs/source/py_api.rst
@@ -22,4 +22,5 @@ Python API
   :recursive:

   morpheus
+  morpheus_dfp
   morpheus_llm
diff --git a/docs/source/stages/morpheus_stages.md b/docs/source/stages/morpheus_stages.md
index db2d533606..e860beff38 100644
--- a/docs/source/stages/morpheus_stages.md
+++ b/docs/source/stages/morpheus_stages.md
@@ -66,7 +66,7 @@ Stages are the building blocks of Morpheus pipelines. Below is a list of the mos
 ## LLM

-- LLM Engine Stage {py:class}`~morpheus.stages.llm.llm_engine_stage.LLMEngineStage` Execute an LLM engine within a Morpheus pipeline.
+- LLM Engine Stage {py:class}`~morpheus_llm.stages.llm.llm_engine_stage.LLMEngineStage` Execute an LLM engine within a Morpheus pipeline.

 ## Output
 - HTTP Client Sink Stage {py:class}`~morpheus.stages.output.http_client_sink_stage.HttpClientSinkStage` Write all messages to an HTTP endpoint.
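The stage path change above reflects the package split documented in `conda_packages.md`: LLM and DFP stages now live in their own namespaces, while core stages remain under `morpheus`. A minimal sketch of the resulting import paths, assuming the `morpheus-core`, `morpheus-dfp` and `morpheus-llm` Conda packages are installed as described earlier, would look like this:

```python
# Illustrative import paths only; these assume morpheus-core, morpheus-dfp and
# morpheus-llm are installed per conda_packages.md.

# Core stages keep the `morpheus` namespace:
from morpheus.stages.output.http_client_sink_stage import HttpClientSinkStage

# LLM stages now live in the `morpheus_llm` namespace:
from morpheus_llm.stages.llm.llm_engine_stage import LLMEngineStage

# DFP stages live in the `morpheus_dfp` namespace:
from morpheus_dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
```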
diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py index 05611fbca0..21a0cfb96d 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py @@ -50,8 +50,8 @@ def dfp_deployment(builder: mrc.Builder): - mlflow_writer_options (dict): Options for the MLflow model writer; Example: See Below - preprocessing_options (dict): Options for preprocessing the data; Example: See Below - stream_aggregation_options (dict): Options for aggregating the data by stream; Example: See Below - - timestamp_column_name (str): Name of the timestamp column used in the data; Example: "my_timestamp"; Default: - "timestamp" + - timestamp_column_name (str): Name of the timestamp column used in the data; Example: "my_timestamp"; + Default: "timestamp" - user_splitting_options (dict): Options for splitting the data by user; Example: See Below Inference Options Parameters: @@ -61,10 +61,10 @@ def dfp_deployment(builder: mrc.Builder): - fallback_username (str): User ID to use if user ID not found; Example: "generic_user"; Default: "generic_user" - inference_options (dict): Options for the inference module; Example: See Below - model_name_formatter (str): Format string for the model name; Example: "model_{timestamp}"; - Default: `[Required]` + Default: `[Required]` - num_output_ports (int): Number of output ports for the module; Example: 3 - timestamp_column_name (str): Name of the timestamp column in the input data; Example: "timestamp"; - Default: "timestamp" + Default: "timestamp" - stream_aggregation_options (dict): Options for aggregating the data by stream; Example: See Below - user_splitting_options (dict): Options for splitting the data by user; Example: See Below - write_to_file_options (dict): Options for writing the detections to a file; Example: See Below @@ -72,7 +72,7 @@ def dfp_deployment(builder: mrc.Builder): batching_options: - end_time (datetime/str): Endtime of the time window; Example: "2023-03-14T23:59:59"; Default: None - iso_date_regex_pattern (str): Regex pattern for ISO date matching; - Example: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"; Default: + Example: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"; Default: - parser_kwargs (dict): Additional arguments for the parser; Example: {}; Default: {} - period (str): Time period for grouping files; Example: "1d"; Default: "1d" - sampling_rate_s (int):: Sampling rate in seconds; Example: 0; Default: None @@ -82,18 +82,18 @@ def dfp_deployment(builder: mrc.Builder): - feature_columns (list): List of feature columns to train on; Example: ["column1", "column2", "column3"] - epochs (int): Number of epochs to train for; Example: 50 - model_kwargs (dict): Keyword arguments to pass to the model; Example: {"encoder_layers": [64, 32], - "decoder_layers": [32, 64], "activation": "relu", "swap_p": 0.1, "lr": 0.001, "lr_decay": 0.9, - "batch_size": 32, "verbose": 1, "optimizer": "adam", "scalar": "min_max", "min_cats": 10, - "progress_bar": false, "device": "cpu"} + "decoder_layers": [32, 64], "activation": "relu", "swap_p": 0.1, "lr": 0.001, "lr_decay": 0.9, + "batch_size": 32, "verbose": 1, "optimizer": "adam", "scalar": "min_max", "min_cats": 10, + "progress_bar": false, "device": "cpu"} - validation_size (float): Size of the validation set; Example: 0.1 mlflow_writer_options: - conda_env (str): Conda environment for the model; Example: `path/to/conda_env.yml`; Default: `[Required]` - databricks_permissions 
(dict): Permissions for the model; Example: See Below; Default: None - experiment_name_formatter (str): Formatter for the experiment name; Example: `experiment_name_{timestamp}`; - Default: `[Required]` + Default: `[Required]` - model_name_formatter (str): Formatter for the model name; Example: `model_name_{timestamp}`; - Default: `[Required]` + Default: `[Required]` - timestamp_column_name (str): Name of the timestamp column; Example: `timestamp`; Default: timestamp stream_aggregation_options: @@ -101,24 +101,24 @@ def dfp_deployment(builder: mrc.Builder): met. Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minmum increment from the last trained to new training event; - Example: 0; Default: 0 + Example: 0; Default: 0 - timestamp_column_name (str): Name of the column containing timestamps; Example: 'timestamp'; - Default: 'timestamp' + Default: 'timestamp' - aggregation_span (str): Lookback timespan for training data in a new training event; Example: '60d'; - Default: '60d' + Default: '60d' - cache_to_disk (bool): Whether to cache streaming data to disk; Example: false; Default: false - cache_dir (str): Directory to use for caching streaming data; Example: './.cache'; Default: './.cache' user_splitting_options: - fallback_username (str): The user ID to use if the user ID is not found; Example: "generic_user"; - Default: 'generic_user' + Default: 'generic_user' - include_generic (bool): Whether to include a generic user ID in the output; Example: false; Default: False - include_individual (bool): Whether to include individual user IDs in the output; Example: true; Default: False - only_users (list): List of user IDs to include; others will be excluded; Example: ["user1", "user2", "user3"]; - Default: [] + Default: [] - skip_users (list): List of user IDs to exclude from the output; Example: ["user4", "user5"]; Default: [] - timestamp_column_name (str): Name of the column containing timestamps; Example: "timestamp"; - Default: 'timestamp' + Default: 'timestamp' - userid_column_name (str): Name of the column containing user IDs; Example: "username"; Default: 'username' detection_criteria: @@ -127,9 +127,9 @@ def dfp_deployment(builder: mrc.Builder): inference_options: - model_name_formatter (str): Formatter for model names; Example: "user_{username}_model"; - Default: `[Required]` + Default: `[Required]` - fallback_username (str): Fallback user to use if no model is found for a user; Example: "generic_user"; - Default: generic_user + Default: generic_user - timestamp_column_name (str): Name of the timestamp column; Example: "timestamp"; Default: timestamp write_to_file_options: @@ -141,19 +141,19 @@ def dfp_deployment(builder: mrc.Builder): monitoring_options: - description (str): Name to show for this Monitor Stage in the console window; Example: 'Progress'; - Default: 'Progress' + Default: 'Progress' - silence_monitors (bool): Slience the monitors on the console; Example: True; Default: False - smoothing (float): Smoothing parameter to determine how much the throughput should be averaged. 
- 0 = Instantaneous, 1 = Average.; Example: 0.01; Default: 0.05 + 0 = Instantaneous, 1 = Average.; Example: 0.01; Default: 0.05 - unit (str): Units to show in the rate value.; Example: 'messages'; Default: 'messages' - delayed_start (bool): When delayed_start is enabled, the progress bar will not be shown until the first - message is received. Otherwise, the progress bar is shown on pipeline startup and will begin timing - immediately. In large pipelines, this option may be desired to give a more accurate timing; - Example: True; Default: False + message is received. Otherwise, the progress bar is shown on pipeline startup and will begin timing + immediately. In large pipelines, this option may be desired to give a more accurate timing; + Example: True; Default: False - determine_count_fn_schema (str): Custom function for determining the count in a message. Gets called for - each message. Allows for correct counting of batched and sliced messages.; Example: func_str; Default: None + each message. Allows for correct counting of batched and sliced messages.; Example: func_str; Default: None - log_level (str): Enable this stage when the configured log level is at `log_level` or lower; - Example: 'DEBUG'; Default: INFO + Example: 'DEBUG'; Default: INFO """ # MODULE_INPUT_PORT diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference.py index c710d09f9f..b0a37fcbc4 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference.py @@ -46,9 +46,9 @@ def dfp_inference(builder: mrc.Builder): ---------- Configurable parameters: - model_name_formatter (str): Formatter for model names; Example: "user_{username}_model"; - Default: `[Required]` + Default: `[Required]` - fallback_username (str): Fallback user to use if no model is found for a user; Example: "generic_user"; - Default: generic_user + Default: generic_user - timestamp_column_name (str): Name of the timestamp column; Example: "timestamp"; Default: timestamp """ diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py index 26c36ca763..d18809cb2b 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py @@ -49,15 +49,15 @@ def dfp_inference_pipe(builder: mrc.Builder): Configurable parameters: - batching_options (dict): Options for batching the data; Example: See Below - cache_dir (str): Directory to cache the rolling window data; Example: "/path/to/cache/dir"; - Default: ./.cache + Default: ./.cache - detection_criteria (dict): Criteria for filtering detections; Example: See Below - fallback_username (str): User ID to use if user ID not found; Example: "generic_user"; - Default: "generic_user" + Default: "generic_user" - inference_options (dict): Options for the inference module; Example: See Below - model_name_formatter (str): Format string for the model name; Example: "model_{timestamp}"; - Default: `[Required]` + Default: `[Required]` - timestamp_column_name (str): Name of the timestamp column in the input data; Example: "timestamp"; - Default: "timestamp" + Default: "timestamp" - stream_aggregation_options (dict): Options for aggregating the data by stream; Example: See Below - user_splitting_options (dict): Options for splitting the data by user; Example: See Below - write_to_file_options (dict): Options for writing the detections to 
a file; Example: See Below @@ -66,12 +66,12 @@ def dfp_inference_pipe(builder: mrc.Builder): batching_options: - end_time (datetime/str): End time of the time window; Example: "2023-03-14T23:59:59"; Default: None - iso_date_regex_pattern (str): Regex pattern for ISO date matching; - Example: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"; Default: + Example: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"; Default: - parser_kwargs (dict): Additional arguments for the parser; Example: {}; Default: {} - period (str): Time period for grouping files; Example: "1d"; Default: "1d" - sampling_rate_s (int): Sampling rate in seconds; Example: 0; Default: None - start_time (datetime/str): Start time of the time window; Example: "2023-03-01T00:00:00"; - Default: None + Default: None detection_criteria: - copy (bool): Whether to copy the rows or slice them; Example: true; Default: true @@ -87,9 +87,9 @@ def dfp_inference_pipe(builder: mrc.Builder): inference_options: - model_name_formatter (str): Formatter for model names; Example: "user_{username}_model"; - Default: `[Required]` + Default: `[Required]` - fallback_username (str): Fallback user to use if no model is found for a user; Example: "generic_user"; - Default: generic_user + Default: generic_user - timestamp_column_name (str): Name of the timestamp column; Example: "timestamp"; Default: timestamp stream_aggregation_options: @@ -97,44 +97,44 @@ def dfp_inference_pipe(builder: mrc.Builder): are met. Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minimum increment from the last trained to new training event; - Example: 0; Default: 0 + Example: 0; Default: 0 - timestamp_column_name (str): Name of the column containing timestamps; Example: 'timestamp'; - Default: 'timestamp' + Default: 'timestamp' - aggregation_span (str): Lookback timespan for training data in a new training event; Example: '60d'; - Default: '60d' + Default: '60d' - cache_to_disk (bool): Whether to cache streaming data to disk; Example: false; Default: false - cache_dir (str): Directory to use for caching streaming data; Example: './.cache'; Default: './.cache' user_splitting_options: - fallback_username (str): The user ID to use if the user ID is not found; Example: "generic_user"; - Default: 'generic_user' + Default: 'generic_user' - include_generic (bool): Whether to include a generic user ID in the output; Example: false; Default: False - include_individual (bool): Whether to include individual user IDs in the output; Example: true; - Default: False + Default: False - only_users (list): List of user IDs to include; others will be excluded; - Example: ["user1", "user2", "user3"]; - Default: [] + Example: ["user1", "user2", "user3"]; + Default: [] - skip_users (list): List of user IDs to exclude from the output; Example: ["user4", "user5"]; Default: [] - timestamp_column_name (str): Name of the column containing timestamps; Example: "timestamp"; - Default: 'timestamp' + Default: 'timestamp' - userid_column_name (str): Name of the column containing user IDs; Example: "username"; Default: 'username' monitor_options: - description (str): Name to show for this Monitor Stage in the console window; Example: 'Progress'; - Default: 'Progress' + Default: 'Progress' - silence_monitors (bool): Slience the monitors on the console; Example: True; Default: False - smoothing (float): Smoothing parameter to determine how much 
the throughput should be averaged. - 0 = Instantaneous, 1 = Average.; Example: 0.01; Default: 0.05 + 0 = Instantaneous, 1 = Average.; Example: 0.01; Default: 0.05 - unit (str): Units to show in the rate value.; Example: 'messages'; Default: 'messages' - delayed_start (bool): When delayed_start is enabled, the progress bar will not be shown until the first - message is received. Otherwise, the progress bar is shown on pipeline startup and will begin timing - immediately. In large pipelines, this option may be desired to give a more accurate timing; - Example: True; Default: False + message is received. Otherwise, the progress bar is shown on pipeline startup and will begin timing + immediately. In large pipelines, this option may be desired to give a more accurate timing; + Example: True; Default: False - determine_count_fn_schema (str): Custom function for determining the count in a message. Gets called - for each message. Allows for correct counting of batched and sliced messages.; Example: func_str; - Default: None + for each message. Allows for correct counting of batched and sliced messages.; Example: func_str; + Default: None - log_level (str): Enable this stage when the configured log level is at `log_level` or lower; - Example: 'DEBUG'; Default: INFO + Example: 'DEBUG'; Default: INFO write_to_file_options: - filename (str): Path to the output file; Example: `output.csv`; Default: None diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_rolling_window.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_rolling_window.py index ea9b1fede8..27f4054143 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_rolling_window.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_rolling_window.py @@ -51,11 +51,11 @@ def dfp_rolling_window(builder: mrc.Builder): met. 
Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minmum increment from the last trained to new training event; - Example: 0; Default: 0 + Example: 0; Default: 0 - timestamp_column_name (str): Name of the column containing timestamps; Example: 'timestamp'; - Default: 'timestamp' + Default: 'timestamp' - aggregation_span (str): Lookback timespan for training data in a new training event; Example: '60d'; - Default: '60d' + Default: '60d' - cache_to_disk (bool): Whether to cache streaming data to disk; Example: false; Default: false - cache_dir (str): Directory to use for caching streaming data; Example: './.cache'; Default: './.cache' """ diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_split_users.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_split_users.py index 7ec8c7f0f4..276113cfb8 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_split_users.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_split_users.py @@ -45,14 +45,14 @@ def dfp_split_users(builder: mrc.Builder): ----- Configurable parameters: - fallback_username (str): The user ID to use if the user ID is not found; Example: "generic_user"; - Default: 'generic_user' + Default: 'generic_user' - include_generic (bool): Whether to include a generic user ID in the output; Example: false; Default: False - include_individual (bool): Whether to include individual user IDs in the output; Example: true; Default: False - only_users (list): List of user IDs to include; others will be excluded; Example: ["user1", "user2", "user3"]; - Default: [] + Default: [] - skip_users (list): List of user IDs to exclude from the output; Example: ["user4", "user5"]; Default: [] - timestamp_column_name (str): Name of the column containing timestamps; Example: "timestamp"; - Default: 'timestamp' + Default: 'timestamp' - userid_column_name (str): Name of the column containing user IDs; Example: "username"; Default: 'username' """ diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_training.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_training.py index 6bc41d1d09..20eebf5570 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_training.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_training.py @@ -46,9 +46,9 @@ def dfp_training(builder: mrc.Builder): - feature_columns (list): List of feature columns to train on; Example: ["column1", "column2", "column3"] - epochs (int): Number of epochs to train for; Example: 50 - model_kwargs (dict): Keyword arguments to pass to the model; Example: {"encoder_layers": [64, 32], - "decoder_layers": [32, 64], "activation": "relu", "swap_p": 0.1, "lr": 0.001, "lr_decay": 0.9, - "batch_size": 32, "verbose": 1, "optimizer": "adam", "scalar": "min_max", "min_cats": 10, - "progress_bar": false, "device": "cpu"} + "decoder_layers": [32, 64], "activation": "relu", "swap_p": 0.1, "lr": 0.001, "lr_decay": 0.9, + "batch_size": 32, "verbose": 1, "optimizer": "adam", "scalar": "min_max", "min_cats": 10, + "progress_bar": false, "device": "cpu"} - validation_size (float): Size of the validation set; Example: 0.1 """ diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py index cfa4c49fdc..394ae0a12b 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py @@ 
-46,43 +46,43 @@ def dfp_training_pipe(builder: mrc.Builder): Configurable parameters: - batching_options (dict): Options for batching the data; Example: See Below - cache_dir (str): Directory to cache the rolling window data; Example: "/path/to/cache/dir"; - Default: ./.cache + Default: ./.cache - dfencoder_options (dict): Options for configuring the data frame encoder; Example: See Below - mlflow_writer_options (dict): Options for the MLflow model writer; Example: See Below - stream_aggregation_options (dict): Options for aggregating the data by stream; Example: See Below - timestamp_column_name (str): Name of the timestamp column used in the data; Example: "my_timestamp"; - Default: "timestamp" + Default: "timestamp" - user_splitting_options (dict): Options for splitting the data by user; Example: See Below - monitor_options (dict): Options for monitoring throughput; Example: See Below batching_options: - end_time (datetime/string): Endtime of the time window; Example: "2023-03-14T23:59:59"; Default: None - iso_date_regex_pattern (str): Regex pattern for ISO date matching; - Example: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"; Default: + Example: "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}"; Default: - parser_kwargs (dict): Additional arguments for the parser; Example: {}; Default: {} - period (str): Time period for grouping files; Example: "1d"; Default: "1d" - sampling_rate_s (int): Sampling rate in seconds; Example: 0; Default: None - start_time (datetime/string): Start time of the time window; Example: "2023-03-01T00:00:00"; - Default: Nome + Default: Nome dfencoder_options: - feature_columns (list): List of feature columns to train on; Example: ["column1", "column2", "column3"] - epochs (int): Number of epochs to train for; Example: 50 - model_kwargs (dict): Keyword arguments to pass to the model; Example: {"encoder_layers": [64, 32], - "decoder_layers": [32, 64], "activation": "relu", "swap_p": 0.1, "lr": 0.001, "lr_decay": 0.9, - "batch_size": 32, "verbose": 1, "optimizer": "adam", "scalar": "min_max", "min_cats": 10, - "progress_bar": false, "device": "cpu"} + "decoder_layers": [32, 64], "activation": "relu", "swap_p": 0.1, "lr": 0.001, "lr_decay": 0.9, + "batch_size": 32, "verbose": 1, "optimizer": "adam", "scalar": "min_max", "min_cats": 10, + "progress_bar": false, "device": "cpu"} - validation_size (float): Size of the validation set; Example: 0.1 mlflow_writer_options: - conda_env (str): Conda environment for the model; Example: `path/to/conda_env.yml`; - Default: `[Required]` + Default: `[Required]` - databricks_permissions (dict): Permissions for the model; Example: See Below; Default: None - experiment_name_formatter (str): Formatter for the experiment name; - Example: `experiment_name_{timestamp}`; - Default: `[Required]` + Example: `experiment_name_{timestamp}`; + Default: `[Required]` - model_name_formatter (str): Formatter for the model name; Example: `model_name_{timestamp}`; - Default: `[Required]` + Default: `[Required]` - timestamp_column_name (str): Name of the timestamp column; Example: `timestamp`; Default: timestamp stream_aggregation_options: @@ -90,44 +90,44 @@ def dfp_training_pipe(builder: mrc.Builder): are met. 
Otherwise, continue to aggregate user's history.; Example: 'batch'; Default: 'batch' - trigger_on_min_history (int): Minimum history to trigger a new training event; Example: 1; Default: 1 - trigger_on_min_increment (int): Minimum increment from the last trained to new training event; - Example: 0; Default: 0 + Example: 0; Default: 0 - timestamp_column_name (str): Name of the column containing timestamps; Example: 'timestamp'; - Default: 'timestamp' + Default: 'timestamp' - aggregation_span (str): Lookback timespan for training data in a new training event; Example: '60d'; - Default: '60d' + Default: '60d' - cache_to_disk (bool): Whether to cache streaming data to disk; Example: false; Default: false - cache_dir (str): Directory to use for caching streaming data; Example: './.cache'; Default: './.cache' user_splitting_options: - fallback_username (str): The user ID to use if the user ID is not found; Example: "generic_user"; - Default: 'generic_user' + Default: 'generic_user' - include_generic (bool): Whether to include a generic user ID in the output; Example: false; Default: False - include_individual (bool): Whether to include individual user IDs in the output; Example: true; - Default: False + Default: False - only_users (list): List of user IDs to include; others will be excluded; - Example: ["user1", "user2", "user3"]; - Default: [] + Example: ["user1", "user2", "user3"]; + Default: [] - skip_users (list): List of user IDs to exclude from the output; Example: ["user4", "user5"]; Default: [] - timestamp_column_name (str): Name of the column containing timestamps; Example: "timestamp"; - Default: 'timestamp' + Default: 'timestamp' - userid_column_name (str): Name of the column containing user IDs; Example: "username"; Default: 'username' monitor_options: - description (str): Name to show for this Monitor Stage in the console window; Example: 'Progress'; - Default: 'Progress' + Default: 'Progress' - silence_monitors (bool): Slience the monitors on the console; Example: True; Default: False - smoothing (float): Smoothing parameter to determine how much the throughput should be averaged. - 0 = Instantaneous, 1 = Average.; Example: 0.01; Default: 0.05 + 0 = Instantaneous, 1 = Average.; Example: 0.01; Default: 0.05 - unit (str): Units to show in the rate value.; Example: 'messages'; Default: 'messages' - delayed_start (bool): When delayed_start is enabled, the progress bar will not be shown until the first - message is received. Otherwise, the progress bar is shown on pipeline startup and will begin timing - immediately. In large pipelines, this option may be desired to give a more accurate timing; - Example: True; Default: False + message is received. Otherwise, the progress bar is shown on pipeline startup and will begin timing + immediately. In large pipelines, this option may be desired to give a more accurate timing; + Example: True; Default: False - determine_count_fn_schema (str): Custom function for determining the count in a message. Gets called - for each message. Allows for correct counting of batched and sliced messages.; Example: func_str; - Default: None + for each message. 
Allows for correct counting of batched and sliced messages.; Example: func_str; + Default: None - log_level (str): Enable this stage when the configured log level is at `log_level` or lower; - Example: 'DEBUG'; Default: INFO + Example: 'DEBUG'; Default: INFO """ # MODULE_INPUT_PORT diff --git a/scripts/morpheus_namespace_update.py b/scripts/morpheus_namespace_update.py index 2d5056f6a9..96909bf490 100755 --- a/scripts/morpheus_namespace_update.py +++ b/scripts/morpheus_namespace_update.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ''' -This script is used to update the imports from "dfp." to "morpheus_dfp." in all the files in the given directory. +This script is used to update imports related to DFP and LLM morpheus modules. Usage: python morpheus_namespace_update.py --directory --dfp python morpheus_namespace_update.py --directory --llm @@ -129,9 +129,9 @@ def replace_imports(directory, dfp, llm): return for root, _, files in os.walk(directory): - for file in files: file_path = os.path.join(root, file) + # Skip this script if os.path.abspath(file_path) == os.path.abspath(__file__): continue if file.endswith(".py"): diff --git a/tests/_utils/kafka.py b/tests/_utils/kafka.py index 21e8dee721..4bf05e2ea8 100644 --- a/tests/_utils/kafka.py +++ b/tests/_utils/kafka.py @@ -42,11 +42,15 @@ @pytest.fixture(name='kafka_topics', scope='function') def kafka_topics_fixture(): - yield KafkaTopics(f'morpheus_unittest_input_{time.time()}', f'morpheus_unittest_output_{time.time()}') + """ + Every test receives a unique pair of Kafka topics + """ + ts = time.time() + yield KafkaTopics(f'morpheus_unittest_input_{ts}', f'morpheus_unittest_output_{ts}') @pytest.fixture(name='kafka_bootstrap_servers', scope="function") -def kafka_bootstrap_servers_fixture(kafka_server: (subprocess.Popen, int)): # pylint: disable=redefined-outer-name +def kafka_bootstrap_servers_fixture(kafka_server: tuple[subprocess.Popen, int]): # pylint: disable=redefined-outer-name """ Used by tests that require both an input and an output topic """ diff --git a/tests/conftest.py b/tests/conftest.py index 3dca6bc243..093eada5c1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -502,14 +502,12 @@ def disable_gc(): gc.enable() -def wait_for_camouflage(host="localhost", port=8000, timeout=30): +def wait_for_server(url: str, timeout: int, parse_fn: typing.Callable[[requests.Response], bool]) -> bool: start_time = time.time() cur_time = start_time end_time = start_time + timeout - url = f"http://{host}:{port}/ping" - while cur_time - start_time <= timeout: timeout_epoch = min(cur_time + 2.0, end_time) @@ -518,13 +516,9 @@ def wait_for_camouflage(host="localhost", port=8000, timeout=30): resp = requests.get(url, timeout=request_timeout) if (resp.status_code == 200): - if (resp.json()['message'] == 'I am alive.'): + if parse_fn(resp): return True - warnings.warn(("Camoflage returned status 200 but had incorrect response JSON. Continuing to wait. " - "Response JSON:\n%s"), - resp.json()) - except Exception: pass @@ -537,6 +531,31 @@ def wait_for_camouflage(host="localhost", port=8000, timeout=30): return False +def wait_for_camouflage(host: str = "localhost", port: int = 8000, timeout: int = 30): + url = f"http://{host}:{port}/ping" + + def parse_fn(resp: requests.Response) -> bool: + if (resp.json()['message'] == 'I am alive.'): + return True + + warnings.warn(("Camoflage returned status 200 but had incorrect response JSON. Continuing to wait. 
" + "Response JSON:\n%s"), + resp.json()) + return False + + return wait_for_server(url, timeout=timeout, parse_fn=parse_fn) + + +def wait_for_milvus(host: str = "localhost", port: int = 19530, timeout: int = 180): + url = f'http://{host}:{port}/healthz' + + def parse_fn(resp: requests.Response) -> bool: + content = resp.content.decode('utf-8') + return 'OK' in content + + return wait_for_server(url, timeout=timeout, parse_fn=parse_fn) + + def _set_pdeathsig(sig=signal.SIGTERM): """ Helper function to ensure once parent process exits, its child processes will automatically die @@ -954,18 +973,21 @@ def milvus_server_uri(tmp_path_factory): yield uri else: - from milvus import default_server + from milvus import MilvusServer - # Milvus checks for already bound ports but it doesnt seem to work for webservice_port. Use a random one - default_server.webservice_port = _get_random_port() - with default_server: - default_server.set_base_dir(tmp_path_factory.mktemp("milvus_store")) + milvus_server = MilvusServer(wait_for_started=False) - host = default_server.server_address - port = default_server.listen_port + # Milvus checks for already bound ports but it doesnt seem to work for webservice_port. Use a random one + webservice_port = _get_random_port() + milvus_server.webservice_port = webservice_port + milvus_server.set_base_dir(tmp_path_factory.mktemp("milvus_store")) + with milvus_server: + host = milvus_server.server_address + port = milvus_server.listen_port uri = f"http://{host}:{port}" logger.info("Started Milvus at: %s", uri) + wait_for_milvus(host=host, port=webservice_port, timeout=180) yield uri @@ -976,6 +998,13 @@ def milvus_data_fixture(): yield inital_data +@pytest.fixture(scope="session", name="milvus_service") +def milvus_service_fixture(milvus_server_uri: str): + from morpheus_llm.service.vdb.milvus_vector_db_service import MilvusVectorDBService + service = MilvusVectorDBService(uri=milvus_server_uri) + yield service + + @pytest.fixture(scope="session", name="idx_part_collection_config") def idx_part_collection_config_fixture(): from _utils import load_json_file @@ -1006,6 +1035,26 @@ def bert_cased_vocab_fixture(): yield os.path.join(TEST_DIRS.data_dir, 'bert-base-cased-vocab.txt') +@pytest.fixture(name="morpheus_dfp", scope='session') +def morpheus_dfp_fixture(fail_missing: bool): + """ + Fixture to ensure morpheus_dfp is installed + """ + yield import_or_skip("morpheus_dfp", + reason=OPT_DEP_SKIP_REASON.format(package="morpheus_dfp"), + fail_missing=fail_missing) + + +@pytest.fixture(name="morpheus_llm", scope='session') +def morpheus_llm_fixture(fail_missing: bool): + """ + Fixture to ensure morpheus_llm is installed + """ + yield import_or_skip("morpheus_llm", + reason=OPT_DEP_SKIP_REASON.format(package="morpheus_llm"), + fail_missing=fail_missing) + + @pytest.fixture(name="nemollm", scope='session') def nemollm_fixture(fail_missing: bool): """ diff --git a/tests/morpheus/common/test_http_server.py b/tests/morpheus/common/test_http_server.py index 256ab1a5ff..4f8315ae68 100644 --- a/tests/morpheus/common/test_http_server.py +++ b/tests/morpheus/common/test_http_server.py @@ -38,15 +38,14 @@ def make_parse_fn(status: HTTPStatus = HTTPStatus.OK, @pytest.mark.slow @pytest.mark.parametrize("endpoints", [("/t1", "/t2", "/t3"), ("test/", "123/", "a1d/"), ("/a", "/a/b", "/a/b/c/d")]) -@pytest.mark.parametrize("port", [8088, 9090]) -@pytest.mark.parametrize("method", ["GET", "POST", "PUT"]) +@pytest.mark.parametrize("port", [9090]) +@pytest.mark.parametrize("method", ["GET", "POST"]) 
@pytest.mark.parametrize("use_callback", [True, False]) @pytest.mark.parametrize("use_context_mgr", [True, False]) -@pytest.mark.parametrize("num_threads", [1, 2, min(8, len(os.sched_getaffinity(0)))]) +@pytest.mark.parametrize("num_threads", [1, min(8, len(os.sched_getaffinity(0)))]) @pytest.mark.parametrize("status,content_type,content", [(HTTPStatus.OK, MimeTypes.TEXT.value, "OK"), (HTTPStatus.OK, MimeTypes.JSON.value, '{"test": "OK"}'), - (HTTPStatus.NOT_FOUND, MimeTypes.TEXT.value, "NOT FOUND"), (HTTPStatus.INTERNAL_SERVER_ERROR, MimeTypes.TEXT.value, "Unexpected error")]) def test_simple_request(port: int, endpoints: typing.Tuple[str, str, str], diff --git a/tests/morpheus/stages/test_kafka_source_stage_pipe.py b/tests/morpheus/stages/test_kafka_source_stage_pipe.py index 92d93a6c6a..fd8e38106b 100644 --- a/tests/morpheus/stages/test_kafka_source_stage_pipe.py +++ b/tests/morpheus/stages/test_kafka_source_stage_pipe.py @@ -30,6 +30,7 @@ from morpheus.config import Config from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.general.trigger_stage import TriggerStage +from morpheus.stages.input.kafka_source_stage import AutoOffsetReset from morpheus.stages.input.kafka_source_stage import KafkaSourceStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage from morpheus.stages.postprocess.serialize_stage import SerializeStage @@ -52,7 +53,7 @@ def test_kafka_source_stage_pipe(config: Config, kafka_bootstrap_servers: str, k KafkaSourceStage(config, bootstrap_servers=kafka_bootstrap_servers, input_topic=kafka_topics.input_topic, - auto_offset_reset="earliest", + auto_offset_reset=AutoOffsetReset.EARLIEST, poll_interval="1seconds", client_id='morpheus_kafka_source_stage_pipe', stop_after=num_records)) @@ -85,7 +86,7 @@ def test_multi_topic_kafka_source_stage_pipe(config: Config, kafka_bootstrap_ser KafkaSourceStage(config, bootstrap_servers=kafka_bootstrap_servers, input_topic=input_topics, - auto_offset_reset="earliest", + auto_offset_reset=AutoOffsetReset.EARLIEST, poll_interval="1seconds", client_id='test_multi_topic_kafka_source_stage_pipe', stop_after=num_records)) @@ -100,25 +101,27 @@ def test_multi_topic_kafka_source_stage_pipe(config: Config, kafka_bootstrap_ser @pytest.mark.gpu_and_cpu_mode @pytest.mark.kafka @pytest.mark.parametrize('async_commits', [True, False]) -@pytest.mark.parametrize('num_records', [10, 100, 1000]) -def test_kafka_source_commit(num_records: int, - async_commits: bool, - config: Config, - kafka_bootstrap_servers: str, - kafka_topics: KafkaTopics, - kafka_consumer: "KafkaConsumer") -> None: +@pytest.mark.parametrize('num_records', [10, 1000]) +def test_kafka_source_batch_pipe(num_records: int, + async_commits: bool, + config: Config, + kafka_bootstrap_servers: str, + kafka_topics: KafkaTopics, + kafka_consumer: "KafkaConsumer") -> None: group_id = 'morpheus' data = [{'v': i} for i in range(num_records)] num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data) assert num_written == num_records + expected_length = config.pipeline_batch_size + num_exact = num_records // expected_length + kafka_consumer.subscribe([kafka_topics.input_topic]) seek_to_beginning(kafka_consumer) partitions = kafka_consumer.assignment() - # This method does not advance the consumer, and even if it did, this consumer has a different group_id than the - # source stage + # This method does not advance the consumer, and this consumer has a different group_id than the source stage expected_offsets = 
kafka_consumer.end_offsets(partitions) pipe = LinearPipeline(config) @@ -126,12 +129,13 @@ def test_kafka_source_commit(num_records: int, KafkaSourceStage(config, bootstrap_servers=kafka_bootstrap_servers, input_topic=kafka_topics.input_topic, - auto_offset_reset="earliest", + auto_offset_reset=AutoOffsetReset.EARLIEST, poll_interval="1seconds", group_id=group_id, client_id='morpheus_kafka_source_commit', stop_after=num_records, async_commits=async_commits)) + pipe.add_stage(DFPLengthChecker(config, expected_length=expected_length, num_exact=num_exact)) pipe.add_stage(TriggerStage(config)) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(SerializeStage(config)) @@ -148,39 +152,6 @@ def test_kafka_source_commit(num_records: int, # The broker may have created additional partitions, offsets should be a superset of expected_offsets for (topic_partition, expected_offset) in expected_offsets.items(): # The value of the offsets dict being returned is a tuple of (offset, metadata), while the value of the - # expected_offsets is just the offset. + # expected_offsets is just the offset. actual_offset = offsets[topic_partition][0] assert actual_offset == expected_offset - - -@pytest.mark.gpu_and_cpu_mode -@pytest.mark.kafka -@pytest.mark.parametrize('num_records', [1000]) -def test_kafka_source_batch_pipe(config: Config, - kafka_bootstrap_servers: str, - kafka_topics: KafkaTopics, - num_records: int) -> None: - data = [{'v': i} for i in range(num_records)] - num_written = write_data_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, data) - assert num_written == num_records - - expected_length = config.pipeline_batch_size - num_exact = num_records // expected_length - - pipe = LinearPipeline(config) - pipe.set_source( - KafkaSourceStage(config, - bootstrap_servers=kafka_bootstrap_servers, - input_topic=kafka_topics.input_topic, - auto_offset_reset="earliest", - poll_interval="1seconds", - client_id='morpheus_kafka_source_stage_pipe', - stop_after=num_records)) - pipe.add_stage(DFPLengthChecker(config, expected_length=expected_length, num_exact=num_exact)) - pipe.add_stage(DeserializeStage(config)) - pipe.add_stage(SerializeStage(config)) - comp_stage = pipe.add_stage( - CompareDataFrameStage(config, pd.DataFrame(data=data), include=[r'^v$'], reset_index=True)) - pipe.run() - - assert_results(comp_stage.get_results()) diff --git a/tests/morpheus_llm/llm/nodes/test_llm_retriever_node_pipe.py b/tests/morpheus_llm/llm/nodes/test_llm_retriever_node_pipe.py index dba43078af..0f3ef1291a 100644 --- a/tests/morpheus_llm/llm/nodes/test_llm_retriever_node_pipe.py +++ b/tests/morpheus_llm/llm/nodes/test_llm_retriever_node_pipe.py @@ -34,12 +34,6 @@ from morpheus_llm.stages.llm.llm_engine_stage import LLMEngineStage -@pytest.fixture(scope="module", name="milvus_service") -def milvus_service_fixture(milvus_server_uri: str): - service = MilvusVectorDBService(uri=milvus_server_uri) - yield service - - def _build_engine(vdb_service, **similarity_search_kwargs) -> LLMEngine: mock_embedding = mock.AsyncMock(return_value=[[1.2, 2.3, 3.4], [4.5, 5.6, 6.7]]) engine = LLMEngine() diff --git a/tests/morpheus_llm/llm/test_rag_standalone_pipe.py b/tests/morpheus_llm/llm/test_rag_standalone_pipe.py index f3d4128ac1..793638dd9d 100644 --- a/tests/morpheus_llm/llm/test_rag_standalone_pipe.py +++ b/tests/morpheus_llm/llm/test_rag_standalone_pipe.py @@ -15,6 +15,7 @@ """Mimic the examples/llm/rag/standalone_pipeline.py example""" import os +import sys import types from unittest import mock @@ -24,11 +25,11 @@ from 
_utils import TEST_DIRS from _utils import assert_results -from _utils.dataset_manager import DatasetManager from _utils.llm import mk_mock_openai_response from _utils.milvus import populate_milvus from morpheus.config import Config from morpheus.config import PipelineModes +from morpheus.io.deserializers import read_file_to_df from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage @@ -52,6 +53,31 @@ EXPECTED_RESPONSE = "Ransomware, Phishing, Malware, Denial of Service, SQL injection, and Password Attacks" +@pytest.fixture(name="milvus_rss_collection", scope="module") +def fixture_milvus_rss_collection(milvus_server_uri: str): + collection_name = "milvus_rss_collection" + + llm_common_path = os.path.join(TEST_DIRS.examples_dir, 'llm/common') + needs_sys_path_append = llm_common_path not in sys.path + if needs_sys_path_append: + sys.path.append(llm_common_path) + + import utils + + df_path = os.path.join(TEST_DIRS.tests_data_dir, 'service/milvus_rss_data.json') + df = read_file_to_df(df_path, df_type="cudf") + populate_milvus(milvus_server_uri=milvus_server_uri, + collection_name=collection_name, + resource_kwargs=utils.build_default_milvus_config(embedding_size=EMBEDDING_SIZE), + df=df, + overwrite=True) + yield collection_name + + sys.modules.pop('utils', None) + if needs_sys_path_append: + sys.path.remove(llm_common_path) + + def _build_engine(llm_service_name: str, model_name: str, milvus_server_uri: str, @@ -130,23 +156,17 @@ def _run_pipeline(config: Config, @pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'llm/common/utils.py')) def test_rag_standalone_pipe_nemo(config: Config, mock_nemollm: mock.MagicMock, - dataset: DatasetManager, milvus_server_uri: str, repeat_count: int, + milvus_rss_collection: str, import_mod: types.ModuleType): - collection_name = "test_rag_standalone_pipe_nemo" - populate_milvus(milvus_server_uri=milvus_server_uri, - collection_name=collection_name, - resource_kwargs=import_mod.build_default_milvus_config(embedding_size=EMBEDDING_SIZE), - df=dataset["service/milvus_rss_data.json"], - overwrite=True) mock_nemollm.post_process_generate_response.side_effect = [{"text": EXPECTED_RESPONSE} for _ in range(repeat_count)] results = _run_pipeline( config=config, llm_service_name="nemollm", model_name="test_model", milvus_server_uri=milvus_server_uri, - collection_name=collection_name, + collection_name=milvus_rss_collection, repeat_count=repeat_count, utils_mod=import_mod, ) @@ -160,9 +180,9 @@ def test_rag_standalone_pipe_nemo(config: Config, @pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'llm/common/utils.py')) def test_rag_standalone_pipe_openai(config: Config, mock_chat_completion: tuple[mock.MagicMock, mock.MagicMock], - dataset: DatasetManager, milvus_server_uri: str, repeat_count: int, + milvus_rss_collection: str, import_mod: types.ModuleType): os.environ['OPENAI_API_KEY'] = "test" @@ -171,19 +191,12 @@ def test_rag_standalone_pipe_openai(config: Config, mk_mock_openai_response([EXPECTED_RESPONSE]) for _ in range(repeat_count) ] - collection_name = "test_rag_standalone_pipe_openai" - populate_milvus(milvus_server_uri=milvus_server_uri, - collection_name=collection_name, - resource_kwargs=import_mod.build_default_milvus_config(embedding_size=EMBEDDING_SIZE), - df=dataset["service/milvus_rss_data.json"], - overwrite=True) - results = _run_pipeline( config=config, 
diff --git a/tests/morpheus_llm/services/test_milvus_vector_db_service.py b/tests/morpheus_llm/services/test_milvus_vector_db_service.py
index 39a2b2c495..ab3dce8420 100644
--- a/tests/morpheus_llm/services/test_milvus_vector_db_service.py
+++ b/tests/morpheus_llm/services/test_milvus_vector_db_service.py
@@ -48,14 +48,6 @@
 }
 
 
-@pytest.fixture(scope="module", name="milvus_service")
-def milvus_service_fixture(milvus_server_uri: str):
-    # This fixture is scoped to the function level since the WriteToVectorDBStage will close the connection on
-    # pipeline completion
-    service = MilvusVectorDBService(uri=milvus_server_uri)
-    yield service
-
-
 @pytest.mark.milvus
 def test_list_store_objects(milvus_service: MilvusVectorDBService):
     # List all collections in the Milvus server.
diff --git a/tests/scripts/conftest.py b/tests/scripts/conftest.py
new file mode 100644
index 0000000000..df8aff444a
--- /dev/null
+++ b/tests/scripts/conftest.py
@@ -0,0 +1,34 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import types
+
+import pytest
+
+
+@pytest.fixture(name="morpheus_llm", scope='session', autouse=True)
+def morpheus_llm_fixture(morpheus_llm: types.ModuleType):
+    """
+    Fixture to ensure morpheus_llm is installed
+    """
+    yield morpheus_llm
+
+
+@pytest.fixture(name="morpheus_dfp", scope='session', autouse=True)
+def morpheus_dfp_fixture(morpheus_dfp: types.ModuleType):
+    """
+    Fixture to ensure morpheus_dfp is installed
+    """
+    yield morpheus_dfp
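The new `tests/scripts/conftest.py` re-exports the `morpheus_llm` and `morpheus_dfp` fixtures with session scope and `autouse=True`, so every test under `tests/scripts` implicitly requires both packages; the wrapped fixtures are presumably defined in a higher-level `conftest.py`. A roughly equivalent standalone guard, assuming only pytest is available, could look like this sketch:

```python
import pytest


@pytest.fixture(scope="session", autouse=True)
def require_namespace_packages():
    # Skip every test in this directory when either optional package is missing.
    morpheus_llm = pytest.importorskip("morpheus_llm")
    morpheus_dfp = pytest.importorskip("morpheus_dfp")
    yield morpheus_llm, morpheus_dfp
```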
diff --git a/tests/scripts/data/dfp_old_namespace_data.py b/tests/scripts/data/dfp_old_namespace_data.py
new file mode 100644
index 0000000000..e1a46532d3
--- /dev/null
+++ b/tests/scripts/data/dfp_old_namespace_data.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+'''
+This script is used as test input for the morpheus_namespace_update.py script.
+'''
+
+# Disable all checkers
+# flake8: noqa
+# isort: skip_file
+# yapf: disable
+# pylint: skip-file
+
+# old DFP import patterns
+from dfp.utils.config_generator import ConfigGenerator
+from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
+from morpheus_dfp.stages.dfp_split_users_stage import DFPSplitUsersStage  # no update
+import dfp.stages.dfp_training
+import dfp.stages.dfp_inference_stage as inference_stage
+import dfp
+
+
+def empty_imports_function_scope():
+    '''
+    Empty imports from morpheus_dfp, llm and vdb.
+    '''
+    from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage
+    from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
+    from dfp.utils.regex_utils import iso_date_regex
+    from morpheus_dfp.utils.schema_utils import SchemaBuilder  # no update
+    from dfp.modules import dfp_data_prep
diff --git a/scripts/data/namespace_update_data.py b/tests/scripts/data/llm_old_namespace_data.py
similarity index 67%
rename from scripts/data/namespace_update_data.py
rename to tests/scripts/data/llm_old_namespace_data.py
index 62f609a11d..958ed67b61 100644
--- a/scripts/data/namespace_update_data.py
+++ b/tests/scripts/data/llm_old_namespace_data.py
@@ -23,19 +23,8 @@
 # yapf: disable
 # pylint: skip-file
 
-# old DFP import patterns
-from dfp.utils.config_generator import ConfigGenerator
-from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
-from morpheus_dfp.stages.dfp_split_users_stage import DFPSplitUsersStage  # no update
-import dfp.stages.dfp_training
-import dfp.stages.dfp_inference_stage as inference_stage
-import dfp
-
-
 # old LLM import patterns
 from morpheus.llm import LLMContext
-from morpheus_llm.llm import LLMEngine  # no update
-from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
 from morpheus.llm.services.llm_service import LLMService
 
 # old vdb import patterns
@@ -43,18 +32,17 @@
 from morpheus.service import vdb
 from morpheus.modules.output import write_to_vector_db
 from morpheus.modules.output.write_to_vector_db import preprocess_vdb_resources
-from morpheus_llm.service.vdb import milvus_client  # no update
 import morpheus.service.vdb
 
+# These should be skipped
+from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage
+from morpheus_llm.service.vdb import milvus_client  # no update
+from morpheus_llm.llm import LLMEngine  # no update
+
 
 def empty_imports_function_scope():
     '''
-    Empty imports from dfp, llm and vdb.
+    Empty imports from llm and vdb.
     '''
-    from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage
-    from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
-    from dfp.utils.regex_utils import iso_date_regex
-    from morpheus_dfp.utils.schema_utils import SchemaBuilder  # no update
-    from dfp.modules import dfp_data_prep
-
+    from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
     from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage
     import morpheus.modules.schemas.write_to_vector_db_schema
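Taken together, the two data files exercise the rewrite the updater script is expected to perform: `dfp.*` imports move to `morpheus_dfp.*`, while the LLM and vector-database modules move from `morpheus.*` to `morpheus_llm.*`; imports already on the new namespaces (marked `# no update`) must be left untouched. Roughly, based on the patterns in these files:

```python
# Before (old namespaces, as in the data files above):
from dfp.utils.config_generator import ConfigGenerator
from morpheus.llm import LLMContext
from morpheus.service import vdb

# After running morpheus_namespace_update.py with --dfp and --llm, the same
# imports are expected to resolve against the split packages:
from morpheus_dfp.utils.config_generator import ConfigGenerator
from morpheus_llm.llm import LLMContext
from morpheus_llm.service import vdb
```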
diff --git a/tests/scripts/test_namespace_update.py b/tests/scripts/test_namespace_update.py
new file mode 100644
index 0000000000..71aef0e790
--- /dev/null
+++ b/tests/scripts/test_namespace_update.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python
+# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib.util
+import os
+
+from _utils import TEST_DIRS
+
+
+def copy_data_to_tmp_path(tmp_path) -> str:
+    '''
+    Copy the data to a temporary directory as we will be modifying the files.
+    '''
+    data_dir = os.path.join(TEST_DIRS.tests_dir, "scripts/data")
+    tmp_data_dir = tmp_path / "scripts"
+    tmp_data_dir.mkdir()
+    os.system(f"cp -r {data_dir} {tmp_data_dir}")
+    scripts_data_dir = os.path.join(tmp_data_dir, "data")
+    return scripts_data_dir
+
+
+def import_module_from_path(module_name, path) -> tuple:
+    '''
+    Import a module from the pytest tmp_path.
+    '''
+    # Create a module spec from the given path
+    spec = importlib.util.spec_from_file_location(module_name, path)
+    if not spec:
+        return None, None
+
+    # Load the module from the created spec
+    module = importlib.util.module_from_spec(spec)
+    if not module:
+        return None, None
+
+    return spec, module
+
+
+def test_dfp_namespace_update(tmp_path):
+    '''
+    Update the DFP namespace imports and verify the imports work.
+    '''
+    scripts_data_dir = copy_data_to_tmp_path(tmp_path)
+    module_name = 'dfp_old_namespace_data'
+    module_path = os.path.join(scripts_data_dir, f'{module_name}.py')
+
+    # check if the file exists in the tmp_path
+    assert os.path.exists(module_path), f"Failed to copy {module_name} to {scripts_data_dir}"
+
+    # dfp imports expected to fail before namespace update
+    spec, module = import_module_from_path(module_name, module_path)
+    assert module is not None, f"Failed to import {module_name} from {module_path}"
+    try:
+        spec.loader.exec_module(module)
+        assert False, "dfp_namespace_data input is not set up with the old imports"
+    except ModuleNotFoundError:
+        pass
+
+    # update imports to the new namespace by running morpheus_namespace_update.py
+    update_namespace_script = os.path.join(TEST_DIRS.morpheus_root, "scripts/morpheus_namespace_update.py")
+    os.system(f"python {update_namespace_script} --directory {scripts_data_dir} --dfp")
+
+    # verify the morpheus_dfp imports work
+    spec, module = import_module_from_path(module_name, module_path)
+    try:
+        spec.loader.exec_module(module)
+    except ModuleNotFoundError:
+        assert False, "old dfp imports are not updated to the new namespace"
+
+
+def test_llm_namespace_update(tmp_path):
+    '''
+    Update the LLM namespace imports and verify the imports work.
+    '''
+    scripts_data_dir = copy_data_to_tmp_path(tmp_path)
+    module_name = 'llm_old_namespace_data'
+    module_path = os.path.join(scripts_data_dir, f'{module_name}.py')
+
+    # check if the file exists in the tmp_path
+    assert os.path.exists(module_path), f"Failed to copy {module_name} to {scripts_data_dir}"
+
+    # llm imports expected to fail before namespace update
+    spec, module = import_module_from_path(module_name, module_path)
+    assert module is not None, f"Failed to import {module_name} from {module_path}"
+    try:
+        spec.loader.exec_module(module)
+        assert False, "llm_namespace_data input is not set up with the old imports"
+    except ModuleNotFoundError:
+        pass
+
+    # update imports to the new namespace by running morpheus_namespace_update.py
+    update_namespace_script = os.path.join(TEST_DIRS.morpheus_root, "scripts/morpheus_namespace_update.py")
+    os.system(f"python {update_namespace_script} --directory {scripts_data_dir} --llm")
+
+    # verify the morpheus_llm imports work
+    spec, module = import_module_from_path(module_name, module_path)
+    try:
+        spec.loader.exec_module(module)
+    except ModuleNotFoundError:
+        assert False, "old llm imports are not updated to the new namespace"
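The tests drive the updater purely through its command line (`--directory` plus `--dfp` or `--llm`). The same invocation can presumably be pointed at any code tree outside the test suite; in the sketch below, `./my_project` is a placeholder path, not something taken from this change:

```python
import subprocess

# Rewrite old dfp.* imports to morpheus_dfp.*, then old LLM/vdb imports to morpheus_llm.*,
# mirroring the invocations used in test_namespace_update.py above.
subprocess.run(["python", "scripts/morpheus_namespace_update.py", "--directory", "./my_project", "--dfp"],
               check=True)
subprocess.run(["python", "scripts/morpheus_namespace_update.py", "--directory", "./my_project", "--llm"],
               check=True)
```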