diff --git a/ci/release/update-version.sh b/ci/release/update-version.sh index 6651dcdb15..cc7b26d073 100755 --- a/ci/release/update-version.sh +++ b/ci/release/update-version.sh @@ -91,7 +91,6 @@ sed_runner "s/v${CURRENT_FULL_VERSION}-runtime/v${NEXT_FULL_VERSION}-runtime/g" examples/digital_fingerprinting/production/docker-compose.yml \ examples/digital_fingerprinting/production/Dockerfile sed_runner "s/v${CURRENT_FULL_VERSION}-runtime/v${NEXT_FULL_VERSION}-runtime/g" examples/digital_fingerprinting/production/Dockerfile -sed_runner "s|blob/branch-${CURRENT_SHORT_TAG}|blob/branch-${NEXT_SHORT_TAG}|g" examples/digital_fingerprinting/starter/README.md # examples/developer_guide sed_runner 's/'"VERSION ${CURRENT_FULL_VERSION}.*"'/'"VERSION ${NEXT_FULL_VERSION}"'/g' \ diff --git a/docs/source/basics/overview.rst b/docs/source/basics/overview.rst index 4b00a59062..a93ff35fda 100644 --- a/docs/source/basics/overview.rst +++ b/docs/source/basics/overview.rst @@ -27,7 +27,7 @@ The Morpheus CLI is built on the Click Python package which allows for nested co together. At a high level, the CLI is broken up into two main sections: * ``run`` - * For running AE, FIL, NLP or OTHER pipelines. + * For running FIL, NLP or OTHER pipelines. * ``tools`` * Tools/Utilities to help set up, configure and run pipelines and external resources. @@ -58,7 +58,6 @@ run: --help Show this message and exit. Commands: - pipeline-ae Run the inference pipeline with an AutoEncoder model pipeline-fil Run the inference pipeline with a FIL model pipeline-nlp Run the inference pipeline with a NLP model pipeline-other Run a custom inference pipeline without a specific model type @@ -66,8 +65,6 @@ -Currently, Morpheus pipeline can be operated in four different modes. +Currently, Morpheus pipeline can be operated in three different modes. - * ``pipeline-ae`` - * This pipeline mode is used to run training/inference on the AutoEncoder model. * ``pipeline-fil`` * This pipeline mode is used to run inference on FIL (Forest Inference Library) models such as XGBoost, RandomForestClassifier, etc. * ``pipeline-nlp`` diff --git a/docs/source/cloud_deployment_guide.md b/docs/source/cloud_deployment_guide.md index 060e85a452..fd79c0f05e 100644 --- a/docs/source/cloud_deployment_guide.md +++ b/docs/source/cloud_deployment_guide.md @@ -32,7 +32,6 @@ limitations under the License. - [Verify Model Deployment](#verify-model-deployment) - [Create Kafka Topics](#create-kafka-topics) - [Example Workflows](#example-workflows) - - [Run AutoEncoder Digital Fingerprinting Pipeline](#run-autoencoder-digital-fingerprinting-pipeline) - [Run NLP Phishing Detection Pipeline](#run-nlp-phishing-detection-pipeline) - [Run NLP Sensitive Information Detection Pipeline](#run-nlp-sensitive-information-detection-pipeline) - [Run FIL Anomalous Behavior Profiling Pipeline](#run-fil-anomalous-behavior-profiling-pipeline) @@ -383,10 +382,9 @@ kubectl -n $NAMESPACE exec deploy/broker -c broker -- kafka-topics.sh \ -This section describes example workflows to run on Morpheus. Four sample pipelines are provided. +This section describes example workflows to run on Morpheus. Three sample pipelines are provided. -1. AutoEncoder pipeline performing Digital Fingerprinting (DFP). -2. NLP pipeline performing Phishing Detection (PD). -3. NLP pipeline performing Sensitive Information Detection (SID). -4. FIL pipeline performing Anomalous Behavior Profiling (ABP). +1. NLP pipeline performing Phishing Detection (PD). +2. NLP pipeline performing Sensitive Information Detection (SID). +3. FIL pipeline performing Anomalous Behavior Profiling (ABP). 
Multiple command options are given for each pipeline, with varying data input/output methods, ranging from local files to Kafka Topics. @@ -424,44 +422,6 @@ helm install --set ngc.apiKey="$API_KEY" \ morpheus-sdk-client ``` - -### Run AutoEncoder Digital Fingerprinting Pipeline -The following AutoEncoder pipeline example shows how to train and validate the AutoEncoder model and write the inference results to a specified location. Digital fingerprinting has also been referred to as **HAMMAH (Human as Machine <> Machine as Human)**. -These use cases are currently implemented to detect user behavior changes that indicate a change from a human to a machine or a machine to a human, thus leaving a "digital fingerprint." The model is an ensemble of an autoencoder and fast Fourier transform reconstruction. - -Inference and training based on a user ID (`user123`). The model is trained once and inference is conducted on the supplied input entries in the example pipeline below. The `--train_data_glob` parameter must be removed for continuous training. - -```bash -helm install --set ngc.apiKey="$API_KEY" \ - --set sdk.args="morpheus --log_level=DEBUG run \ - --edge_buffer_size=4 \ - --pipeline_batch_size=1024 \ - --model_max_batch_size=1024 \ - pipeline-ae \ - --columns_file=data/columns_ae_cloudtrail.txt \ - --userid_filter=user123 \ - --feature_scaler=standard \ - --userid_column_name=userIdentitysessionContextsessionIssueruserName \ - --timestamp_column_name=event_dt \ - from-cloudtrail --input_glob=/common/models/datasets/validation-data/dfp-cloudtrail-*-input.csv \ - --max_files=200 \ - train-ae --train_data_glob=/common/models/datasets/training-data/dfp-cloudtrail-*.csv \ - --source_stage_class=morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage \ - --seed 42 \ - preprocess \ - inf-pytorch \ - add-scores \ - timeseries --resolution=1m --zscore_threshold=8.0 --hot_start \ - monitor --description 'Inference Rate' --smoothing=0.001 --unit inf \ - serialize \ - to-file --filename=/common/data//cloudtrail-dfp-detections.csv --overwrite" \ - --namespace $NAMESPACE \ - \ - morpheus-sdk-client -``` - -For more information on the Digital Fingerprint use cases, refer to the starter example and a more production-ready example that can be found in the `examples` source directory. - ### Run NLP Phishing Detection Pipeline The following Phishing Detection pipeline examples use a pre-trained NLP model to analyze emails (body) and determine phishing or benign. Here is the sample data as shown below is used to pass as an input to the pipeline. diff --git a/docs/source/developer_guide/contributing.md b/docs/source/developer_guide/contributing.md index 2b69eb7a53..1441936735 100644 --- a/docs/source/developer_guide/contributing.md +++ b/docs/source/developer_guide/contributing.md @@ -375,7 +375,7 @@ Launching a full production Kafka cluster is outside the scope of this project; ### Pipeline Validation -To verify that all pipelines are working correctly, validation scripts have been added at `${MORPHEUS_ROOT}/scripts/validation`. There are scripts for each of the main workflows: Anomalous Behavior Profiling (ABP), Humans-as-Machines-Machines-as-Humans (HAMMAH), Phishing Detection (Phishing), and Sensitive Information Detection (SID). +To verify that all pipelines are working correctly, validation scripts have been added at `${MORPHEUS_ROOT}/scripts/validation`. 
There are scripts for each of the main workflows: Anomalous Behavior Profiling (ABP), Phishing Detection (Phishing), and Sensitive Information Detection (SID). To run all of the validation workflow scripts, use the following commands: diff --git a/docs/source/developer_guide/guides/5_digital_fingerprinting.md b/docs/source/developer_guide/guides/5_digital_fingerprinting.md index 4ad65fa6d2..e96f7526bc 100644 --- a/docs/source/developer_guide/guides/5_digital_fingerprinting.md +++ b/docs/source/developer_guide/guides/5_digital_fingerprinting.md @@ -23,7 +23,7 @@ Every account, user, service, and machine has a digital fingerprint that represe To construct this digital fingerprint, we will be training unsupervised behavioral models at various granularities, including a generic model for all users in the organization along with fine-grained models for each user to monitor their behavior. These models are continuously updated and retrained over time, and alerts are triggered when deviations from normality occur for any user. ## Training Sources -The data we will want to use for the training and inference will be any sensitive system that the user interacts with, such as VPN, authentication and cloud services. The digital fingerprinting example (`examples/digital_fingerprinting/README.md`) included in Morpheus ingests logs from [AWS CloudTrail](https://docs.aws.amazon.com/cloudtrail/index.html), [Azure Active Directory](https://docs.microsoft.com/en-us/azure/active-directory/reports-monitoring/concept-sign-ins), and [Duo Authentication](https://duo.com/docs/adminapi). +The data we will want to use for the training and inference will be any sensitive system that the user interacts with, such as VPN, authentication and cloud services. The digital fingerprinting example (`examples/digital_fingerprinting/production/README.md`) included in Morpheus ingests logs from [Azure Active Directory](https://docs.microsoft.com/en-us/azure/active-directory/reports-monitoring/concept-sign-ins) and [Duo Authentication](https://duo.com/docs/adminapi). The location of these logs could be either local to the machine running Morpheus, a shared file system like NFS, or on a remote store such as [Amazon S3](https://aws.amazon.com/s3/). @@ -44,27 +44,13 @@ Adding a new source for the DFP pipeline requires defining five critical pieces: 1. A [`DataFrameInputSchema`](6_digital_fingerprinting_reference.md#dataframe-input-schema-dataframeinputschema) for the [`DFPFileToDataFrameStage`](6_digital_fingerprinting_reference.md#file-to-dataframe-stage-dfpfiletodataframestage) stage. 1. A [`DataFrameInputSchema`](6_digital_fingerprinting_reference.md#dataframe-input-schema-dataframeinputschema) for the [`DFPPreprocessingStage`](6_digital_fingerprinting_reference.md#preprocessing-stage-dfppreprocessingstage). -## DFP Examples -The DFP workflow is provided as two separate examples: a simple, "starter" pipeline for new users and a complex, "production" pipeline for full scale deployments. While these two examples both perform the same general tasks, they do so in very different ways. The following is a breakdown of the differences between the two examples. - -### The "Starter" Example - -This example is designed to simplify the number of stages and components and provide a fully contained workflow in a single pipeline. 
- -Key Differences: - * A single pipeline which performs both training and inference - * Requires no external services - * Can be run from the Morpheus CLI - -This example is described in more detail in `examples/digital_fingerprinting/starter/README.md`. - -### The "Production" Example +## Production Deployment Example This example is designed to illustrate a full-scale, production-ready, DFP deployment in Morpheus. It contains all of the necessary components (such as a model store), to allow multiple Morpheus pipelines to communicate at a scale that can handle the workload of an entire company. -Key Differences: +Key Features: * Multiple pipelines are specialized to perform either training or inference - * Requires setting up a model store to allow the training and inference pipelines to communicate + * Uses a model store to allow the training and inference pipelines to communicate * Organized into a docker-compose deployment for easy startup * Contains a Jupyter notebook service to ease development and debugging * Can be deployed to Kubernetes using provided Helm charts @@ -72,26 +58,9 @@ Key Differences: This example is described in `examples/digital_fingerprinting/production/README.md` as well as the rest of this document. -### DFP Features +## DFP Features -#### AWS CloudTrail -| Feature | Description | -| ------- | ----------- | -| `userIdentityaccessKeyId` | for example, `ACPOSBUM5JG5BOW7B2TR`, `ABTHWOIIC0L5POZJM2FF`, `AYI2CM8JC3NCFM4VMMB4` | -| `userAgent` | for example, `Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 10.0; Trident/5.1)`, `Mozilla/5.0 (Linux; Android 4.3.1) AppleWebKit/536.1 (KHTML, like Gecko) Chrome/62.0.822.0 Safari/536.1`, `Mozilla/5.0 (Macintosh; U; PPC Mac OS X 10 7_0; rv:1.9.4.20) Gecko/2012-06-10 12:09:43 Firefox/3.8` | -| `userIdentitysessionContextsessionIssueruserName` | for example, `role-g` | -| `sourceIPAddress` | for example, `208.49.113.40`, `123.79.131.26`, `128.170.173.123` | -| `userIdentityaccountId` | for example, `Account-123456789` | -| `errorMessage` | for example, `The input fails to satisfy the constraints specified by an AWS service.`, `The specified subnet cannot be found in the VPN with which the Client VPN endpoint is associated.`, `Your account is currently blocked. Contact aws-verification@amazon.com if you have questions.` | -| `userIdentitytype` | for example, `FederatedUser` | -| `eventName` | for example, `GetSendQuota`, `ListTagsForResource`, `DescribeManagedPrefixLists` | -| `userIdentityprincipalId` | for example, `39c71b3a-ad54-4c28-916b-3da010b92564`, `0baf594e-28c1-46cf-b261-f60b4c4790d1`, `7f8a985f-df3b-4c5c-92c0-e8bffd68abbf` | -| `errorCode` | for example, success, `MissingAction`, `ValidationError` | -| `eventSource` | for example, `lopez-byrd.info`, `robinson.com`, `lin.com` | -| `userIdentityarn` | for example, `arn:aws:4a40df8e-c56a-4e6c-acff-f24eebbc4512`, `arn:aws:573fd2d9-4345-487a-9673-87de888e4e10`, `arn:aws:c8c23266-13bb-4d89-bce9-a6eef8989214` | -| `apiVersion` | for example, `1984-11-26`, `1990-05-27`, `2001-06-09` | - -#### Azure Active Directory +### Azure Active Directory | Feature | Description | | ------- | ----------- | | `appDisplayName` | for example, `Windows sign in`, `MS Teams`, `Office 365`​ | @@ -104,14 +73,14 @@ This example is described in `examples/digital_fingerprinting/production/README. 
| `location.countryOrRegion` | country or region name​ | | `location.city` | city name | -##### Derived Features +#### Derived Features | Feature | Description | | ------- | ----------- | | `logcount` | tracks the number of logs generated by a user within that day (increments with every log)​ | | `locincrement` | increments every time we observe a new city (`location.city`) in a user's logs within that day​ | | `appincrement` | increments every time we observe a new app (`appDisplayName`) in a user's logs within that day​ | -#### Duo Authentication +### Duo Authentication | Feature | Description | | ------- | ----------- | | `auth_device.name` | phone number​ | @@ -121,7 +90,7 @@ This example is described in `examples/digital_fingerprinting/production/README. | `reason` | reason for the results, for example, `User Cancelled`, `User Approved`, `User Mistake`, `No Response`​ | | `access_device.location.city` | city name | -##### Derived Features +#### Derived Features | Feature | Description | | ------- | ----------- | | `logcount` | tracks the number of logs generated by a user within that day (increments with every log)​ | @@ -133,16 +102,16 @@ DFP in Morpheus is accomplished via two independent pipelines: training and infe ![High Level Architecture](img/dfp_high_level_arch.png) -#### Training Pipeline +### Training Pipeline * Trains user models and uploads to the model store​ * Capable of training individual user models or a fallback generic model for all users​ -#### Inference Pipeline +### Inference Pipeline * Downloads user models from the model store​ * Generates anomaly scores per log​ * Sends detected anomalies to monitoring services -#### Monitoring +### Monitoring * Detected anomalies are published to an S3 bucket, directory or a Kafka topic. * Output can be integrated with a monitoring tool. diff --git a/docs/source/extra_info/known_issues.md b/docs/source/extra_info/known_issues.md index 7feb1dca62..b009da5ce9 100644 --- a/docs/source/extra_info/known_issues.md +++ b/docs/source/extra_info/known_issues.md @@ -17,7 +17,6 @@ limitations under the License. # Known Issues -- TrainAEStage fails with a Segmentation fault ([#1641](https://github.com/nv-morpheus/Morpheus/issues/1641)) - `vdb_upload` example pipeline triggers an internal error in Triton ([#1649](https://github.com/nv-morpheus/Morpheus/issues/1649)) Refer to [open issues in the Morpheus project](https://github.com/nv-morpheus/Morpheus/issues) diff --git a/docs/source/getting_started.md b/docs/source/getting_started.md index 7b0b43217a..55fe26f802 100644 --- a/docs/source/getting_started.md +++ b/docs/source/getting_started.md @@ -375,36 +375,6 @@ Commands: trigger Buffer data until the previous stage has completed. validate Validate pipeline output for testing. ``` - -And for the AE pipeline: - -``` -$ morpheus run pipeline-ae --help -Usage: morpheus run pipeline-ae [OPTIONS] COMMAND1 [ARGS]... [COMMAND2 [ARGS]...]... - - - -Commands: - add-class Add detected classifications to each message. - add-scores Add probability scores to each message. - buffer (Deprecated) Buffer results. - delay (Deprecated) Delay results for a certain duration. - filter Filter message by a classification threshold. - from-azure Source stage is used to load Azure Active Directory messages. - from-cloudtrail Load messages from a CloudTrail directory. - from-duo Source stage is used to load Duo Authentication messages. - inf-pytorch Perform inference with PyTorch. - inf-triton Perform inference with Triton Inference Server. 
- monitor Display throughput numbers at a specific point in the pipeline. - preprocess Prepare Autoencoder input DataFrames for inference. - serialize Includes & excludes columns from messages. - timeseries Perform time series anomaly detection and add prediction. - to-file Write all messages to a file. - to-kafka Write all messages to a Kafka cluster. - train-ae Train an Autoencoder model on incoming data. - trigger Buffer data until the previous stage has completed. - validate Validate pipeline output for testing. -``` Note: The available commands for different types of pipelines are not the same. This means that the same stage, when used in different pipelines, may have different options. Check the CLI help for the most up-to-date information during development. ## Next Steps diff --git a/docs/source/stages/morpheus_stages.md b/docs/source/stages/morpheus_stages.md index e860beff38..75c6999f32 100644 --- a/docs/source/stages/morpheus_stages.md +++ b/docs/source/stages/morpheus_stages.md @@ -44,19 +44,15 @@ Stages are the building blocks of Morpheus pipelines. Below is a list of the mos ## Inference -- Auto Encoder Inference Stage {py:class}`~morpheus.stages.inference.auto_encoder_inference_stage.AutoEncoderInferenceStage` PyTorch inference stage used for Auto Encoder pipeline mode. -- PyTorch Inference Stage {py:class}`~morpheus.stages.inference.pytorch_inference_stage.PyTorchInferenceStage` PyTorch inference stage used for most pipeline modes with the exception of Auto Encoder. +- PyTorch Inference Stage {py:class}`~morpheus.stages.inference.pytorch_inference_stage.PyTorchInferenceStage` PyTorch inference stage used for most pipeline modes. - Triton Inference Stage {py:class}`~morpheus.stages.inference.triton_inference_stage.TritonInferenceStage` Inference stage which utilizes a [Triton Inference Server](https://developer.nvidia.com/nvidia-triton-inference-server). ## Input - App Shield Source Stage {py:class}`~morpheus.stages.input.appshield_source_stage.AppShieldSourceStage` Load App Shield messages from one or more plugins into a DataFrame. -- Azure Source Stage {py:class}`~morpheus.stages.input.azure_source_stage.AzureSourceStage` Load Azure Active Directory messages. -- Cloud Trail Source Stage {py:class}`~morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage` Load messages from a CloudTrail directory. - Control Message File Source Stage {py:class}`~morpheus.stages.input.control_message_file_source_stage.ControlMessageFileSourceStage` Receives control messages from different sources specified by a list of (fsspec)[https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files] strings. - Control Message Kafka Source Stage {py:class}`~morpheus.stages.input.control_message_kafka_source_stage.ControlMessageKafkaSourceStage` Load control messages from a Kafka cluster. - Databricks Delta Lake Source Stage {py:class}`~morpheus.stages.input.databricks_deltalake_source_stage.DataBricksDeltaLakeSourceStage` Source stage used to load messages from a DeltaLake table. -- Duo Source Stage {py:class}`~morpheus.stages.input.duo_source_stage.DuoSourceStage` Load Duo Authentication messages. - File Source Stage {py:class}`~morpheus.stages.input.file_source_stage.FileSourceStage` Load messages from a file. - HTTP Client Source Stage {py:class}`~morpheus.stages.input.http_client_source_stage.HttpClientSourceStage` Poll a remote HTTP server for incoming data. - HTTP Server Source Stage {py:class}`~morpheus.stages.input.http_server_source_stage.HttpServerSourceStage` Start an HTTP server and listens for incoming requests on a specified endpoint. @@ -92,7 +88,5 @@ Stages are the building blocks of Morpheus pipelines. 
Below is a list of the mos - Deserialize Stage {py:class}`~morpheus.stages.preprocess.deserialize_stage.DeserializeStage` Partition messages based on the `pipeline_batch_size` parameter of the pipeline's `morpheus.config.Config` object. - Drop Null Stage {py:class}`~morpheus.stages.preprocess.drop_null_stage.DropNullStage` Drop null data entries from a DataFrame. -- Preprocess AE Stage {py:class}`~morpheus.stages.preprocess.preprocess_ae_stage.PreprocessAEStage` Prepare Autoencoder input DataFrames for inference. - Preprocess FIL Stage {py:class}`~morpheus.stages.preprocess.preprocess_fil_stage.PreprocessFILStage` Prepare FIL input DataFrames for inference. - Preprocess NLP Stage {py:class}`~morpheus.stages.preprocess.preprocess_nlp_stage.PreprocessNLPStage` Prepare NLP input DataFrames for inference. -- Train AE Stage {py:class}`~morpheus.stages.preprocess.train_ae_stage.TrainAEStage` Train an Autoencoder model on incoming data. diff --git a/examples/digital_fingerprinting/README.md b/examples/digital_fingerprinting/README.md deleted file mode 100644 index d9296eb5f6..0000000000 --- a/examples/digital_fingerprinting/README.md +++ /dev/null @@ -1,47 +0,0 @@ - - -# Digital Fingerprinting (DFP) in Morpheus - -## Organization - -The DFP example workflows in Morpheus are designed to scale up to company wide workloads and handle several different log types which resulted in a large number of moving parts to handle the various services and configuration options. To simplify things, the DFP workflow is provided as two separate examples: a simple, "starter" pipeline for new users and a complex, "production" pipeline for full scale deployments. While these two examples both perform the same general tasks, they do so in very different ways. The following is a breakdown of the differences between the two examples. - -### The "Starter" Example - -This example is designed to simplify the number of stages and components and provided a fully contained workflow in a single pipeline. - -Key Differences: - * A single pipeline which performs both training and inference - * Requires no external services - * Can be run from the Morpheus CLI - - -### The "Production" Example - -This example is designed to illustrate a full-scale, production-ready, DFP deployment in Morpheus. It contains all of the necessary components (such as a model store), to allow multiple Morpheus pipelines to communicate at a scale that can handle the workload of an entire company. - -Key Differences: - * Multiple pipelines are specialized to perform either training or inference - * Requires setting up a model store to allow the training and inference pipelines to communicate - * Organized into a `docker compose` deployment for easy startup - * Contains a Jupyter notebook service to ease development and debugging - * Can be deployed to Kubernetes using provided Helm charts - * Uses many customized stages to maximize performance. - -## Getting Started - -Guides for each of the two examples can be found in their respective directories: [The Starter Example](./starter/README.md) and [The Production Example](./production/README.md) diff --git a/examples/digital_fingerprinting/production/README.md b/examples/digital_fingerprinting/production/README.md index 289634790e..69668db317 100644 --- a/examples/digital_fingerprinting/production/README.md +++ b/examples/digital_fingerprinting/production/README.md @@ -19,9 +19,9 @@ limitations under the License. 
This example is designed to illustrate a full-scale, production-ready, DFP deployment in Morpheus. It contains all of the necessary components (such as a model store), to allow multiple Morpheus pipelines to communicate at a scale that can handle the workload of an entire company. -Key Differences: +Key Features: * Multiple pipelines are specialized to perform either training or inference - * Requires setting up a model store to allow the training and inference pipelines to communicate + * Uses a model store to allow the training and inference pipelines to communicate * Organized into a `docker compose` deployment for easy startup * Contains a Jupyter notebook service to ease development and debugging * Can be deployed to Kubernetes using provided Helm charts diff --git a/examples/digital_fingerprinting/starter/README.md b/examples/digital_fingerprinting/starter/README.md deleted file mode 100644 index 30e96f3011..0000000000 --- a/examples/digital_fingerprinting/starter/README.md +++ /dev/null @@ -1,295 +0,0 @@ - - -> **Warning**: This example is currently broken and fails with a Segmentation fault [#1641](https://github.com/nv-morpheus/Morpheus/issues/1641) - -# "Starter" Digital Fingerprinting Pipeline - -We show here how to set up and run the DFP pipeline for three log types: CloudTrail, Duo, and Azure. Each of these log types uses a built-in source stage that handles that specific data format. New source stages can be added to allow the DFP pipeline to process different log types. All stages after the source stages are identical across all log types but can be configured differently via pipeline or stage configuration options. - -## Environment Setup - -Follow the instructions [here](../../../docs/source/developer_guide/contributing.md) to set up your development environment in either a Docker container or Conda environment. - -## Morpheus CLI - -DFP pipelines can be constructed and run using the Morpheus CLI command `morpheus run pipeline-ae ...` - -Use `--help` to display information about the autoencoder pipeline command line options: - -``` -Usage: morpheus run pipeline-ae [OPTIONS] COMMAND1 [ARGS]... [COMMAND2 [ARGS]...]... - - Configure and run the pipeline. To configure the pipeline, list the stages in the order that data should flow. The output of each stage will become the input for the next - stage. For example, to read, classify and write to a file, the following stages could be used - - pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model --server_url=localhost:8001 filter --threshold=0.5 to-file - --filename=classifications.json - - Pipelines must follow a few rules: 1. Data must originate in a source stage. Current options are `from-file` or `from-kafka` 2. A `deserialize` stage must be placed - between the source stages and the rest of the pipeline 3. Only one inference stage can be used. Zero is also fine 4. The following stages must come after an inference - stage: `add-class`, `filter`, `gen-viz` - -Options: - --columns_file DATA FILE Specifies a file to read column features. [required] - --labels_file DATA FILE Specifies a file to read labels from in order to convert class IDs into labels. A label file is a simple text file where each line - corresponds to a label. - --userid_column_name TEXT Which column to use as the User ID. [default: userIdentityaccountId; required] - --userid_filter TEXT Specifying this value will filter all incoming data to only use rows with matching User IDs. 
Which column is used for the User ID is - specified by `userid_column_name` - --feature_scaler [NONE|STANDARD|GAUSSRANK] - Autoencoder feature scaler [default: STANDARD] - --use_generic_model Whether to use a generic model when user does not have minimum number of training rows - --viz_file FILE Save a visualization of the pipeline at the specified location - --viz_direction [BT|LR|RL|TB] Set the direction for the Graphviz pipeline diagram, ignored unless --viz_file is also specified. [default: LR] - --timestamp_column_name TEXT Which column to use as the timestamp. [default: timestamp; required] - --help Show this message and exit. - -Commands: - add-class Add detected classifications to each message. - add-scores Add probability scores to each message. - buffer (Deprecated) Buffer results. - delay (Deprecated) Delay results for a certain duration. - filter Filter message by a classification threshold. - from-arxiv Source stage that downloads PDFs from arxiv and converts them to dataframes. - from-azure Source stage is used to load Azure Active Directory messages. - from-cloudtrail Load messages from a CloudTrail directory. - from-databricks-deltalake Source stage used to load messages from a DeltaLake table. - from-duo Source stage is used to load Duo Authentication messages. - from-http Source stage that starts an HTTP server and listens for incoming requests on a specified endpoint. - from-http-client Source stage that polls a remote HTTP server for incoming data. - from-rss Load RSS feed items into a DataFrame. - inf-pytorch Perform inference with PyTorch. - monitor Display throughput numbers at a specific point in the pipeline. - preprocess Prepare Autoencoder input DataFrames for inference. - serialize Includes & excludes columns from messages. - timeseries Perform time series anomaly detection and add prediction. - to-elasticsearch This class writes the messages as documents to Elasticsearch. - to-file Write all messages to a file. - to-http Write all messages to an HTTP endpoint. - to-http-server Sink stage that starts an HTTP server and listens for incoming requests on a specified endpoint. - to-kafka Write all messages to a Kafka cluster. - train-ae Train an Autoencoder model on incoming data. - trigger Buffer data until the previous stage has completed. - validate Validate pipeline output for testing. -``` -The commands above correspond to the Morpheus stages that can be used to construct your DFP pipeline. Options are available to configure pipeline and stages. 
-The following table shows mapping between the main Morpheus CLI commands and underlying Morpheus Python stage classes: - -| CLI Command | Stage Class | Python File | -| ------------------| ----------------------------| ----------------------------------------------------------- -| `from-azure` | `AzureSourceStage` | `morpheus/stages/input/azure_source_stage.py` -| `from-cloudtrail` | `CloudTrailSourceStage` | `morpheus/stages/input/clout_trail_source_stage.py` -| `from-duo` | `DuoSourceStage` | `morpheus/stages/input/duo_source_stage.py` -| `train-ae` | `TrainAEStage` | `morpheus/stages/preprocess/train_ae_stage.py` -| `preprocess` | `PreprocessAEStage` | `morpheus/stages/preprocess/preprocess_ae_stage.py` -| `inf-pytorch` | `AutoEncoderInferenceStage` | `morpheus/stages/inference/auto_encoder_inference_stage.py` -| `add-scores` | `AddScoresStage` | `morpheus/stages/postprocess/add_scores_stage.py` -| `serialize` | `SerializeStage` | `morpheus/stages/postprocess/serialize_stage.py` -| `to-file ` | `WriteToFileStage` | `morpheus/stages/output/write_to_file_stage.py` - - -## Morpheus DFP Stages - -**Source stages** - These include `AzureSourceStage`, `CloudTrailSourceStage` and `DuoSourceStage`. They are responsible for reading log files that match provided `--input_glob` (for example `/duo_logs/*.json`). Data is grouped by user so that each batch processed by the pipeline will only contain rows corresponding to a single user. Feature engineering also happens in this stage. All DFP source stages must extend `AutoencoderSourceStage` and implement the `files_to_dfs_per_user` abstract method. Feature columns can be managed by overriding the `derive_features` method. Otherwise, all columns from input data pass through to next stage. - -**Preprocessing stages** - -`TrainAEStage` can either train user models using data matching a provided `--train_data_glob` or load pre-trained models from file using `--pretrained_filename`. When using `--train_data_glob`, user models can be saved using the `--models_output_filename` option. The `--source_stage_class` must also be used with `--train_data_glob` so that the training stage knows how to read the training data. The autoencoder implementation used for user model training can be found [here](https://github.com/nv-morpheus/dfencoder). The following are the available CLI options for the `TrainAEStage` (train-ae): - -| Option | Description -| -------------------------| --------------------------------------------------------- -| `pretrained_filename` | File path to pickled user models saved from previous training run using `--models_output_filename`. -| `train_data_glob` | Glob path to training data. -| `source_stage_class` | Source stage so that training stage knows how to read/parse training data. -| `train_epochs` | Number of training epochs. Default is 25. -| `min_train_rows` | Minimum number of training rows required to train user model. Default is 300. -| `train_max_history` | Maximum number of training rows per user. Default is 1000. -| `seed` | When not None, ensure random number generators are seeded with `seed` to control reproducibility of user model. -| `sort_glob` | If true the list of files matching `input_glob` will be processed in sorted order. Default is False. -| `models_output_filename` | Can be used with `--train_data_glob` to save trained user models to file using provided file path. Models can be loaded later using `--pretrained_filename`. 
- -The `PreprocessAEStage` is responsible for creating a Morpheus message that contains everything needed by the inference stage. For DFP inference, this stage must pass a `ControlMessage` to the inference stage. Each message will correspond to a single user and include the input feature columns, the user's model and training data anomaly scores. - -**Inference stage** - `AutoEncoderInferenceStage` calculates anomaly scores (specifically, reconstruction loss) and z-scores for each user input dataset. - -**Post-processing stage** - The DFP pipeline uses the `AddScoresStage` for post-processing to add anomaly scores and z-scores from previous inference stage with matching labels. - -**Serialize stage** - `SerializeStage` is used to convert `ControlMessage` from previous stage to a `MessageMeta` to make it suitable for output (for example writing to file or Kafka). - -**Write stage** - `WriteToFileStage` writes input data with inference results to an output file path. - -## Download DFP Example Data from S3 - -``` -pip install s3fs -``` - -``` -./examples/digital_fingerprinting/fetch_example_data.py all -``` - -Azure training data will be saved to `examples/data/dfp/azure-training-data`, inference data to `examples/data/dfp/azure-inference-data`. -Duo training data will be saved to `examples/data/dfp/duo-training-data`, inference data to `examples/data/dfp/duo-inference-data`. - -## CloudTrail DFP Pipeline - -Run the following in your Morpheus container to start the CloudTrail DFP pipeline: - -``` -morpheus --log_level=DEBUG \ - run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ - pipeline-ae \ - --columns_file=data/columns_ae_cloudtrail.txt \ - --userid_column_name=userIdentitysessionContextsessionIssueruserName \ - --userid_filter=user123 \ - --feature_scaler=standard \ - from-cloudtrail \ - --input_glob=models/datasets/validation-data/dfp-cloudtrail-*-input.csv \ - --max_files=200 \ - train-ae \ - --train_data_glob=models/datasets/training-data/dfp-cloudtrail-*.csv \ - --source_stage_class=morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage \ - --seed=42 \ - preprocess \ - inf-pytorch \ - add-scores \ - serialize \ - to-file --filename=./cloudtrail-dfp-detections.csv --overwrite -``` - -## Duo DFP Pipeline - -The following pipeline trains user models from downloaded training data and saves user models to file. Pipeline then uses these models to run inference -on downloaded inference data. Inference results are written to `duo-detections.csv`. -``` -morpheus --log_level=DEBUG \ - run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ - pipeline-ae \ - --columns_file=data/columns_ae_duo.txt \ - --userid_column_name=username \ - --feature_scaler=standard \ - from-duo \ - --input_glob=examples/data/dfp/duo-inference-data/*.json \ - --max_files=200 \ - monitor --description='Input rate' \ - train-ae \ - --train_data_glob=examples/data/dfp/duo-training-data/*.json \ - --source_stage_class=morpheus.stages.input.duo_source_stage.DuoSourceStage \ - --seed=42 \ - --models_output_filename=models/dfp-models/duo_ae_user_models.pkl \ - preprocess \ - inf-pytorch \ - monitor --description='Inference rate' --unit inf \ - add-scores \ - serialize \ - to-file --filename=./duo-detections.csv --overwrite -``` - -The following example shows how we can load pre-trained user models from the file (`models/dfp-models/duo_ae_user_models.pkl`) we created in the previous example. 
Pipeline then uses these models to run inference on validation data in `models/datasets/validation-data/duo`. Inference results are written to `duo-detections.csv`. -``` -morpheus --log_level=DEBUG \ - run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ - pipeline-ae \ - --columns_file=data/columns_ae_duo.txt \ - --userid_column_name=username \ - --feature_scaler=standard \ - from-duo \ - --input_glob=examples/data/dfp/duo-inference-data/*.json \ - --max_files=200 \ - monitor --description='Input rate' \ - train-ae \ - --pretrained_filename=models/dfp-models/duo_ae_user_models.pkl \ - preprocess \ - inf-pytorch \ - monitor --description='Inference rate' --unit inf \ - add-scores \ - serialize \ - to-file --filename=./duo-detections.csv --overwrite -``` - -## Azure DFP Pipeline - -The following pipeline trains user models from downloaded training data and saves user models to file. Pipeline then uses these models to run inference -on downloaded inference data. Inference results are written to `azure-detections.csv`. -``` -morpheus --log_level=DEBUG \ - run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=False \ - pipeline-ae \ - --columns_file=morpheus/data/columns_ae_azure.txt \ - --userid_column_name=userPrincipalName \ - --feature_scaler=standard \ - from-azure \ - --input_glob=examples/data/dfp/azure-inference-data/*.json \ - --max_files=200 \ - train-ae \ - --train_data_glob=examples/data/dfp/azure-training-data/*.json \ - --source_stage_class=morpheus.stages.input.azure_source_stage.AzureSourceStage \ - --seed=42 \ - --models_output_filename=models/dfp-models/azure_ae_user_models.pkl \ - preprocess \ - inf-pytorch \ - monitor --description='Inference rate' --unit inf \ - add-scores \ - serialize \ - to-file --filename=./azure-detections.csv --overwrite -``` - -The following example shows how we can load pre-trained user models from the file (`models/dfp-models/azure_ae_user_models.pkl`) we created in the previous example. Pipeline then uses these models to run inference on validation data in `models/datasets/validation-data/azure`. Inference results are written to `azure-detections.csv`. -``` -morpheus --log_level=DEBUG \ - run --num_threads=1 --pipeline_batch_size=1024 --model_max_batch_size=1024 \ - pipeline-ae \ - --columns_file=data/columns_ae_azure.txt \ - --userid_column_name=userPrincipalName \ - --feature_scaler=standard \ - from-azure \ - --input_glob=examples/data/dfp/azure-inference-data/*.json \ - --max_files=200 \ - train-ae \ - --pretrained_filename=models/dfp-models/azure_ae_user_models.pkl \ - preprocess \ - inf-pytorch \ - monitor --description='Inference rate' --unit inf \ - add-scores \ - serialize \ - to-file --filename=./azure-detections.csv --overwrite -``` - - -## Using Morpheus Python API - -The DFP pipelines can also be constructed and run via the Morpheus Python API. An [example](./run_cloudtrail_dfp.py) is included for the CloudTrail DFP pipeline. The following are some commands to -run the example. - -Train user models from files in `models/datasets/training-data/dfp-cloudtrail-*.csv` and saves user models to file. Pipeline then uses these models to run inference on CloudTrail validation data in `models/datasets/validation-data/dfp-cloudtrail-*-input.csv`. Inference results are written to `cloudtrail-dfp-results.csv`. 
-``` -python ./examples/digital_fingerprinting/starter/run_cloudtrail_dfp.py \ - --columns_file=data/columns_ae_cloudtrail.txt \ - --input_glob=models/datasets/validation-data/dfp-cloudtrail-*-input.csv \ - --train_data_glob=models/datasets/training-data/dfp-*.csv \ - --models_output_filename=models/dfp-models/cloudtrail_ae_user_models.pkl \ - --output_file ./cloudtrail-dfp-results.csv -``` - -Here we load pre-trained user models from the file (`models/dfp-models/cloudtrail_ae_user_models.pkl`) we created in the previous example. Pipeline then uses these models to run inference on validation data in `models/datasets/validation-data/dfp-cloudtrail-*-input.csv`. Inference results are written to `cloudtrail-dfp-results.csv`. -``` -python ./examples/digital_fingerprinting/starter/run_cloudtrail_dfp.py \ - --columns_file=data/columns_ae_cloudtrail.txt \ - --input_glob=models/datasets/validation-data/dfp-cloudtrail-*-input.csv \ - --pretrained_filename=models/dfp-models/cloudtrail_ae_user_models.pkl \ - --output_file=./cloudtrail-dfp-results.csv -``` diff --git a/examples/digital_fingerprinting/starter/run_cloudtrail_dfp.py b/examples/digital_fingerprinting/starter/run_cloudtrail_dfp.py deleted file mode 100644 index ef4f00b7dc..0000000000 --- a/examples/digital_fingerprinting/starter/run_cloudtrail_dfp.py +++ /dev/null @@ -1,156 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Executes a pipeline that trains an autoencoder and then uses it to detect anomalies in the same data.""" - -import logging -import os - -import click - -from morpheus.cli.utils import MorpheusRelativePath -from morpheus.common import TypeId -from morpheus.config import AEFeatureScalar -from morpheus.config import Config -from morpheus.config import ConfigAutoEncoder -from morpheus.config import PipelineModes -from morpheus.pipeline import LinearPipeline -from morpheus.stages.general.monitor_stage import MonitorStage -from morpheus.stages.inference.auto_encoder_inference_stage import AutoEncoderInferenceStage -from morpheus.stages.input.cloud_trail_source_stage import CloudTrailSourceStage -from morpheus.stages.output.write_to_file_stage import WriteToFileStage -from morpheus.stages.postprocess.add_scores_stage import AddScoresStage -from morpheus.stages.postprocess.serialize_stage import SerializeStage -from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage -from morpheus.stages.preprocess.train_ae_stage import TrainAEStage -from morpheus.utils.file_utils import load_labels_file -from morpheus.utils.logger import configure_logging - - -@click.command() -@click.option( - "--num_threads", - default=len(os.sched_getaffinity(0)), - type=click.IntRange(min=1), - help="Number of internal pipeline threads to use", -) -@click.option( - "--pipeline_batch_size", - default=1024, - type=click.IntRange(min=1), - help=("Internal batch size for the pipeline. Can be much larger than the model batch size. 
" - "Also used for Kafka consumers"), -) -@click.option( - "--model_max_batch_size", - default=1024, - type=click.IntRange(min=1), - help="Max batch size to use for the model", -) -@click.option( - "--columns_file", - type=MorpheusRelativePath(exists=True, readable=True), - required=True, - help="Feature columns file", -) -@click.option( - "--input_glob", - type=str, - required=True, - help="Inference input glob", -) -@click.option( - "--train_data_glob", - type=str, - required=False, - help="Train data glob", -) -@click.option( - "--pretrained_filename", - type=click.Path(exists=True, readable=True), - required=False, - help="File with pre-trained user models", -) -@click.option( - "--models_output_filename", - help="The path to the file where the inference output will be saved.", -) -@click.option( - "--output_file", - default="./cloudtrail-detections.csv", - help="The path to the file where the inference output will be saved.", -) -def run_pipeline(num_threads, - pipeline_batch_size, - model_max_batch_size, - columns_file, - input_glob, - train_data_glob, - pretrained_filename, - models_output_filename, - output_file): - """Configure and run the pipeline.""" - configure_logging(log_level=logging.DEBUG) - - config = Config() - config.mode = PipelineModes.AE - config.ae = ConfigAutoEncoder() - config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName" - config.ae.feature_scaler = AEFeatureScalar.STANDARD - config.ae.feature_columns = load_labels_file(columns_file) - config.num_threads = num_threads - config.pipeline_batch_size = pipeline_batch_size - config.model_max_batch_size = model_max_batch_size - config.class_labels = ["reconstruct_loss", "zscore"] - - # Create a pipeline object - pipeline = LinearPipeline(config) - - # Add a source stage - pipeline.set_source(CloudTrailSourceStage(config, input_glob=input_glob)) - - # Add a training stage - pipeline.add_stage( - TrainAEStage(config, - pretrained_filename=pretrained_filename, - train_data_glob=train_data_glob, - source_stage_class="morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage", - models_output_filename=models_output_filename, - seed=42, - sort_glob=True)) - - # Add a preprocessing stage - pipeline.add_stage(PreprocessAEStage(config)) - - # Add a inference stage - pipeline.add_stage(AutoEncoderInferenceStage(config)) - - # Add anomaly scores and z-scores to each message - pipeline.add_stage(AddScoresStage(config, probs_type=TypeId.FLOAT64)) - - # Add serialize stage - pipeline.add_stage(SerializeStage(config)) - - # Add a write file stage - pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) - - pipeline.add_stage(MonitorStage(config, description="Postprocessing rate")) - - # Run the pipeline - pipeline.run() - - -if __name__ == "__main__": - # The click decordators add all of the needed arguments to the `run_pipeline` function but pylint doesn't know that - # pylint: disable=no-value-for-parameter - run_pipeline() diff --git a/morpheus.code-workspace b/morpheus.code-workspace index 092d5d6d20..0b3edb2aee 100644 --- a/morpheus.code-workspace +++ b/morpheus.code-workspace @@ -140,68 +140,6 @@ "subProcess": true, "type": "debugpy" }, - { - "args": [ - "--log_level=DEBUG", - "run", - "--num_threads=1", - "--pipeline_batch_size=1024", - "--model_max_batch_size=1024", - "--use_cpp=False", - "pipeline-ae", - "--columns_file=python/morpheus/morpheus/data/columns_ae_cloudtrail.txt", - "--userid_column_name=userIdentitysessionContextsessionIssueruserName", - 
"--userid_filter=user123", - "--timestamp_column_name=event_dt", - "from-cloudtrail", - "--input_glob=models/datasets/validation-data/dfp-cloudtrail-*-input.csv", - "--max_files=200", - "train-ae", - "--train_data_glob=models/datasets/training-data/dfp-*.csv", - "--source_stage_class=morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage", - "--seed=42", - "preprocess", - "inf-pytorch", - "add-scores", - "timeseries", - "--resolution=1m", - "--zscore_threshold=8.0", - "--hot_start", - "monitor", - "--description", - "Inference Rate", - "--unit", - "inf", - "validate", - "--val_file_name=models/datasets/validation-data/dfp-cloudtrail-user123-validation-data-output.csv", - "--results_file_name=./.tmp/validation_results-ae.json", - "--index_col=_index_", - "--exclude=event_dt", - "--rel_tol=0.1", - "--overwrite", - "serialize", - // "--include", - // "timestamp", - // "--exclude", - // "^_ts_", - // "--exclude", - // "^nvidia_smi_log", - // "to-kafka", - // "--output_topic", - // "inference_output", - "to-file", - "--filename=./.tmp/detections.csv", - "--overwrite" - ], - "console": "integratedTerminal", - "cwd": "${workspaceFolder}", - "justMyCode": false, - "name": "Python: Run Pipeline (AE)", - "program": "${workspaceFolder}/python/morpheus/morpheus/cli/run.py", - "request": "launch", - "subProcess": true, - "type": "debugpy" - }, { "args": [ "--log_level=DEBUG", @@ -505,97 +443,6 @@ }, "type": "cppdbg" }, - { - "MIMode": "gdb", - "args": [ - "./python/morpheus/morpheus/cli.py", - "--log_level=DEBUG", - "run", - "--num_threads=1", - "--pipeline_batch_size=128", - "--model_max_batch_size=128", - "--use_cpp=False", - "pipeline-ae", - // "--ae_path=models/hammah-models/hammah-role-g-20211017.pkl", - "--ae_path=../data/ae_model.pkl", - "from-cloudtrail", - "--input_glob=models/datasets/validation-data/dfp-cloudtrail-role-g-validation-data.csv", - // "--input_glob=./data/red_team.csv", - "--max_files=200", - "--iterative", - "deserialize", - "preprocess", - "inf-triton", - "--model_name=autoencoder-onnx", - "--server_url=localhost:8001", - "timeseries", - "--resolution=10m", - "monitor", - "--description", - "Inference Rate", - "--smoothing=0.001", - "--unit", - "inf" - // "add-class", - // "filter", - // "serialize", - // "--include", - // "timestamp", - // "--exclude", - // "^_ts_", - // "--exclude", - // "^nvidia_smi_log", - // "to-kafka", - // "--output_topic", - // "inference_output", - // "to-file", - // "--filename=./.tmp/detections.json", - // "--overwrite", - ], - "cwd": "${workspaceFolder}", - "environment": [ - { - "name": "MORPHEUS_ROOT", - "value": "${workspaceFolder}" - }, - { - "name": "GLOG_v", - "value": "10" - }, - { - "name": "CUDA_LAUNCH_BLOCKING", - "value": "1" - } - ], - "externalConsole": false, - "miDebuggerPath": "gdb", - "name": "Debug MRC from Python (Morpheus-AE)", - "program": "python", - "request": "launch", - "setupCommands": [ - { - "description": "Enable pretty-printing for gdb", - "ignoreFailures": true, - "text": "-enable-pretty-printing" - }, - { - "description": "Skip stdio-common files", - "text": "-interpreter-exec console \"skip -gfi **/bits/*.h\"" - } - ], - "sourceFileMap": { - "${workspaceFolder}": { - "editorPath": "${workspaceFolder}", - "useForBreakpoints": "true" - } - }, - "stopAtEntry": false, - "symbolLoadInfo": { - "exceptionList": "libmrc*.so;cudf_helpers.*;executor.*;morpheus.*;node.*;options.*;pipeline.*;segment.*;subscriber.*;stages.*;messages.*;common*.so", - "loadAll": false - }, - "type": "cppdbg" - }, { "MIMode": "gdb", 
"args": [ diff --git a/python/morpheus/morpheus/cli/commands.py b/python/morpheus/morpheus/cli/commands.py index 78e5834e16..ddc61af5bf 100644 --- a/python/morpheus/morpheus/cli/commands.py +++ b/python/morpheus/morpheus/cli/commands.py @@ -25,15 +25,11 @@ from morpheus.cli.stage_registry import LazyStageInfo from morpheus.cli.utils import MorpheusRelativePath from morpheus.cli.utils import get_config_from_ctx -from morpheus.cli.utils import get_enum_keys from morpheus.cli.utils import get_log_levels from morpheus.cli.utils import get_pipeline_from_ctx -from morpheus.cli.utils import parse_enum from morpheus.cli.utils import parse_log_level from morpheus.cli.utils import prepare_command -from morpheus.config import AEFeatureScalar from morpheus.config import Config -from morpheus.config import ConfigAutoEncoder from morpheus.config import ConfigFIL from morpheus.config import ConfigOnnxToTRT from morpheus.config import CppConfig @@ -473,103 +469,6 @@ def pipeline_fil(ctx: click.Context, **kwargs): return p -@click.group(chain=True, - short_help="Run the inference pipeline with an AutoEncoder model", - no_args_is_help=True, - cls=PluginGroup, - pipeline_mode=PipelineModes.AE) -@click.option('--columns_file', - required=True, - default=None, - type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True), - help=("Specifies a file to read column features.")) -@click.option('--labels_file', - default=None, - type=MorpheusRelativePath(dir_okay=False, exists=True, file_okay=True, resolve_path=True), - help=("Specifies a file to read labels from in order to convert class IDs into labels. " - "A label file is a simple text file where each line corresponds to a label. ")) -@click.option('--userid_column_name', - type=str, - default="userIdentityaccountId", - required=True, - help=("Which column to use as the User ID.")) -@click.option('--userid_filter', - type=str, - default=None, - help=("Specifying this value will filter all incoming data to only use rows with matching User IDs. " - "Which column is used for the User ID is specified by `userid_column_name`")) -@click.option('--feature_scaler', - type=click.Choice(get_enum_keys(AEFeatureScalar), case_sensitive=False), - default=AEFeatureScalar.STANDARD.name, - callback=functools.partial(parse_enum, enum_class=AEFeatureScalar, case_sensitive=False), - help=("Autoencoder feature scaler")) -@click.option('--use_generic_model', - is_flag=True, - type=bool, - help=("Whether to use a generic model when user does not have minimum number of training rows")) -@click.option('--viz_file', - default=None, - type=click.Path(dir_okay=False, writable=True), - help="Save a visualization of the pipeline at the specified location") -@click.option('--viz_direction', - default="LR", - type=click.Choice(RANKDIR_CHOICES, case_sensitive=False), - help=("Set the direction for the Graphviz pipeline diagram, " - "ignored unless --viz_file is also specified.")) -@click.option('--timestamp_column_name', - type=str, - default="timestamp", - required=True, - help=("Which column to use as the timestamp.")) -@prepare_command() -def pipeline_ae(ctx: click.Context, **kwargs): - """ - Configure and run the pipeline. To configure the pipeline, list the stages in the order that data should flow. The - output of each stage will become the input for the next stage. 
For example, to read, classify and write to a file, - the following stages could be used - - pipeline from-file --filename=my_dataset.json deserialize preprocess inf-triton --model_name=my_model - --server_url=localhost:8001 filter --threshold=0.5 to-file --filename=classifications.json - - Pipelines must follow a few rules: - 1. Data must originate in a source stage. Current options are `from-file` or `from-kafka` - 2. A `deserialize` stage must be placed between the source stages and the rest of the pipeline - 3. Only one inference stage can be used. Zero is also fine - 4. The following stages must come after an inference stage: `add-class`, `filter`, `gen-viz` - - """ - - click.secho("Configuring Pipeline via CLI", fg="green") - - config = get_config_from_ctx(ctx) - config.mode = PipelineModes.AE - config.ae = ConfigAutoEncoder() - config.ae.userid_column_name = kwargs["userid_column_name"] - config.ae.timestamp_column_name = kwargs["timestamp_column_name"] - config.ae.feature_scaler = kwargs["feature_scaler"] - config.ae.use_generic_model = kwargs["use_generic_model"] - config.ae.feature_columns = load_labels_file(kwargs["columns_file"]) - logger.debug("Loaded columns. Current columns: [%s]", str(config.ae.feature_columns)) - - if ("labels_file" in kwargs and kwargs["labels_file"] is not None): - config.class_labels = load_labels_file(kwargs["labels_file"]) - logger.debug("Loaded labels file. Current labels: [%s]", str(config.class_labels)) - else: - # Use default labels - config.class_labels = ["reconstruct_loss", "zscore"] - - if ("userid_filter" in kwargs): - config.ae.userid_filter = kwargs["userid_filter"] - - logger.info("Filtering all users except ID: '%s'", str(config.ae.userid_filter)) - - from morpheus.pipeline import LinearPipeline - - p = ctx.obj["pipeline"] = LinearPipeline(config) - - return p - - @click.group(chain=True, short_help="Run a custom inference pipeline without a specific model type", no_args_is_help=True, @@ -642,7 +541,6 @@ def pipeline_other(ctx: click.Context, **kwargs): @pipeline_nlp.result_callback() @pipeline_fil.result_callback() -@pipeline_ae.result_callback() @pipeline_other.result_callback() @click.pass_context def post_pipeline(ctx: click.Context, *args, **kwargs): @@ -667,12 +565,9 @@ def post_pipeline(ctx: click.Context, *args, **kwargs): # Manually create the subcommands for each command (necessary since commands can be used on multiple groups) run.add_command(pipeline_nlp) run.add_command(pipeline_fil) -run.add_command(pipeline_ae) run.add_command(pipeline_other) -ALL = (PipelineModes.AE, PipelineModes.NLP, PipelineModes.FIL, PipelineModes.OTHER) -NOT_AE = (PipelineModes.NLP, PipelineModes.FIL, PipelineModes.OTHER) -AE_ONLY = (PipelineModes.AE, ) +ALL = (PipelineModes.NLP, PipelineModes.FIL, PipelineModes.OTHER) FIL_ONLY = (PipelineModes.FIL, ) NLP_ONLY = (PipelineModes.NLP, ) @@ -681,37 +576,28 @@ def post_pipeline(ctx: click.Context, *args, **kwargs): add_command("add-scores", "morpheus.stages.postprocess.add_scores_stage.AddScoresStage", modes=ALL) add_command("buffer", "morpheus.stages.general.buffer_stage.BufferStage", modes=ALL) add_command("delay", "morpheus.stages.general.delay_stage.DelayStage", modes=ALL) -add_command("deserialize", "morpheus.stages.preprocess.deserialize_stage.DeserializeStage", modes=NOT_AE) -add_command("dropna", "morpheus.stages.preprocess.drop_null_stage.DropNullStage", modes=NOT_AE) +add_command("deserialize", "morpheus.stages.preprocess.deserialize_stage.DeserializeStage", modes=ALL) +add_command("dropna", 
"morpheus.stages.preprocess.drop_null_stage.DropNullStage", modes=ALL) add_command("filter", "morpheus.stages.postprocess.filter_detections_stage.FilterDetectionsStage", modes=ALL) add_command("from-arxiv", "morpheus.stages.input.arxiv_source.ArxivSource", modes=ALL) -add_command("from-azure", "morpheus.stages.input.azure_source_stage.AzureSourceStage", modes=AE_ONLY) add_command("from-appshield", "morpheus.stages.input.appshield_source_stage.AppShieldSourceStage", modes=FIL_ONLY) -add_command("from-azure", "morpheus.stages.input.azure_source_stage.AzureSourceStage", modes=AE_ONLY) -add_command("from-cloudtrail", "morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage", modes=AE_ONLY) add_command("from-databricks-deltalake", "morpheus.stages.input.databricks_deltalake_source_stage.DataBricksDeltaLakeSourceStage", modes=ALL) -add_command("from-duo", "morpheus.stages.input.duo_source_stage.DuoSourceStage", modes=AE_ONLY) -add_command("from-file", "morpheus.stages.input.file_source_stage.FileSourceStage", modes=NOT_AE) -add_command("from-kafka", "morpheus.stages.input.kafka_source_stage.KafkaSourceStage", modes=NOT_AE) +add_command("from-file", "morpheus.stages.input.file_source_stage.FileSourceStage", modes=ALL) +add_command("from-kafka", "morpheus.stages.input.kafka_source_stage.KafkaSourceStage", modes=ALL) add_command("from-http", "morpheus.stages.input.http_server_source_stage.HttpServerSourceStage", modes=ALL) add_command("from-http-client", "morpheus.stages.input.http_client_source_stage.HttpClientSourceStage", modes=ALL) add_command("from-rss", "morpheus.stages.input.rss_source_stage.RSSSourceStage", modes=ALL) add_command("gen-viz", "morpheus.stages.postprocess.generate_viz_frames_stage.GenerateVizFramesStage", modes=NLP_ONLY) -add_command("inf-identity", "morpheus.stages.inference.identity_inference_stage.IdentityInferenceStage", modes=NOT_AE) -add_command("inf-pytorch", - "morpheus.stages.inference.auto_encoder_inference_stage.AutoEncoderInferenceStage", - modes=AE_ONLY) -add_command("inf-pytorch", "morpheus.stages.inference.pytorch_inference_stage.PyTorchInferenceStage", modes=NOT_AE) -add_command("inf-triton", "morpheus.stages.inference.triton_inference_stage.TritonInferenceStage", modes=NOT_AE) -add_command("mlflow-drift", "morpheus.stages.postprocess.ml_flow_drift_stage.MLFlowDriftStage", modes=NOT_AE) +add_command("inf-identity", "morpheus.stages.inference.identity_inference_stage.IdentityInferenceStage", modes=ALL) +add_command("inf-pytorch", "morpheus.stages.inference.pytorch_inference_stage.PyTorchInferenceStage", modes=ALL) +add_command("inf-triton", "morpheus.stages.inference.triton_inference_stage.TritonInferenceStage", modes=ALL) +add_command("mlflow-drift", "morpheus.stages.postprocess.ml_flow_drift_stage.MLFlowDriftStage", modes=ALL) add_command("monitor", "morpheus.stages.general.monitor_stage.MonitorStage", modes=ALL) -add_command("preprocess", "morpheus.stages.preprocess.preprocess_ae_stage.PreprocessAEStage", modes=AE_ONLY) add_command("preprocess", "morpheus.stages.preprocess.preprocess_fil_stage.PreprocessFILStage", modes=FIL_ONLY) add_command("preprocess", "morpheus.stages.preprocess.preprocess_nlp_stage.PreprocessNLPStage", modes=NLP_ONLY) add_command("serialize", "morpheus.stages.postprocess.serialize_stage.SerializeStage", modes=ALL) -add_command("timeseries", "morpheus.stages.postprocess.timeseries_stage.TimeSeriesStage", modes=AE_ONLY) add_command("to-elasticsearch", 
"morpheus.stages.output.write_to_elasticsearch_stage.WriteToElasticsearchStage", modes=ALL) @@ -719,7 +605,6 @@ def post_pipeline(ctx: click.Context, *args, **kwargs): add_command("to-kafka", "morpheus.stages.output.write_to_kafka_stage.WriteToKafkaStage", modes=ALL) add_command("to-http", "morpheus.stages.output.http_client_sink_stage.HttpClientSinkStage", modes=ALL) add_command("to-http-server", "morpheus.stages.output.http_server_sink_stage.HttpServerSinkStage", modes=ALL) -add_command("train-ae", "morpheus.stages.preprocess.train_ae_stage.TrainAEStage", modes=AE_ONLY) add_command("trigger", "morpheus.stages.general.trigger_stage.TriggerStage", modes=ALL) add_command("validate", "morpheus.stages.postprocess.validation_stage.ValidationStage", modes=ALL) diff --git a/python/morpheus/morpheus/config.py b/python/morpheus/morpheus/config.py index 2b0073103e..2bc589a186 100644 --- a/python/morpheus/morpheus/config.py +++ b/python/morpheus/morpheus/config.py @@ -137,7 +137,6 @@ class PipelineModes(str, Enum): OTHER = "OTHER" NLP = "NLP" FIL = "FIL" - AE = "AE" class ExecutionMode(str, Enum): diff --git a/python/morpheus/morpheus/stages/inference/auto_encoder_inference_stage.py b/python/morpheus/morpheus/stages/inference/auto_encoder_inference_stage.py deleted file mode 100644 index f731d77a36..0000000000 --- a/python/morpheus/morpheus/stages/inference/auto_encoder_inference_stage.py +++ /dev/null @@ -1,149 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import typing - -import cupy as cp -import numpy as np -import pandas as pd - -from morpheus.cli.register_stage import register_stage -from morpheus.config import Config -from morpheus.config import PipelineModes -from morpheus.messages import ControlMessage -from morpheus.messages import ResponseMemoryAE -from morpheus.messages import TensorMemory -from morpheus.stages.inference.inference_stage import InferenceStage -from morpheus.stages.inference.inference_stage import InferenceWorker -from morpheus.utils.producer_consumer_queue import ProducerConsumerQueue - - -class _AutoEncoderInferenceWorker(InferenceWorker): - - def __init__(self, inf_queue: ProducerConsumerQueue, c: Config): - super().__init__(inf_queue) - - self._max_batch_size = c.model_max_batch_size - self._seq_length = c.feature_length - - self._feature_columns = c.ae.feature_columns - - def init(self): - - pass - - def build_output_message(self, msg: ControlMessage) -> ControlMessage: - """ - Create initial inference response message with result values initialized to zero. Results will be - set in message as each inference batch is processed. - - Parameters - ---------- - msg : `morpheus.messages.ControlMessage` - Batch of ControlMessage. - - Returns - ------- - `morpheus.messages.ControlMessage` - Response ControlMessage. 
- """ - - dims = self.calc_output_dims(msg) - output_dims = (msg.payload().count, *dims[1:]) - - output_message = ControlMessage(msg) - output_message.payload(msg.payload()) - output_message.tensors(TensorMemory(count=output_dims[0], tensors={"probs": cp.zeros(output_dims)})) - - return output_message - - def calc_output_dims(self, msg: ControlMessage) -> typing.Tuple: - # reconstruction loss and zscore - return (msg.tensors().count, 2) - - def process(self, batch: ControlMessage, callback: typing.Callable[[TensorMemory], None]): - """ - This function processes inference batch by using batch's model to calculate anomaly scores - and adding results to response. - - Parameters - ---------- - batch : `morpheus.messages.ControlMessage` - Batch of inference messages. - callback : typing.Callable[[`morpheus.pipeline.messages.TensorMemory`], None] - Inference callback. - - """ - - data = batch.payload().get_data(batch.payload().df.columns.intersection(self._feature_columns)).to_pandas() - - explain_cols = [x + "_z_loss" for x in self._feature_columns] + ["max_abs_z", "mean_abs_z"] - explain_df = pd.DataFrame(np.empty((batch.tensors().count, (len(self._feature_columns) + 2)), dtype=object), - columns=explain_cols) - - model = batch.get_metadata("model") - if model is not None: - rloss_scores = model.get_anomaly_score(data) - - results = model.get_results(data, return_abs=True) - scaled_z_scores = [col for col in results.columns if col.endswith('_z_loss')] - scaled_z_scores.extend(['max_abs_z', 'mean_abs_z']) - scaledz_df = results[scaled_z_scores] - for col in scaledz_df.columns: - explain_df[col] = scaledz_df[col] - - zscores = (rloss_scores - batch.get_metadata("train_scores_mean")) / batch.get_metadata("train_scores_std") - rloss_scores = rloss_scores.reshape((batch.tensors().count, 1)) - zscores = np.absolute(zscores) - zscores = zscores.reshape((batch.tensors().count, 1)) - else: - rloss_scores = np.empty((batch.tensors().count, 1)) - rloss_scores[:] = np.NaN - zscores = np.empty((batch.tensors().count, 1)) - zscores[:] = np.NaN - - ae_scores = np.concatenate((rloss_scores, zscores), axis=1) - - ae_scores = cp.asarray(ae_scores) - - mem = ResponseMemoryAE(count=batch.tensors().count, probs=ae_scores) - - mem.explain_df = explain_df - - callback(mem) - - -@register_stage("inf-pytorch", modes=[PipelineModes.AE]) -class AutoEncoderInferenceStage(InferenceStage): - """ - Perform inference with PyTorch. 
- """ - - def __init__(self, c: Config): - super().__init__(c) - - self._config = c - - def _get_inference_worker(self, inf_queue: ProducerConsumerQueue) -> InferenceWorker: - - return _AutoEncoderInferenceWorker(inf_queue, self._config) - - @staticmethod - def _convert_one_response(output: ControlMessage, inf: ControlMessage, res: ResponseMemoryAE): - # Set the explainability and then call the base - res.explain_df.index = range(0, inf.payload().count) - for col in res.explain_df.columns: - inf.payload().set_data(col, res.explain_df[col]) - - return InferenceStage._convert_one_response(output=output, inf=inf, res=res) diff --git a/python/morpheus/morpheus/stages/input/azure_source_stage.py b/python/morpheus/morpheus/stages/input/azure_source_stage.py index 38661e3fc4..ccab3f3cc2 100644 --- a/python/morpheus/morpheus/stages/input/azure_source_stage.py +++ b/python/morpheus/morpheus/stages/input/azure_source_stage.py @@ -17,14 +17,11 @@ import pandas as pd -from morpheus.cli import register_stage -from morpheus.config import PipelineModes from morpheus.stages.input.autoencoder_source_stage import AutoencoderSourceStage logger = logging.getLogger(__name__) -@register_stage("from-azure", modes=[PipelineModes.AE]) class AzureSourceStage(AutoencoderSourceStage): """ Source stage is used to load Azure Active Directory messages. diff --git a/python/morpheus/morpheus/stages/input/cloud_trail_source_stage.py b/python/morpheus/morpheus/stages/input/cloud_trail_source_stage.py index 968fee7ef2..ec78097a25 100644 --- a/python/morpheus/morpheus/stages/input/cloud_trail_source_stage.py +++ b/python/morpheus/morpheus/stages/input/cloud_trail_source_stage.py @@ -19,17 +19,14 @@ import numpy as np import pandas as pd -from morpheus.cli import register_stage from morpheus.common import FileTypes from morpheus.common import determine_file_type -from morpheus.config import PipelineModes from morpheus.io.deserializers import read_file_to_df from morpheus.stages.input.autoencoder_source_stage import AutoencoderSourceStage logger = logging.getLogger(__name__) -@register_stage("from-cloudtrail", modes=[PipelineModes.AE]) class CloudTrailSourceStage(AutoencoderSourceStage): """ Load messages from a CloudTrail directory. diff --git a/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py b/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py index 6e7967c1d5..7f74bbb5a1 100644 --- a/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py +++ b/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py @@ -21,9 +21,7 @@ import mrc import pandas as pd -from morpheus.cli.register_stage import register_stage from morpheus.config import Config -from morpheus.config import PipelineModes from morpheus.messages import ControlMessage from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource @@ -33,7 +31,6 @@ logger = logging.getLogger(__name__) -@register_stage("from-cm-kafka", modes=[PipelineModes.AE]) class ControlMessageKafkaSourceStage(PreallocatorMixin, SingleOutputSource): """ Load control messages from a Kafka cluster. 
diff --git a/python/morpheus/morpheus/stages/input/duo_source_stage.py b/python/morpheus/morpheus/stages/input/duo_source_stage.py index 8f5e9c86c0..c4645cce0e 100644 --- a/python/morpheus/morpheus/stages/input/duo_source_stage.py +++ b/python/morpheus/morpheus/stages/input/duo_source_stage.py @@ -19,15 +19,12 @@ import pandas as pd -from morpheus.cli import register_stage -from morpheus.config import PipelineModes from morpheus.stages.input.autoencoder_source_stage import AutoencoderSourceStage DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00' logger = logging.getLogger(__name__) -@register_stage("from-duo", modes=[PipelineModes.AE]) class DuoSourceStage(AutoencoderSourceStage): """ Source stage is used to load Duo Authentication messages. diff --git a/python/morpheus/morpheus/stages/postprocess/timeseries_stage.py b/python/morpheus/morpheus/stages/postprocess/timeseries_stage.py index 5d7e5d5a67..cdda8d5b12 100644 --- a/python/morpheus/morpheus/stages/postprocess/timeseries_stage.py +++ b/python/morpheus/morpheus/stages/postprocess/timeseries_stage.py @@ -25,9 +25,7 @@ import pandas as pd from mrc.core import operators as ops -from morpheus.cli.register_stage import register_stage from morpheus.config import Config -from morpheus.config import PipelineModes from morpheus.messages import ControlMessage from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin from morpheus.pipeline.single_port_stage import SinglePortStage @@ -403,7 +401,6 @@ def _calc_timeseries(self, x: ControlMessage, is_complete: bool): return output_messages -@register_stage("timeseries", modes=[PipelineModes.AE]) class TimeSeriesStage(PassThruTypeMixin, SinglePortStage): """ Perform time series anomaly detection and add prediction. diff --git a/python/morpheus/morpheus/stages/preprocess/preprocess_ae_stage.py b/python/morpheus/morpheus/stages/preprocess/preprocess_ae_stage.py deleted file mode 100644 index 8cd18cfc87..0000000000 --- a/python/morpheus/morpheus/stages/preprocess/preprocess_ae_stage.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import typing -from functools import partial - -import cupy as cp - -from morpheus.cli.register_stage import register_stage -from morpheus.config import Config -from morpheus.config import PipelineModes -from morpheus.messages import ControlMessage -from morpheus.messages import TensorMemory -from morpheus.stages.preprocess.preprocess_base_stage import PreprocessBaseStage - -logger = logging.getLogger(__name__) - - -@register_stage("preprocess", modes=[PipelineModes.AE]) -class PreprocessAEStage(PreprocessBaseStage): - """ - Prepare Autoencoder input DataFrames for inference. - - Parameters - ---------- - c : morpheus.config.Config - Pipeline configuration instance. 
- - """ - - def __init__(self, c: Config): - super().__init__(c) - - self._fea_length = c.feature_length - self._feature_columns = c.ae.feature_columns - - @property - def name(self) -> str: - return "preprocess-ae" - - def accepted_types(self) -> typing.Tuple: - """ - Returns accepted input types for this stage. - """ - return (ControlMessage, ) - - def supports_cpp_node(self): - return False - - @staticmethod - def pre_process_batch(msg: ControlMessage, fea_len: int, feature_columns: typing.List[str]) -> ControlMessage: - """ - This function performs pre-processing for autoencoder. - - Parameters - ---------- - msg : morpheus.messages.ControlMessage - Input rows received from Deserialized stage. - fea_len : int - Number of input features. - feature_columns : typing.List[str] - List of feature columns. - - Returns - ------- - morpheus.messages.ControlMessage - - """ - meta_df = msg.payload().get_data(msg.payload().df.columns.intersection(feature_columns)).to_pandas() - - autoencoder = msg.get_metadata("model") - scores_mean = msg.get_metadata("train_scores_mean") - scores_std = msg.get_metadata("train_scores_std") - count = len(meta_df.index) - - inputs = cp.zeros(meta_df.shape, dtype=cp.float32) - - if autoencoder is not None: - data = autoencoder.prepare_df(meta_df) - inputs = autoencoder.build_input_tensor(data) - inputs = cp.asarray(inputs.detach()) - count = inputs.shape[0] - - seg_ids = cp.zeros((count, 3), dtype=cp.uint32) - seg_ids[:, 0] = cp.arange(0, count, dtype=cp.uint32) - seg_ids[:, 2] = fea_len - 1 - - msg.set_metadata("model", autoencoder) - msg.set_metadata("train_scores_mean", scores_mean) - msg.set_metadata("train_scores_std", scores_std) - msg.tensors(TensorMemory(count=count, tensors={"input": inputs, "seq_ids": seg_ids})) - return msg - - def _get_preprocess_fn(self) -> typing.Callable[[ControlMessage], ControlMessage]: - return partial(PreprocessAEStage.pre_process_batch, - fea_len=self._fea_length, - feature_columns=self._feature_columns) diff --git a/python/morpheus/morpheus/stages/preprocess/train_ae_stage.py b/python/morpheus/morpheus/stages/preprocess/train_ae_stage.py deleted file mode 100644 index c765aa7f20..0000000000 --- a/python/morpheus/morpheus/stages/preprocess/train_ae_stage.py +++ /dev/null @@ -1,340 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import glob -import importlib -import logging -import pathlib - -import dill -import mrc -import pandas as pd -from mrc.core import operators as ops - -from morpheus.cli.register_stage import register_stage -from morpheus.config import Config -from morpheus.config import PipelineModes -from morpheus.messages import ControlMessage -from morpheus.models.dfencoder import AutoEncoder -from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin -from morpheus.pipeline.single_port_stage import SinglePortStage -from morpheus.utils.seed import manual_seed - -logger = logging.getLogger(__name__) - - -class _UserModelManager: - - def __init__(self, - config: Config, - user_id: str, - save_model: bool, - epochs: int, - max_history: int, - seed: int = None) -> None: - super().__init__() - - self._user_id = user_id - self._history: pd.DataFrame = None - self._max_history: int = max_history - self._seed: int = seed - self._feature_columns = config.ae.feature_columns - self._feature_scaler = config.ae.feature_scaler - self._epochs = epochs - self._save_model = save_model - - self._model: AutoEncoder = None - self._train_scores_mean = None - self._train_scores_std = None - - @property - def model(self): - return self._model - - @property - def train_scores_mean(self): - return self._train_scores_mean - - @property - def train_scores_std(self): - return self._train_scores_std - - def train(self, df: pd.DataFrame) -> AutoEncoder: - - # Determine how much history to save - if (self._history is not None): - to_drop = max(len(df) + len(self._history) - self._max_history, 0) - - history = self._history.iloc[to_drop:, :] - - train_df = pd.concat([history, df]) - else: - train_df = df - - # If the seed is set, enforce that here - if (self._seed is not None): - manual_seed(self._seed) - - model = AutoEncoder( - encoder_layers=[512, 500], # layers of the encoding part - decoder_layers=[512], # layers of the decoding part - activation='relu', # activation function - swap_probability=0.2, # noise parameter - learning_rate=0.01, # learning rate - learning_rate_decay=.99, # learning decay - batch_size=512, - # logger='ipynb', - verbose=False, - optimizer='sgd', # SGD optimizer is selected(Stochastic gradient descent) - scaler=self._feature_scaler, # feature scaling method - min_cats=1, # cut off for minority categories - progress_bar=False) - - logger.debug("Training AE model for user: '%s'...", self._user_id) - model.fit(train_df, epochs=self._epochs) - train_loss_scores = model.get_anomaly_score(train_df) - scores_mean = train_loss_scores.mean() - scores_std = train_loss_scores.std() - - logger.debug("Training AE model for user: '%s'... Complete.", self._user_id) - - if (self._save_model): - self._model = model - self._train_scores_mean = scores_mean - self._train_scores_std = scores_std - - # Save the history for next time - self._history = train_df.iloc[max(0, len(train_df) - self._max_history):, :] - - return model, scores_mean, scores_std - - -@register_stage("train-ae", modes=[PipelineModes.AE]) -class TrainAEStage(PassThruTypeMixin, SinglePortStage): - """ - Train an Autoencoder model on incoming data. - - This stage is used to train an Autoencoder model on incoming data a supply that model to downstream stages. The - Autoencoder workflows use this stage as a pre-processing step to build the model for inference. - - Parameters - ---------- - c : morpheus.config.Config - Pipeline configuration instance. 
- pretrained_filename : pathlib.Path, default = None - Loads a single pre-trained model for all users. - train_data_glob : str, default = None - On startup, all files matching this glob pattern will be loaded and used to train a model for each unique user - ID. - source_stage_class : str, default = None - If train_data_glob provided, use source stage to batch training data per user. - train_epochs : int, default = 25, min = 1 - The number of epochs to train user models for. Passed in as the `epoch` parameter to `AutoEncoder.fit` causes - data to be trained in `train_epochs` batches. - train_min_history : int, default = 300 - Minimum number of rows to train user model. - train_max_history : int, default = 1000, min = 1 - Maximum amount of rows that will be retained in history. As new data arrives, models will be retrained with a - maximum number of rows specified by this value. - seed : int, default = None - Seed to use when training. When not None, ensure random number generators are seeded with `seed` to control - reproducibility of user model training. - sort_glob : bool, default = False, is_flag = True - If true the list of files matching `input_glob` will be processed in sorted order. - models_output_filename : pathlib.Path, default = None, writable = True - The location to write trained models to. - """ - - def __init__(self, - c: Config, - pretrained_filename: pathlib.Path = None, - train_data_glob: str = None, - source_stage_class: str = None, - train_epochs: int = 25, - train_min_history: int = 300, - train_max_history: int = 1000, - seed: int = None, - sort_glob: bool = False, - models_output_filename: pathlib.Path = None): - super().__init__(c) - - self._config = c - self._feature_columns = c.ae.feature_columns - self._use_generic_model = c.ae.use_generic_model - self._batch_size = c.pipeline_batch_size - self._pretrained_filename = pretrained_filename - self._train_data_glob: str = train_data_glob - self._train_epochs = train_epochs - self._train_min_history = train_min_history - self._train_max_history = train_max_history - self._seed = seed - self._sort_glob = sort_glob - self._models_output_filename = models_output_filename - - self._source_stage_class = source_stage_class - if self._source_stage_class is not None: - source_stage_module, source_stage_classname = self._source_stage_class.rsplit('.', 1) - # load the source stage module, will raise ImportError if module cannot be loaded - source_stage_module = importlib.import_module(source_stage_module) - # get the source stage class, will raise AttributeError if class cannot be found - self._source_stage_class = getattr(source_stage_module, source_stage_classname) - - # Single model for the entire pipeline - self._pretrained_model: AutoEncoder = None - - # Per user model data - self._user_models: dict[str, _UserModelManager] = {} - - @property - def name(self) -> str: - return "train-ae" - - def accepted_types(self) -> tuple: - """ - Returns accepted input types for this stage. 
- - """ - return (ControlMessage, ) - - def supports_cpp_node(self): - return False - - def _get_per_user_model(self, msg: ControlMessage): - - model = None - train_scores_mean = None - train_scores_std = None - user_model = None - - user_id = msg.get_metadata("user_id") - - if user_id in self._user_models: - user_model = self._user_models[user_id] - elif self._use_generic_model and "generic" in self._user_models.keys(): - user_model = self._user_models["generic"] - - if (user_model is not None): - model = user_model.model - train_scores_mean = user_model.train_scores_mean - train_scores_std = user_model.train_scores_std - - return model, train_scores_mean, train_scores_std - - def _train_model(self, msg: ControlMessage) -> list[ControlMessage]: - user_id = msg.get_metadata("user_id") - - if (user_id not in self._user_models): - self._user_models[user_id] = _UserModelManager(self._config, - user_id, - False, - self._train_epochs, - self._train_max_history, - self._seed) - - with msg.payload().mutable_dataframe() as cdf: - pdf = cdf.to_pandas() - - return self._user_models[user_id].train(pdf) - - def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: - get_model_fn = None - - # If a pretrained model was specified, load that now - if (self._pretrained_filename is not None): - if (self._train_data_glob is not None): - logger.warning("Both 'pretrained_filename' and 'train_data_glob' were specified. " - "The 'train_data_glob' will be ignored") - - with open(self._pretrained_filename, 'rb') as in_strm: - # self._pretrained_model = dill.load(in_strm) - self._user_models = dill.load(in_strm) - - # get_model_fn = self._get_pretrained_model - get_model_fn = self._get_per_user_model - - elif (self._train_data_glob is not None): - if (self._source_stage_class is None): - raise RuntimeError("source_stage_class must be provided with train_data_glob") - file_list = glob.glob(self._train_data_glob) - if self._sort_glob: - file_list = sorted(file_list) - - user_to_df = self._source_stage_class.files_to_dfs_per_user(file_list, - self._config.ae.userid_column_name, - self._feature_columns, - self._config.ae.userid_filter) - - if self._use_generic_model: - self._user_models["generic"] = _UserModelManager(self._config, - "generic", - True, - self._train_epochs, - self._train_max_history, - self._seed) - - all_users_df = pd.concat(user_to_df.values(), ignore_index=True) - all_users_df = self._source_stage_class.derive_features(all_users_df, self._feature_columns) - all_users_df = all_users_df.fillna("nan") - self._user_models["generic"].train(all_users_df) - - for user_id, df in user_to_df.items(): - if len(df.index) >= self._train_min_history: - self._user_models[user_id] = _UserModelManager(self._config, - user_id, - True, - self._train_epochs, - self._train_max_history, - self._seed) - - # Derive features here - # print(df) - df = self._source_stage_class.derive_features(df, self._feature_columns) - df = df.fillna("nan") - self._user_models[user_id].train(df) - - # Save trained user models - if self._models_output_filename is not None: - with open(self._models_output_filename, 'wb') as out_strm: - dill.dump(self._user_models, out_strm) - - get_model_fn = self._get_per_user_model - - else: - get_model_fn = self._train_model - - def on_next(full_message: ControlMessage): - - model, scores_mean, scores_std = get_model_fn(full_message) - - full_message.set_metadata("model", model) - full_message.set_metadata("train_scores_mean", scores_mean) - 
full_message.set_metadata("train_scores_std", scores_std) - - # cuDF does not yet support timezone-aware datetimes - # Remove timezone information from pd.DatetimeTZDtype columns - meta = full_message.payload() - to_send = [] - - # Now split into batches - for i in range(0, meta.count, self._batch_size): - output_message = ControlMessage(full_message) - output_message.payload(meta.get_slice(i, min(i + self._batch_size, meta.count))) - to_send.append(output_message) - - return to_send - - node = builder.make_node(self.unique_name, ops.map(on_next), ops.flatten()) - builder.make_edge(input_node, node) - - return node diff --git a/scripts/validation/hammah/val-hammah-all.sh b/scripts/validation/hammah/val-hammah-all.sh deleted file mode 100755 index ea5021bc37..0000000000 --- a/scripts/validation/hammah/val-hammah-all.sh +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/bash -# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -e +o pipefail -# set -x -# set -v - - -# RUN OPTIONS -SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" - -# Call minibert first -${SCRIPT_DIR}/val-hammah.sh "user123" -${SCRIPT_DIR}/val-hammah.sh "role-g" diff --git a/scripts/validation/hammah/val-hammah.sh b/scripts/validation/hammah/val-hammah.sh deleted file mode 100755 index 1201502e91..0000000000 --- a/scripts/validation/hammah/val-hammah.sh +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/bash -# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -set -e +o pipefail -# set -x -# set -v - -SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" - -# Override the global defaults -RUN_PYTORCH=1 -RUN_TRITON_ONNX=0 - -# Load the utility scripts -source ${SCRIPT_DIR}/../val-run-pipeline.sh - -# Get the model/data from the argument. 
Must be 'role-g' or 'user123' -HAMMAH_TYPE=${HAMMAH_TYPE:-$1} - -HAMMAH_INPUT_FILE=${SID_INPUT_FILE:-"${MORPHEUS_ROOT}/models/datasets/validation-data/hammah-${HAMMAH_TYPE}-validation-data.csv"} -HAMMAH_TRUTH_FILE=${SID_TRUTH_FILE:-"${MORPHEUS_ROOT}/models/datasets/validation-data/dfp-cloudtrail-${HAMMAH_TYPE}-validation-data-output.csv"} - -MODEL_FILE=${MODEL_FILE:-"${MORPHEUS_ROOT}/models/hammah-models/hammah-${HAMMAH_TYPE}-20211017.pkl"} -MODEL_DIRECTORY=${MODEL_FILE%/*} -MODEL_FILENAME=$(basename -- "${MODEL_FILE}") -MODEL_EXTENSION="${MODEL_FILENAME##*.}" -MODEL_NAME="${MODEL_FILENAME%.*}" - -OUTPUT_FILE_BASE="${MORPHEUS_ROOT}/.tmp/val_${MODEL_NAME}-" - -if [[ "${RUN_PYTORCH}" = "1" ]]; then - OUTPUT_FILE="${OUTPUT_FILE_BASE}pytorch.csv" - VAL_OUTPUT_FILE="${OUTPUT_FILE_BASE}pytorch-results.json" - - run_pipeline_hammah_${HAMMAH_TYPE} \ - "${HAMMAH_INPUT_FILE}" \ - "inf-pytorch" \ - "${OUTPUT_FILE}" \ - "${HAMMAH_TRUTH_FILE}" \ - "${VAL_OUTPUT_FILE}" - - # Get the diff - PYTORCH_ERROR="${b}$(calc_error_val ${VAL_OUTPUT_FILE})" -else - PYTORCH_ERROR="${y}Skipped" -fi - -if [[ "${RUN_TRITON_ONNX}" = "1" ]]; then - - load_triton_model "phishing-bert-onnx" - - OUTPUT_FILE="${OUTPUT_FILE_BASE}triton-onnx.csv" - VAL_OUTPUT_FILE="${OUTPUT_FILE_BASE}triton-onnx-results.json" - - run_pipeline_hammah_${HAMMAH_TYPE} \ - "${HAMMAH_INPUT_FILE}" \ - "inf-triton --model_name=phishing-bert-onnx --server_url=${TRITON_URL} --force_convert_inputs=True" \ - "${OUTPUT_FILE}" \ - "${HAMMAH_TRUTH_FILE}" \ - "${VAL_OUTPUT_FILE}" - - # Get the diff - TRITON_ONNX_ERROR="${b}$(calc_error_val ${VAL_OUTPUT_FILE})" -else - TRITON_ONNX_ERROR="${y}Skipped" -fi - -if [[ "${RUN_TRITON_TRT}" = "1" ]]; then - load_triton_model "phishing-bert-trt" - - OUTPUT_FILE="${OUTPUT_FILE_BASE}triton-trt.csv" - VAL_OUTPUT_FILE="${OUTPUT_FILE_BASE}triton-trt-results.json" - - run_pipeline_hammah_${HAMMAH_TYPE} \ - "${HAMMAH_INPUT_FILE}" \ - "inf-triton --model_name=phishing-bert-trt --server_url=${TRITON_URL} --force_convert_inputs=True" \ - "${OUTPUT_FILE}" \ - "${HAMMAH_TRUTH_FILE}" \ - "${VAL_OUTPUT_FILE}" - - # Get the diff - TRITON_TRT_ERROR="${b}$(calc_error_val ${VAL_OUTPUT_FILE})" -else - TRITON_TRT_ERROR="${y}Skipped" -fi - -if [[ "${RUN_TENSORRT}" = "1" ]]; then - # Generate the TensorRT model - cd ${MORPHEUS_ROOT}/models/triton-model-repo/sid-${SID_TYPE}-trt/1 - - echo "Generating the TensorRT model. This may take a minute..." 
- morpheus tools onnx-to-trt --input_model ${MODEL_DIRECTORY}/${MODEL_NAME}.onnx --output_model ./sid-${SID_TYPE}-trt_b1-8_b1-16_b1-32.engine --batches 1 8 --batches 1 16 --batches 1 32 --seq_length 256 --max_workspace_size 16000 - - cd ${MORPHEUS_ROOT} - - OUTPUT_FILE="${OUTPUT_FILE_BASE}tensorrt.csv" - VAL_OUTPUT_FILE="${OUTPUT_FILE_BASE}tensorrt-results.json" - - run_pipeline_hammah_${HAMMAH_TYPE} \ - "${HAMMAH_INPUT_FILE}" \ - "inf-triton --model_name=sid-${SID_TYPE}-trt --server_url=${TRITON_URL} --force_convert_inputs=True" \ - "${OUTPUT_FILE}" \ - "${HAMMAH_TRUTH_FILE}" \ - "${VAL_OUTPUT_FILE}" - - # Get the diff - TRITON_TRT_ERROR="${b}$(calc_error_val ${VAL_OUTPUT_FILE})" - -else - TENSORRT_ERROR="${y}Skipped" -fi - -echo -e "${b}===ERRORS===${x}" -echo -e "PyTorch :${PYTORCH_ERROR}${x}" -echo -e "Triton(ONNX):${TRITON_ONNX_ERROR}${x}" -echo -e "Triton(TRT) :${TRITON_TRT_ERROR}${x}" -echo -e "TensorRT :${TENSORRT_ERROR}${x}" - -echo -e "${g}Complete!${x}" diff --git a/scripts/validation/val-run-pipeline.sh b/scripts/validation/val-run-pipeline.sh index 0af859a6e0..83f76d80d8 100755 --- a/scripts/validation/val-run-pipeline.sh +++ b/scripts/validation/val-run-pipeline.sh @@ -112,47 +112,3 @@ function run_pipeline_phishing_email(){ serialize \ to-file --filename=${OUTPUT_FILE} --overwrite } - -function run_pipeline_hammah_user123(){ - - INPUT_FILE=$1 - INFERENCE_STAGE=$2 - OUTPUT_FILE=$3 - VAL_FILE=$4 - VAL_OUTPUT=$5 - - morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ - pipeline-ae --columns_file="${MORPHEUS_ROOT}/python/morpheus/morpheus/data/columns_ae_cloudtrail.txt" --userid_filter="user123" --userid_column_name="userIdentitysessionContextsessionIssueruserName" --timestamp_column_name="event_dt" \ - from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/dfp-cloudtrail-*-input.csv" \ - train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/dfp-cloudtrail-*.csv" --source_stage_class=morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage --seed 42 \ - preprocess \ - ${INFERENCE_STAGE} \ - add-scores \ - timeseries --resolution=1m --zscore_threshold=8.0 --hot_start \ - monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ - validate --val_file_name=${VAL_FILE} --results_file_name=${VAL_OUTPUT} --index_col="_index_" --exclude "event_dt" --rel_tol=0.1 --overwrite \ - serialize \ - to-file --filename=${OUTPUT_FILE} --overwrite -} - -function run_pipeline_hammah_role-g(){ - - INPUT_FILE=$1 - INFERENCE_STAGE=$2 - OUTPUT_FILE=$3 - VAL_FILE=$4 - VAL_OUTPUT=$5 - - morpheus --log_level=DEBUG run --num_threads=$(nproc) --pipeline_batch_size=1024 --model_max_batch_size=1024 --use_cpp=${USE_CPP} \ - pipeline-ae --columns_file="${MORPHEUS_ROOT}/python/morpheus/morpheus/data/columns_ae_cloudtrail.txt" --userid_filter="role-g" --userid_column_name="userIdentitysessionContextsessionIssueruserName" --timestamp_column_name="event_dt" \ - from-cloudtrail --input_glob="${MORPHEUS_ROOT}/models/datasets/validation-data/dfp-cloudtrail-*-input.csv" \ - train-ae --train_data_glob="${MORPHEUS_ROOT}/models/datasets/training-data/dfp-cloudtrail-*.csv" --source_stage_class=morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage --seed 42 \ - preprocess \ - ${INFERENCE_STAGE} \ - add-scores \ - timeseries --resolution=1m --zscore_threshold=8.0 --hot_start \ - monitor --description "Inference Rate" --smoothing=0.001 --unit inf \ - validate 
--val_file_name=${VAL_FILE} --results_file_name=${VAL_OUTPUT} --index_col="_index_" --exclude "event_dt" --rel_tol=0.15 --overwrite \ - serialize \ - to-file --filename=${OUTPUT_FILE} --overwrite -} diff --git a/tests/benchmarks/README.md b/tests/benchmarks/README.md index 148dbb3d44..c1fa094416 100644 --- a/tests/benchmarks/README.md +++ b/tests/benchmarks/README.md @@ -193,7 +193,7 @@ Additional benchmark stats for each workflow: ### Production DFP E2E Benchmarks -Note that the `test_cloudtrail_ae_e2e` benchmarks measure performance of a pipeline built using [Starter DFP](../../examples/digital_fingerprinting/starter/README.md) stages. Separate benchmark tests are also provided to measure performance of the example [Production DFP](../../examples/digital_fingerprinting/production/README.md) pipelines. More information about running those benchmarks can be found [here](../../examples/digital_fingerprinting/production/morpheus/benchmarks/README.md). +Separate benchmark tests are provided to measure performance of the example [Production DFP](../../examples/digital_fingerprinting/production/README.md) pipelines. More information about running those benchmarks can be found [here](../../examples/digital_fingerprinting/production/morpheus/benchmarks/README.md). You can use the same Dev container created here to run the Production DFP benchmarks. You would just need to install additional dependencies as follows: diff --git a/tests/benchmarks/e2e_test_configs.json b/tests/benchmarks/e2e_test_configs.json index eae85c1deb..83449a5517 100644 --- a/tests/benchmarks/e2e_test_configs.json +++ b/tests/benchmarks/e2e_test_configs.json @@ -26,15 +26,5 @@ "model_max_batch_size": 64, "feature_length": 128, "edge_buffer_size": 4 - }, - "test_cloudtrail_ae_e2e": { - "input_glob_path": "../../models/datasets/validation-data/dfp-cloudtrail-*-input.csv", - "train_glob_path": "../../models/datasets/training-data/dfp-cloudtrail-*-training-data.csv", - "repeat": 1, - "num_threads": 1, - "pipeline_batch_size": 1024, - "model_max_batch_size": 1024, - "feature_length": 32, - "edge_buffer_size": 4 } -} \ No newline at end of file +} diff --git a/tests/benchmarks/test_bench_e2e_pipelines.py b/tests/benchmarks/test_bench_e2e_pipelines.py index e99e7bbc07..b9f6880d3e 100644 --- a/tests/benchmarks/test_bench_e2e_pipelines.py +++ b/tests/benchmarks/test_bench_e2e_pipelines.py @@ -21,25 +21,19 @@ from _utils import TEST_DIRS from morpheus.config import Config -from morpheus.config import ConfigAutoEncoder from morpheus.config import ConfigFIL from morpheus.config import CppConfig from morpheus.config import PipelineModes from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage -from morpheus.stages.inference.auto_encoder_inference_stage import AutoEncoderInferenceStage from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage -from morpheus.stages.input.cloud_trail_source_stage import CloudTrailSourceStage from morpheus.stages.input.file_source_stage import FileSourceStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.add_classifications_stage import AddClassificationsStage -from morpheus.stages.postprocess.add_scores_stage import AddScoresStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage from 
morpheus.stages.preprocess.preprocess_fil_stage import PreprocessFILStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage -from morpheus.stages.preprocess.train_ae_stage import TrainAEStage from morpheus.utils.file_utils import load_labels_file from morpheus.utils.logger import configure_logging @@ -97,28 +91,6 @@ def fil_pipeline(config: Config, input_file, repeat, output_file, model_name): pipeline.run() -def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file): - - configure_logging(log_level=logging.INFO) - pipeline = LinearPipeline(config) - pipeline.set_source(CloudTrailSourceStage(config, input_glob=input_glob, max_files=200, repeat=repeat)) - pipeline.add_stage( - TrainAEStage(config, - train_data_glob=train_data_glob, - source_stage_class="morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage", - seed=42, - sort_glob=True)) - pipeline.add_stage(PreprocessAEStage(config)) - pipeline.add_stage(AutoEncoderInferenceStage(config)) - pipeline.add_stage(AddScoresStage(config)) - pipeline.add_stage(MonitorStage(config, log_level=logging.INFO)) - pipeline.add_stage(SerializeStage(config)) - pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True)) - - pipeline.build() - pipeline.run() - - @pytest.mark.benchmark def test_sid_nlp_e2e(benchmark, tmp_path): @@ -196,30 +168,3 @@ def test_phishing_nlp_e2e(benchmark, tmp_path): model_name = "phishing-bert-onnx" benchmark(nlp_pipeline, config, input_filepath, repeat, vocab_filepath, output_filepath, model_name) - - -@pytest.mark.benchmark -def test_cloudtrail_ae_e2e(benchmark, tmp_path): - - config = Config() - config.mode = PipelineModes.AE - config.num_threads = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["num_threads"] - config.pipeline_batch_size = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["pipeline_batch_size"] - config.model_max_batch_size = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["model_max_batch_size"] - config.feature_length = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["feature_length"] - config.edge_buffer_size = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["edge_buffer_size"] - config.class_labels = ["reconstruct_loss", "zscore"] - - config.ae = ConfigAutoEncoder() - config.ae.userid_column_name = "userIdentityaccountId" - config.ae.userid_filter = "Account-123456789" - ae_cols_filepath = os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt') - config.ae.feature_columns = load_labels_file(ae_cols_filepath) - CppConfig.set_should_use_cpp(False) - - input_glob = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["input_glob_path"] - repeat = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["repeat"] - train_glob = E2E_TEST_CONFIGS["test_cloudtrail_ae_e2e"]["train_glob_path"] - output_filepath = os.path.join(tmp_path, "cloudtrail_ae_e2e_output.csv") - - benchmark(ae_pipeline, config, input_glob, repeat, train_glob, output_filepath) diff --git a/tests/morpheus/stages/test_preprocess_ae_stage.py b/tests/morpheus/stages/test_preprocess_ae_stage.py deleted file mode 100644 index 5202361b41..0000000000 --- a/tests/morpheus/stages/test_preprocess_ae_stage.py +++ /dev/null @@ -1,64 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import typing - -import cupy as cp -import pytest -import typing_utils - -import cudf - -from morpheus.config import Config -from morpheus.config import ConfigAutoEncoder -from morpheus.messages import ControlMessage -from morpheus.messages import MessageMeta -from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage - - -@pytest.fixture(name='config') -def fixture_config(config: Config): - config.feature_length = 256 - config.ae = ConfigAutoEncoder() - config.ae.feature_columns = ["data"] - yield config - - -def test_constructor(config: Config): - stage = PreprocessAEStage(config) - assert stage.name == "preprocess-ae" - - accepted_union = typing.Union[stage.accepted_types()] - assert typing_utils.issubtype(ControlMessage, accepted_union) - - -def test_process_control_message(config: Config): - stage = PreprocessAEStage(config) - - df = cudf.DataFrame({"data": ["a", "b", "c"]}) - meta = MessageMeta(df) - - input_control_message = ControlMessage() - input_control_message.payload(meta) - - output_control_message = stage.pre_process_batch(input_control_message, fea_len=256, feature_columns=["data"]) - - expected_input = cp.zeros(df.shape, dtype=cp.float32) - assert cp.array_equal(output_control_message.tensors().get_tensor("input"), expected_input) - - expect_seq_ids = cp.zeros((df.shape[0], 3), dtype=cp.uint32) - expect_seq_ids[:, 0] = cp.arange(0, df.shape[0], dtype=cp.uint32) - expect_seq_ids[:, 2] = stage._fea_length - 1 - assert cp.array_equal(output_control_message.tensors().get_tensor("seq_ids"), expect_seq_ids) diff --git a/tests/morpheus/test_cli.py b/tests/morpheus/test_cli.py index 0d108e0623..dfb8a18844 100755 --- a/tests/morpheus/test_cli.py +++ b/tests/morpheus/test_cli.py @@ -32,14 +32,11 @@ from morpheus.common import FileTypes from morpheus.common import FilterSource from morpheus.config import Config -from morpheus.config import ConfigAutoEncoder from morpheus.config import PipelineModes from morpheus.stages.general.monitor_stage import MonitorStage -from morpheus.stages.inference.auto_encoder_inference_stage import AutoEncoderInferenceStage from morpheus.stages.inference.identity_inference_stage import IdentityInferenceStage from morpheus.stages.inference.pytorch_inference_stage import PyTorchInferenceStage from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage -from morpheus.stages.input.cloud_trail_source_stage import CloudTrailSourceStage from morpheus.stages.input.file_source_stage import FileSourceStage from morpheus.stages.input.kafka_source_stage import KafkaSourceStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage @@ -49,14 +46,11 @@ from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage from morpheus.stages.postprocess.ml_flow_drift_stage import MLFlowDriftStage from morpheus.stages.postprocess.serialize_stage import SerializeStage -from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage from morpheus.stages.postprocess.validation_stage import ValidationStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage 
from morpheus.stages.preprocess.drop_null_stage import DropNullStage -from morpheus.stages.preprocess.preprocess_ae_stage import PreprocessAEStage from morpheus.stages.preprocess.preprocess_fil_stage import PreprocessFILStage from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage -from morpheus.stages.preprocess.train_ae_stage import TrainAEStage from morpheus.utils.file_utils import load_labels_file GENERAL_ARGS = ['run', '--num_threads=12', '--pipeline_batch_size=1024', '--model_max_batch_size=1024', '--use_cpp=0'] @@ -142,9 +136,8 @@ def config_warning_fixture(): @pytest.mark.usefixtures("chdir_tmpdir", "reload_modules") class TestCLI: - @pytest.mark.parametrize('cmd', - [[], ['tools'], ['run'], ['run', 'pipeline-ae'], ['run', 'pipeline-fil'], - ['run', 'pipeline-nlp'], ['run', 'pipeline-other']]) + @pytest.mark.parametrize( + 'cmd', [[], ['tools'], ['run'], ['run', 'pipeline-fil'], ['run', 'pipeline-nlp'], ['run', 'pipeline-other']]) def test_help(self, cmd: list[str]): runner = CliRunner() result = runner.invoke(commands.cli, cmd + ['--help']) @@ -175,186 +168,6 @@ def test_manual_seed(self, mock_manual_seed: mock.MagicMock, value: int, use_env assert result.exit_code == 0, result.output mock_manual_seed.assert_called_once_with(value) - @pytest.mark.replace_callback('pipeline_ae') - def test_pipeline_ae(self, config, callback_values): - """ - Build a pipeline roughly ressembles the DFP validation script - """ - args = (GENERAL_ARGS + [ - 'pipeline-ae', - '--columns_file=data/columns_ae_cloudtrail.txt', - '--userid_filter=user321', - '--userid_column_name=user_col', - 'from-cloudtrail', - '--input_glob=input_glob*.csv', - 'train-ae', - '--train_data_glob=train_glob*.csv', - '--seed', - '47', - 'preprocess', - 'inf-pytorch', - 'add-scores', - 'timeseries', - '--resolution=1m', - '--zscore_threshold=8.0', - '--hot_start' - ] + MONITOR_ARGS + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS) - - obj = {} - runner = CliRunner() - result = runner.invoke(commands.cli, args, obj=obj) - assert result.exit_code == 47, result.output - - # Ensure our config is populated correctly - - config = obj["config"] - assert config.mode == PipelineModes.AE - assert config.class_labels == ["reconstruct_loss", "zscore"] - assert config.model_max_batch_size == 1024 - assert config.pipeline_batch_size == 1024 - assert config.num_threads == 12 - - assert isinstance(config.ae, ConfigAutoEncoder) - assert config.ae.userid_column_name == "user_col" - assert config.ae.userid_filter == "user321" - - expected_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt')) - assert config.ae.feature_columns == expected_columns - - pipe = callback_values['pipe'] - assert pipe is not None - - stages = callback_values['stages'] - # Verify the stages are as we expect them, if there is a size-mismatch python will raise a Value error - [cloud_trail, train_ae, process_ae, auto_enc, add_scores, time_series, monitor, validation, serialize, - to_file] = stages - - assert isinstance(cloud_trail, CloudTrailSourceStage) - assert cloud_trail._watcher._input_glob == "input_glob*.csv" - - assert isinstance(train_ae, TrainAEStage) - assert train_ae._train_data_glob == "train_glob*.csv" - assert train_ae._seed == 47 - - assert isinstance(process_ae, PreprocessAEStage) - assert isinstance(auto_enc, AutoEncoderInferenceStage) - assert isinstance(add_scores, AddScoresStage) - - assert isinstance(time_series, TimeSeriesStage) - assert time_series._resolution == '1m' - assert 
time_series._zscore_threshold == 8.0 - assert time_series._hot_start - - assert isinstance(monitor, MonitorStage) - assert monitor._mc._description == 'Unittest' - assert monitor._mc._smoothing == 0.001 - assert monitor._mc._unit == 'inf' - - assert isinstance(validation, ValidationStage) - assert validation._results_file_name == 'results.json' - assert validation._index_col == '_index_' - - # Click appears to be converting this into a tuple - assert list(validation._exclude_columns) == ['event_dt'] - assert validation._rel_tol == 0.1 - - assert isinstance(serialize, SerializeStage) - - assert isinstance(to_file, WriteToFileStage) - assert to_file._controller._output_file == 'out.csv' - - @pytest.mark.replace_callback('pipeline_ae') - def test_pipeline_ae_all(self, callback_values): - """ - Attempt to add all possible stages to the pipeline_ae, even if the pipeline doesn't - actually make sense, just test that cli could assemble it - """ - args = (GENERAL_ARGS + [ - 'pipeline-ae', - '--columns_file=data/columns_ae_cloudtrail.txt', - '--userid_filter=user321', - '--userid_column_name=user_col', - 'from-cloudtrail', - '--input_glob=input_glob*.csv', - 'add-class', - 'unittest-conv-msg', - 'filter', - 'train-ae', - '--train_data_glob=train_glob*.csv', - '--seed', - '47', - 'preprocess', - 'inf-pytorch', - 'add-scores' - ] + ['timeseries', '--resolution=1m', '--zscore_threshold=8.0', '--hot_start'] + MONITOR_ARGS + VALIDATE_ARGS + - ['serialize'] + TO_FILE_ARGS + TO_KAFKA_ARGS) - - runner = CliRunner() - result = runner.invoke(commands.cli, args) - - assert result.exit_code == 47, result.output - - stages = callback_values['stages'] - # Verify the stages are as we expect them, if there is a size-mismatch python will raise a Value error - [ - cloud_trail, - add_class, - conv_msg, - filter_stage, - train_ae, - process_ae, - auto_enc, - add_scores, - time_series, - monitor, - validation, - serialize, - to_file, - to_kafka - ] = stages - - assert isinstance(cloud_trail, CloudTrailSourceStage) - assert cloud_trail._watcher._input_glob == "input_glob*.csv" - - assert isinstance(add_class, AddClassificationsStage) - assert isinstance(conv_msg, ConvMsg) - assert isinstance(filter_stage, FilterDetectionsStage) - - assert isinstance(train_ae, TrainAEStage) - assert train_ae._train_data_glob == "train_glob*.csv" - assert train_ae._seed == 47 - - assert isinstance(process_ae, PreprocessAEStage) - assert isinstance(auto_enc, AutoEncoderInferenceStage) - assert isinstance(add_scores, AddScoresStage) - - assert isinstance(time_series, TimeSeriesStage) - assert time_series._resolution == '1m' - assert time_series._zscore_threshold == 8.0 - assert time_series._hot_start - - assert isinstance(monitor, MonitorStage) - assert monitor._mc._description == 'Unittest' - assert monitor._mc._smoothing == 0.001 - assert monitor._mc._unit == 'inf' - - assert isinstance(validation, ValidationStage) - assert validation._results_file_name == 'results.json' - assert validation._index_col == '_index_' - - # Click appears to be converting this into a tuple - assert list(validation._exclude_columns) == ['event_dt'] - assert validation._rel_tol == 0.1 - - assert isinstance(serialize, SerializeStage) - - assert isinstance(to_file, WriteToFileStage) - assert to_file._controller._output_file == 'out.csv' - - assert isinstance(to_kafka, WriteToKafkaStage) - assert to_kafka._kafka_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321' - assert to_kafka._output_topic == 'test_topic' - @pytest.mark.replace_callback('pipeline_fil') def 
test_pipeline_fil(self, config, callback_values): """ @@ -1025,64 +838,3 @@ def test_pipeline_fil_relative_path_precedence(self, config: Config, tmp_path: s assert config.class_labels == test_labels assert config.fil.feature_columns == test_columns - - # pylint: disable=unused-argument - @pytest.mark.replace_callback('pipeline_ae') - def test_pipeline_ae_relative_path_precedence(self, config: Config, tmp_path: str, callback_values: dict): - """ - Ensure that relative paths are choosen over the morpheus data directory paths - """ - - labels_file = "data/labels_ae.txt" - columns_file = "data/columns_ae_cloudtrail.txt" - - labels_file_local = os.path.join(tmp_path, labels_file) - columns_file_local = os.path.join(tmp_path, columns_file) - - os.makedirs(os.path.join(tmp_path, "data"), exist_ok=True) - - # Use different labels - test_labels = ["label1"] - - # Overwrite the copied labels - with open(labels_file_local, mode="w", encoding='UTF-8') as f: - f.writelines("\n".join(test_labels)) - - # Use different labels - test_columns = [f"column{i}" for i in range(33)] - - # Overwrite the copied labels - with open(columns_file_local, mode="w", encoding='UTF-8') as f: - f.writelines("\n".join(test_columns)) - - args = (GENERAL_ARGS + [ - 'pipeline-ae', - '--userid_filter=user321', - '--userid_column_name=user_col', - f"--labels_file={labels_file}", - f"--columns_file={columns_file}", - 'from-cloudtrail', - '--input_glob=input_glob*.csv', - 'train-ae', - '--train_data_glob=train_glob*.csv', - '--seed', - '47', - 'preprocess', - 'inf-pytorch', - 'add-scores', - 'timeseries', - '--resolution=1m', - '--zscore_threshold=8.0', - '--hot_start' - ] + MONITOR_ARGS + VALIDATE_ARGS + ['serialize'] + TO_FILE_ARGS) - - obj = {} - runner = CliRunner() - result = runner.invoke(commands.cli, args, obj=obj) - assert result.exit_code == 47, result.output - - # Ensure our config is populated correctly - config = obj["config"] - assert config.class_labels == test_labels - - assert config.ae.feature_columns == test_columns diff --git a/tests/morpheus/test_config.py b/tests/morpheus/test_config.py index f958ce2f8b..746acf3771 100755 --- a/tests/morpheus/test_config.py +++ b/tests/morpheus/test_config.py @@ -88,7 +88,7 @@ def test_auto_encoder(): def test_pipeline_modes(): - expected = {"OTHER", "NLP", "FIL", "AE"} + expected = {"OTHER", "NLP", "FIL"} entries = set(pm.name for pm in morpheus.config.PipelineModes) assert entries.issuperset(expected) diff --git a/tests/morpheus_dfp/test_dfp.py b/tests/morpheus_dfp/test_dfp.py deleted file mode 100755 index fa51270930..0000000000 --- a/tests/morpheus_dfp/test_dfp.py +++ /dev/null @@ -1,304 +0,0 @@ -#!/usr/bin/env python -# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: Apache-2.0 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-
-import os
-from unittest import mock
-
-import numpy as np
-import pandas as pd
-import pytest
-
-from _utils import TEST_DIRS
-from _utils import calc_error_val
-from morpheus.common import TypeId
-from morpheus.config import Config
-from morpheus.config import ConfigAutoEncoder
-from morpheus.config import PipelineModes
-from morpheus.messages import ControlMessage
-from morpheus.messages.message_meta import MessageMeta
-from morpheus.pipeline import LinearPipeline
-from morpheus.stages.general.monitor_stage import MonitorStage
-from morpheus.stages.inference.auto_encoder_inference_stage import AutoEncoderInferenceStage
-from morpheus.stages.input.cloud_trail_source_stage import CloudTrailSourceStage
-from morpheus.stages.output.write_to_file_stage import WriteToFileStage
-from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
-from morpheus.stages.postprocess.serialize_stage import SerializeStage
-from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage
-from morpheus.stages.postprocess.validation_stage import ValidationStage
-from morpheus.stages.preprocess import preprocess_ae_stage
-from morpheus.stages.preprocess import train_ae_stage
-
-# End-to-end test intended to imitate the DFP validation test
-
-
-@pytest.mark.slow
-@pytest.mark.gpu_mode
-@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
-@pytest.mark.usefixtures("reload_modules")
-@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_roleg(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
-    tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_tensor.csv'), delimiter=',')
-    anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_anomaly_score.csv'), delimiter=',')
-    exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_exp_results.csv'))
-
-    mock_input_tensor = mock.MagicMock()
-    mock_input_tensor.return_value = mock_input_tensor
-    mock_input_tensor.detach.return_value = tensor_data
-
-    mock_ae.return_value = mock_ae
-    mock_ae.build_input_tensor.return_value = mock_input_tensor
-    mock_ae.get_anomaly_score.return_value = anomaly_score
-    mock_ae.get_results.return_value = exp_results
-
-    config.mode = PipelineModes.AE
-    config.class_labels = ["reconstruct_loss", "zscore"]
-    config.model_max_batch_size = 1024
-    config.pipeline_batch_size = 1024
-    config.feature_length = 256
-    config.edge_buffer_size = 128
-    config.num_threads = 1
-
-    config.ae = ConfigAutoEncoder()
-    config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
-    config.ae.userid_filter = "role-g"
-    config.ae.timestamp_column_name = "event_dt"
-
-    with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
-        config.ae.feature_columns = [x.strip() for x in fh.readlines()]
-
-    input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-
-    out_file = os.path.join(tmp_path, 'results.csv')
-    val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'dfp-cloudtrail-role-g-validation-data-output.csv')
-    results_file_name = os.path.join(tmp_path, 'results.json')
-
-    pipe = LinearPipeline(config)
-    pipe.set_source(CloudTrailSourceStage(config, input_glob=input_glob, sort_glob=True))
-    pipe.add_stage(
-        train_ae_stage.TrainAEStage(
-            config,
-            train_data_glob=train_data_glob,
-            source_stage_class="morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage",
-            seed=42,
-            sort_glob=True))
-    pipe.add_stage(preprocess_ae_stage.PreprocessAEStage(config))
-    pipe.add_stage(AutoEncoderInferenceStage(config))
-    pipe.add_stage(AddScoresStage(config, probs_type=TypeId.FLOAT64))
-    pipe.add_stage(
-        TimeSeriesStage(config,
-                        resolution="1m",
-                        min_window="12 h",
-                        hot_start=True,
-                        cold_end=False,
-                        filter_percent=90.0,
-                        zscore_threshold=8.0))
-    pipe.add_stage(
-        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
-    pipe.add_stage(
-        ValidationStage(config,
-                        val_file_name=val_file_name,
-                        results_file_name=results_file_name,
-                        index_col="_index_",
-                        exclude=("event_dt", "zscore"),
-                        rel_tol=0.1))
-
-    pipe.add_stage(SerializeStage(config, include=[]))
-    pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
-
-    pipe.run()
-
-    mock_ae.fit.assert_called_once()
-    mock_ae.build_input_tensor.assert_called_once()
-    mock_ae.get_anomaly_score.assert_called()
-    mock_ae.get_results.assert_called_once()
-
-    results = calc_error_val(results_file_name)
-    assert results.diff_rows == 0
-
-
-@pytest.mark.slow
-@pytest.mark.gpu_mode
-@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
-@pytest.mark.usefixtures("reload_modules")
-@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_user123(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
-    tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
-    anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
-    exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
-
-    mock_input_tensor = mock.MagicMock()
-    mock_input_tensor.return_value = mock_input_tensor
-    mock_input_tensor.detach.return_value = tensor_data
-
-    mock_ae.return_value = mock_ae
-    mock_ae.build_input_tensor.return_value = mock_input_tensor
-    mock_ae.get_anomaly_score.return_value = anomaly_score
-    mock_ae.get_results.return_value = exp_results
-
-    config.mode = PipelineModes.AE
-    config.class_labels = ["reconstruct_loss", "zscore"]
-    config.model_max_batch_size = 1024
-    config.pipeline_batch_size = 1024
-    config.edge_buffer_size = 128
-    config.num_threads = 1
-
-    config.ae = ConfigAutoEncoder()
-    config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
-    config.ae.userid_filter = "user123"
-    config.ae.timestamp_column_name = "event_dt"
-
-    with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
-        config.ae.feature_columns = [x.strip() for x in fh.readlines()]
-
-    input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    out_file = os.path.join(tmp_path, 'results.csv')
-    val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'dfp-cloudtrail-user123-validation-data-output.csv')
-    results_file_name = os.path.join(tmp_path, 'results.json')
-
-    pipe = LinearPipeline(config)
-    pipe.set_source(CloudTrailSourceStage(config, input_glob=input_glob, sort_glob=True))
-    pipe.add_stage(
-        train_ae_stage.TrainAEStage(
-            config,
-            train_data_glob=train_data_glob,
-            source_stage_class="morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage",
-            seed=42,
-            sort_glob=True))
-    pipe.add_stage(preprocess_ae_stage.PreprocessAEStage(config))
-    pipe.add_stage(AutoEncoderInferenceStage(config))
-    pipe.add_stage(AddScoresStage(config, probs_type=TypeId.FLOAT64))
-    pipe.add_stage(
-        TimeSeriesStage(config,
-                        resolution="1m",
-                        min_window="12 h",
-                        hot_start=True,
-                        cold_end=False,
-                        filter_percent=90.0,
-                        zscore_threshold=8.0))
-    pipe.add_stage(
-        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
-    pipe.add_stage(
-        ValidationStage(config,
-                        val_file_name=val_file_name,
-                        results_file_name=results_file_name,
-                        index_col="_index_",
-                        exclude=("event_dt", "zscore"),
-                        rel_tol=0.1))
-    pipe.add_stage(SerializeStage(config, include=[]))
-    pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
-
-    pipe.run()
-
-    mock_ae.fit.assert_called_once()
-    mock_ae.build_input_tensor.assert_called_once()
-    mock_ae.get_anomaly_score.assert_called()
-    mock_ae.get_results.assert_called_once()
-
-    results = calc_error_val(results_file_name)
-    assert results.diff_rows == 0
-
-
-@pytest.mark.slow
-@pytest.mark.gpu_mode
-@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
-@pytest.mark.usefixtures("reload_modules")
-@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_user123_multi_segment(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
-    tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
-    anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
-    exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
-
-    mock_input_tensor = mock.MagicMock()
-    mock_input_tensor.return_value = mock_input_tensor
-    mock_input_tensor.detach.return_value = tensor_data
-
-    mock_ae.return_value = mock_ae
-    mock_ae.build_input_tensor.return_value = mock_input_tensor
-    mock_ae.get_anomaly_score.return_value = anomaly_score
-    mock_ae.get_results.return_value = exp_results
-
-    config.mode = PipelineModes.AE
-    config.class_labels = ["reconstruct_loss", "zscore"]
-    config.model_max_batch_size = 1024
-    config.pipeline_batch_size = 1024
-    config.edge_buffer_size = 128
-    config.num_threads = 1
-
-    config.ae = ConfigAutoEncoder()
-    config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
-    config.ae.userid_filter = "user123"
-    config.ae.timestamp_column_name = "event_dt"
-
-    with open(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'), encoding='UTF-8') as fh:
-        config.ae.feature_columns = [x.strip() for x in fh.readlines()]
-
-    input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    out_file = os.path.join(tmp_path, 'results.csv')
-    val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'dfp-cloudtrail-user123-validation-data-output.csv')
-    results_file_name = os.path.join(tmp_path, 'results.json')
-
-    pipe = LinearPipeline(config)
-    pipe.set_source(CloudTrailSourceStage(config, input_glob=input_glob, sort_glob=True))
-    pipe.add_segment_boundary(ControlMessage)  # Boundary 1
-    pipe.add_stage(
-        train_ae_stage.TrainAEStage(
-            config,
-            train_data_glob=train_data_glob,
-            source_stage_class="morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage",
-            seed=42,
-            sort_glob=True))
-    pipe.add_segment_boundary(ControlMessage)  # Boundary 2
-    pipe.add_stage(preprocess_ae_stage.PreprocessAEStage(config))
-    pipe.add_segment_boundary(ControlMessage)  # Boundary 3
-    pipe.add_stage(AutoEncoderInferenceStage(config))
-    pipe.add_segment_boundary(ControlMessage)  # Boundary 4
-    pipe.add_stage(AddScoresStage(config, probs_type=TypeId.FLOAT64))
-    pipe.add_segment_boundary(ControlMessage)  # Boundary 5
-    pipe.add_stage(
-        TimeSeriesStage(config,
-                        resolution="1m",
-                        min_window="12 h",
-                        hot_start=True,
-                        cold_end=False,
-                        filter_percent=90.0,
-                        zscore_threshold=8.0))
-    pipe.add_segment_boundary(ControlMessage)  # Boundary 6
-    pipe.add_stage(
-        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
-    pipe.add_stage(
-        ValidationStage(config,
-                        val_file_name=val_file_name,
-                        results_file_name=results_file_name,
-                        index_col="_index_",
-                        exclude=("event_dt", "zscore"),
-                        rel_tol=0.1))
-    pipe.add_segment_boundary(ControlMessage)  # Boundary 7
-    pipe.add_stage(SerializeStage(config, include=[]))
-    pipe.add_segment_boundary(MessageMeta)  # Boundary 8
-    pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
-
-    pipe.run()
-
-    mock_ae.fit.assert_called_once()
-    mock_ae.build_input_tensor.assert_called_once()
-    mock_ae.get_anomaly_score.assert_called()
-    mock_ae.get_results.assert_called_once()
-
-    results = calc_error_val(results_file_name)
-    assert results.diff_rows == 0
diff --git a/tests/morpheus_dfp/test_dfp_kafka.py b/tests/morpheus_dfp/test_dfp_kafka.py
deleted file mode 100755
index 98ef108350..0000000000
--- a/tests/morpheus_dfp/test_dfp_kafka.py
+++ /dev/null
@@ -1,256 +0,0 @@
-#!/usr/bin/env python
-# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import typing
-from io import StringIO
-from unittest import mock
-
-import numpy as np
-import pandas as pd
-import pytest
-
-from _utils import TEST_DIRS
-from _utils.dataset_manager import DatasetManager
-from _utils.kafka import KafkaTopics
-from morpheus.cli import commands
-from morpheus.common import TypeId
-from morpheus.config import Config
-from morpheus.config import ConfigAutoEncoder
-from morpheus.config import PipelineModes
-from morpheus.io.utils import filter_null_data
-from morpheus.pipeline import LinearPipeline
-from morpheus.stages.general.monitor_stage import MonitorStage
-from morpheus.stages.inference.auto_encoder_inference_stage import AutoEncoderInferenceStage
-from morpheus.stages.input.cloud_trail_source_stage import CloudTrailSourceStage
-from morpheus.stages.output.write_to_kafka_stage import WriteToKafkaStage
-from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
-from morpheus.stages.postprocess.serialize_stage import SerializeStage
-from morpheus.stages.postprocess.timeseries_stage import TimeSeriesStage
-from morpheus.stages.preprocess import preprocess_ae_stage
-from morpheus.stages.preprocess import train_ae_stage
-from morpheus.utils.compare_df import compare_df
-from morpheus.utils.file_utils import load_labels_file
-
-if (typing.TYPE_CHECKING):
-    from kafka import KafkaConsumer
-
-
-@pytest.mark.kafka
-@pytest.mark.slow
-@pytest.mark.gpu_mode
-@pytest.mark.reload_modules([commands, preprocess_ae_stage, train_ae_stage])
-@pytest.mark.usefixtures("reload_modules", "loglevel_debug")
-@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_roleg(mock_ae: mock.MagicMock,
-                   dataset_pandas: DatasetManager,
-                   config: Config,
-                   kafka_bootstrap_servers: str,
-                   kafka_topics: KafkaTopics,
-                   kafka_consumer: "KafkaConsumer",
-                   morpheus_log_level: int):
-    tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_tensor.csv'), delimiter=',')
-    anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_anomaly_score.csv'), delimiter=',')
-    exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_exp_results.csv'))
-
-    mock_input_tensor = mock.MagicMock()
-    mock_input_tensor.return_value = mock_input_tensor
-    mock_input_tensor.detach.return_value = tensor_data
-
-    mock_ae.return_value = mock_ae
-    mock_ae.build_input_tensor.return_value = mock_input_tensor
-    mock_ae.get_anomaly_score.return_value = anomaly_score
-    mock_ae.get_results.return_value = exp_results
-
-    config.mode = PipelineModes.AE
-    config.class_labels = ["reconstruct_loss", "zscore"]
-    config.model_max_batch_size = 1024
-    config.pipeline_batch_size = 1024
-    config.feature_length = 256
-    config.edge_buffer_size = 128
-    config.num_threads = 1
-
-    config.ae = ConfigAutoEncoder()
-    config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
-    config.ae.userid_filter = "role-g"
-    config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
-    config.ae.timestamp_column_name = "event_dt"
-
-    input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'dfp-cloudtrail-role-g-validation-data-output.csv')
-
-    pipe = LinearPipeline(config)
-    pipe.set_source(CloudTrailSourceStage(config, input_glob=input_glob, sort_glob=True))
-    pipe.add_stage(
-        train_ae_stage.TrainAEStage(
-            config,
-            train_data_glob=train_data_glob,
-            source_stage_class="morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage",
-            seed=42,
-            sort_glob=True))
-    pipe.add_stage(preprocess_ae_stage.PreprocessAEStage(config))
-    pipe.add_stage(AutoEncoderInferenceStage(config))
-    pipe.add_stage(AddScoresStage(config, probs_type=TypeId.FLOAT64))
-    pipe.add_stage(
-        TimeSeriesStage(config,
-                        resolution="1m",
-                        min_window="12 h",
-                        hot_start=True,
-                        cold_end=False,
-                        filter_percent=90.0,
-                        zscore_threshold=8.0))
-    pipe.add_stage(
-        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
-    pipe.add_stage(SerializeStage(config, include=[]))
-    pipe.add_stage(
-        WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic))
-
-    pipe.run()
-
-    mock_ae.fit.assert_called_once()
-    mock_ae.build_input_tensor.assert_called_once()
-    mock_ae.get_anomaly_score.assert_called()
-    mock_ae.get_results.assert_called_once()
-
-    val_df = dataset_pandas[val_file_name]
-
-    output_buf = StringIO()
-    for rec in kafka_consumer:
-        output_buf.write(f'{rec.value.decode("utf-8")}\n')
-
-    output_buf.seek(0)
-    output_df = pd.read_json(output_buf, lines=True)
-    output_df = filter_null_data(output_df)
-
-    assert len(output_df) == len(val_df)
-
-    results = compare_df(
-        val_df,
-        output_df,
-        replace_idx="_index_",
-        exclude_columns=[
-            'event_dt',
-            'zscore',
-            'userAgent'  # userAgent in output_df includes escape chars in the string
-        ],
-        rel_tol=0.15,
-        show_report=True)
-
-    assert results['diff_rows'] == 0
-
-
-@pytest.mark.kafka
-@pytest.mark.slow
-@pytest.mark.gpu_mode
-@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
-@pytest.mark.usefixtures("reload_modules", "loglevel_debug")
-@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_user123(mock_ae: mock.MagicMock,
-                     dataset_pandas: DatasetManager,
-                     config: Config,
-                     kafka_bootstrap_servers: str,
-                     kafka_topics: KafkaTopics,
-                     kafka_consumer: "KafkaConsumer",
-                     morpheus_log_level: int):
-    tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
-    anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
-    exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
-
-    mock_input_tensor = mock.MagicMock()
-    mock_input_tensor.return_value = mock_input_tensor
-    mock_input_tensor.detach.return_value = tensor_data
-
-    mock_ae.return_value = mock_ae
-    mock_ae.build_input_tensor.return_value = mock_input_tensor
-    mock_ae.get_anomaly_score.return_value = anomaly_score
-    mock_ae.get_results.return_value = exp_results
-
-    config.mode = PipelineModes.AE
-    config.class_labels = ["reconstruct_loss", "zscore"]
-    config.model_max_batch_size = 1024
-    config.pipeline_batch_size = 1024
-    config.edge_buffer_size = 128
-    config.num_threads = 1
-
-    config.ae = ConfigAutoEncoder()
-    config.ae.userid_column_name = "userIdentitysessionContextsessionIssueruserName"
-    config.ae.userid_filter = "user123"
-    config.ae.feature_columns = load_labels_file(os.path.join(TEST_DIRS.data_dir, 'columns_ae_cloudtrail.txt'))
-    config.ae.timestamp_column_name = "event_dt"
-
-    input_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    train_data_glob = os.path.join(TEST_DIRS.validation_data_dir, "dfp-cloudtrail-*-input.csv")
-    val_file_name = os.path.join(TEST_DIRS.validation_data_dir, 'dfp-cloudtrail-user123-validation-data-output.csv')
-
-    pipe = LinearPipeline(config)
-    pipe.set_source(CloudTrailSourceStage(config, input_glob=input_glob, sort_glob=True))
-    pipe.add_stage(
-        train_ae_stage.TrainAEStage(
-            config,
-            train_data_glob=train_data_glob,
-            source_stage_class="morpheus.stages.input.cloud_trail_source_stage.CloudTrailSourceStage",
-            seed=42,
-            sort_glob=True))
-    pipe.add_stage(preprocess_ae_stage.PreprocessAEStage(config))
-    pipe.add_stage(AutoEncoderInferenceStage(config))
-    pipe.add_stage(AddScoresStage(config, probs_type=TypeId.FLOAT64))
-    pipe.add_stage(
-        TimeSeriesStage(config,
-                        resolution="1m",
-                        min_window="12 h",
-                        hot_start=True,
-                        cold_end=False,
-                        filter_percent=90.0,
-                        zscore_threshold=8.0))
-    pipe.add_stage(
-        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
-    pipe.add_stage(SerializeStage(config, include=[]))
-    pipe.add_stage(
-        WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic))
-
-    pipe.run()
-
-    mock_ae.fit.assert_called_once()
-    mock_ae.build_input_tensor.assert_called_once()
-    mock_ae.get_anomaly_score.assert_called()
-    mock_ae.get_results.assert_called_once()
-
-    val_df = dataset_pandas[val_file_name]
-
-    output_buf = StringIO()
-    for rec in kafka_consumer:
-        output_buf.write(f'{rec.value.decode("utf-8")}\n')
-
-    output_buf.seek(0)
-    output_df = pd.read_json(output_buf, lines=True)
-    output_df = filter_null_data(output_df)
-
-    assert len(output_df) == len(val_df)
-
-    results = compare_df(
-        val_df,
-        output_df,
-        replace_idx="_index_",
-        exclude_columns=[
-            'event_dt',
-            'zscore',
-            'userAgent'  # userAgent in output_df includes escape chars in the string
-        ],
-        rel_tol=0.1,
-        show_report=True)
-
-    assert results['diff_rows'] == 0
diff --git a/tests/tests_data/dfp_roleg_anomaly_score.csv b/tests/tests_data/dfp_roleg_anomaly_score.csv
deleted file mode 100644
index 00edc1dc37..0000000000
--- a/tests/tests_data/dfp_roleg_anomaly_score.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:12394244db00812eeeb3ca3b5632f3fcde19108aca4fa13d28fccdf68ed64851
-size 5770
diff --git a/tests/tests_data/dfp_roleg_exp_results.csv b/tests/tests_data/dfp_roleg_exp_results.csv
deleted file mode 100644
index 622a2ed1d6..0000000000
--- a/tests/tests_data/dfp_roleg_exp_results.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:845c961529eb37822bb3e12c02584d6a7b781217bd6f2156aa3429a7cd982821
-size 100646
diff --git a/tests/tests_data/dfp_roleg_tensor.csv b/tests/tests_data/dfp_roleg_tensor.csv
deleted file mode 100644
index 7de2fdb416..0000000000
--- a/tests/tests_data/dfp_roleg_tensor.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:1bfd2ebcda75e56571066c4bd220a393587763890a29ae98be65cdf58bdaa99c
-size 2063055
diff --git a/tests/tests_data/dfp_user123_anomaly_score.csv b/tests/tests_data/dfp_user123_anomaly_score.csv
deleted file mode 100644
index 6a0427f5b7..0000000000
--- a/tests/tests_data/dfp_user123_anomaly_score.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:91b4ea4de9c51ca459983383e8583f11edc116537da29e695b4976bcc61df462
-size 15657
diff --git a/tests/tests_data/dfp_user123_exp_results.csv b/tests/tests_data/dfp_user123_exp_results.csv
deleted file mode 100644
index ad31dfb805..0000000000
--- a/tests/tests_data/dfp_user123_exp_results.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:63d7188d4ec5f0d6d7f7979d423663190e1934b1d19ea5eca3537c98daa0de28
-size 270285
diff --git a/tests/tests_data/dfp_user123_tensor.csv b/tests/tests_data/dfp_user123_tensor.csv
deleted file mode 100644
index 62ddfdab4d..0000000000
--- a/tests/tests_data/dfp_user123_tensor.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:61d09187fb624696a4c248a4222caa7c13df5c3399d967659478809c16628228
-size 1636032
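
The updated `test_pipeline_modes` in `tests/morpheus/test_config.py` above only asserts that the surviving modes are still registered via `issuperset`. A minimal companion sketch (not part of this diff; the test name is hypothetical) could additionally assert that the removed `AE` member is gone from `morpheus.config.PipelineModes`:

    # Hypothetical follow-up check, mirroring the updated test_pipeline_modes above
    import morpheus.config


    def test_pipeline_modes_excludes_ae():
        entries = set(pm.name for pm in morpheus.config.PipelineModes)
        # The remaining pipeline modes should still be registered
        assert entries.issuperset({"OTHER", "NLP", "FIL"})
        # The AutoEncoder mode removed by this change should no longer exist
        assert "AE" not in entries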