diff --git a/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md b/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md
index d55f94b86e..c7209c9b9b 100644
--- a/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md
+++ b/docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md
@@ -71,7 +71,7 @@ The front-end loader outputs one or more control messages that are passed to the
 Moreover, the updated pipeline supports human-in-the-loop workflows, such as the ability to manually trigger training or inference tasks against a specific set of data, and the capacity for real-time labeling of production inference events that can be injected back into the training pipeline.
 
-The following content will track the pipeline declared in `examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_streaming_pipeline.py`
+The following content will track the pipeline declared in `examples/digital_fingerprinting/production/dfp_integrated_training_streaming_pipeline.py`
 
 ```python
 # Setup and command line argument parsing
 ...
@@ -115,7 +115,7 @@ For a full introduction to Morpheus modules, refer to the [Python Modules](7_pyt
 
 ## DFP Deployment
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_deployment.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py`
 
 This is the top-level module that encapsulates the entire Digital Fingerprinting pipeline. It is primarily responsible for wrapping the training and inference pipelines, providing the correct module interface, and doing some configuration pre-processing. Since this module is monolithic, it supports a significant number of configuration options; however, the majority of these have intelligent defaults and are not required to be specified.
 
@@ -137,9 +137,9 @@ def dfp_deployment(builder: mrc.Builder):
     ...
 
     # Make an edge between modules
-    builder.make_edge(fsspec_dataloader_module.output_port("output"), broadcast)
-    builder.make_edge(broadcast, dfp_training_pipe_module.input_port("input"))
-    builder.make_edge(broadcast, dfp_inference_pipe_module.input_port("input"))
+    builder.make_edge(fsspec_dataloader_module.output_port("output"), router)
+    builder.make_edge(router.get_source("training"), dfp_training_pipe_module.input_port("input"))
+    builder.make_edge(router.get_source("inference"), dfp_inference_pipe_module.input_port("input"))
 
     out_nodes = [dfp_training_pipe_module.output_port("output"), dfp_inference_pipe_module.output_port("output")]
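Unlike the previous `broadcast` fork, which sent every control message to both branches, the router sends each message to exactly one branch based on the task it carries. A minimal sketch of the idea, assuming the `ControlMessage` task API used later in this diff (the task payload dict is illustrative):

```python
from morpheus.messages import ControlMessage

# A message tagged with a "training" task is routed to the training branch only.
msg = ControlMessage()
msg.add_task("training", {"userid": "generic_user"})  # illustrative payload

assert msg.has_task("training")       # router_key_fn would return "training"
assert not msg.has_task("inference")  # the inference branch never sees this message
```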
@@ -162,14 +162,12 @@ There are a number of modules that are used in both the training and inference p
 
 ### DFP Preprocessing
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_preproc.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py`
 
-The `dfp_preproc` module is a functional component within the Morpheus framework that combines multiple data filtering and processing pipeline modules related to inference and training. This module simplifies the pipeline by consolidating various modules into a single, cohesive unit. The `dfp_preproc` module supports configuration parameters such as the cache directory, timestamp column name, pre-filter options, batching options, user splitting options, and supported data loaders for various file types.
+The `dfp_preproc` module is a functional component within the Morpheus framework that combines multiple processing pipeline modules related to inference and training. This module simplifies the pipeline by consolidating various modules into a single, cohesive unit. The `dfp_preproc` module supports configuration parameters such as the cache directory, timestamp column name, batching options, user splitting options, and supported data loaders for various file types.
 
 The module itself consists of a series of chained sub-modules, which are connected in a logical sequence:
 
-- `filter_control_message_module`
-  - Responsible for early filtering of control messages that should be not processed by the pipeline.
 - `file_batcher_module`
   - Responsible for batching files, either into a single control message in the case of an encapsulated training message, or into a series of control messages in the case of streaming data.
 - `file_to_df_dataloader_module`
@@ -179,23 +177,6 @@ The module itself consists of a series of chained sub-modules, which are connect
 For a complete reference, refer to: [`dfp_preproc`](../../modules/examples/digital_fingerprinting/dfp_preproc.md)
 
-```python
-@register_module(DFP_PREPROC, MORPHEUS_MODULE_NAMESPACE)
-def dfp_preproc(builder: mrc.Builder):
-    # Setup and configuration parsing
-    ...
-
-    # Connect the modules.
-    builder.make_edge(filter_control_message_module.output_port("output"), file_batcher_module.input_port("input"))
-    builder.make_edge(file_batcher_module.output_port("output"), file_to_df_dataloader_module.input_port("input"))
-    builder.make_edge(file_to_df_dataloader_module.output_port("output"), dfp_split_users_module.input_port("input"))
-
-    # Register input and output port for a module.
-    builder.register_module_input("input", filter_control_message_module.input_port("input"))
-    builder.register_module_output("output", dfp_split_users_module.output_port("output"))
-
-```
-
 ### Control Message Filter
 
 Source: `morpheus/modules/filter_control_message.py`
 
@@ -216,13 +197,6 @@ In the case of streaming data, the file batcher will operate as it did previousl
 For a complete reference, refer to: [File Batcher](../../modules/core/file_batcher.md)
 
-```python
-@register_module(FILE_BATCHER, MORPHEUS_MODULE_NAMESPACE)
-def file_batcher(builder: mrc.Builder):
-    # Setup and configuration parsing
-    ...
-```
-
 ### File to DF DataLoader
 
 Source: `morpheus/loaders/file_to_df_loader.py`
 
@@ -233,7 +207,7 @@ For a complete reference, refer to: [DataLoader Module](../../modules/core/data_
 
 ### DFP Split Users
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_split_users.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_split_users.py`
 
 The `dfp_split_users` module is responsible for splitting the input data based on user IDs. The module provides configuration options, such as fallback username, include generic user, include individual users, and specify lists of user IDs to include or exclude in the output.
 
@@ -241,16 +215,10 @@ The module processes control messages by extracting the user information from th
 For a complete reference, refer to: [DFP Split Users](../../modules/examples/digital_fingerprinting/dfp_split_users.md)
 
-```python
-@register_module(DFP_SPLIT_USERS, MORPHEUS_MODULE_NAMESPACE)
-def dfp_split_users(builder: mrc.Builder):
-    # Setup and configuration parsing
-    ...
-```
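The core of the user split is a group-by on the user ID column. A standalone sketch of the idea (a hypothetical helper, not the module's actual code):

```python
import pandas as pd

def split_by_user(df: pd.DataFrame,
                  userid_column: str = "username",
                  fallback_username: str = "generic_user") -> dict[str, pd.DataFrame]:
    """Return one DataFrame per user, substituting a fallback for missing IDs."""
    users = df[userid_column].fillna(fallback_username)
    return {user: user_df for user, user_df in df.groupby(users)}
```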
 ### DFP Rolling Window
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_rolling_window.py`
 
 The `dfp_rolling_window` module is responsible for maintaining a rolling window of historical data, acting as a streaming caching and batching system. The module provides various configuration options, such as aggregation span, cache directory, caching options, timestamp column name, and trigger conditions.
 
@@ -262,16 +230,10 @@ The Rolling window module has also been updated to support an additional `batch
 For a complete reference, refer to: [DFP Rolling Window](../../modules/examples/digital_fingerprinting/dfp_rolling_window.md)
 
-```python
-@register_module(DFP_ROLLING_WINDOW, MORPHEUS_MODULE_NAMESPACE)
-def dfp_rolling_window(builder: mrc.Builder):
-    # Setup and configuration parsing
-    ...
-```
 
 ### DFP Data Prep
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_data_prep.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_data_prep.py`
 
 The `dfp_data_prep` module is responsible for preparing data for either inference or model training. The module requires a defined schema for data preparation.
 
@@ -279,16 +241,9 @@ The main functionality of the module is in the `process_features` function. For
 For a complete reference, refer to: [DFP Data Prep](../../modules/examples/digital_fingerprinting/dfp_data_prep.md)
 
-```python
-@register_module(DFP_DATA_PREP, MORPHEUS_MODULE_NAMESPACE)
-def dfp_data_prep(builder: mrc.Builder):
-    # Setup and configuration parsing
-    ...
-```
-
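Conceptually, data prep applies a `DataFrameInputSchema` to each batch. A loose sketch using the column-info helpers the DFP examples rely on (treat the exact import paths as an assumption):

```python
import pandas as pd

from morpheus.utils.column_info import ColumnInfo, DataFrameInputSchema, RenameColumn
from morpheus.utils.schema_transforms import process_dataframe

schema = DataFrameInputSchema(column_info=[
    ColumnInfo(name="username", dtype=str),
    RenameColumn(name="timestamp", dtype="datetime64[ns]", input_name="time"),
])

df = pd.DataFrame({"username": ["alice"], "time": ["2022-08-21T22:05:23Z"]})
prepared = process_dataframe(df, schema)  # only the schema's output columns survive
```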
 ## DFP Training Pipeline
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipe.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py`
 
 The DFP Training Pipe module is a consolidated module that integrates several DFP pipeline modules that are essential to the training process. This module function provides a single entry point to the training pipeline, simplifying the process of training a model. The module offers configurable parameters for various stages in the pipeline, including data batching, data preprocessing, and data encoding for model training. Additionally, the MLflow model writer options allow for the trained model to be saved for future use.
 
@@ -314,30 +269,24 @@ def dfp_training_pipe(builder: mrc.Builder):
     ...
 
     # Make an edge between the modules.
-    builder.make_edge(preproc_module.output_port("output"), dfp_rolling_window_module.input_port("input"))
-    builder.make_edge(dfp_rolling_window_module.output_port("output"), dfp_data_prep_module.input_port("input"))
-    builder.make_edge(dfp_data_prep_module.output_port("output"), dfp_training_module.input_port("input"))
-    builder.make_edge(dfp_training_module.output_port("output"), mlflow_model_writer_module.input_port("input"))
+    builder.make_edge(file_batcher_module.output_port("output"), file_to_df_dataloader_module.input_port("input"))
+    builder.make_edge(file_to_df_dataloader_module.output_port("output"), file_to_df_monitor_module.input_port("input"))
+    builder.make_edge(file_to_df_monitor_module.output_port("output"), dfp_split_users_module.input_port("input"))
+    builder.make_edge(dfp_split_users_module.output_port("output"), dfp_split_users_monitor_module.input_port("input"))
 
     # Register input and output port for a module.
-    builder.register_module_input("input", preproc_module.input_port("input"))
-    builder.register_module_output("output", mlflow_model_writer_module.output_port("output"))
+    builder.register_module_input("input", file_batcher_module.input_port("input"))
+    builder.register_module_output("output", dfp_split_users_monitor_module.output_port("output"))
 ```
 
 ### DFP Training
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_training.py`
 
 The `dfp_training` module function is responsible for training the model. The `on_data` function is defined to handle incoming `ControlMessage` instances. It retrieves the user ID and the input data from the `ControlMessage`, creates an instance of the `AutoEncoder` class with the specified `model_kwargs`, and trains the model on the input data. The output message includes the trained model and metadata.
 
 For a complete reference, refer to: [DFP Training](../../modules/examples/digital_fingerprinting/dfp_training.md)
 
-```python
-@register_module(DFP_TRAINING, MORPHEUS_MODULE_NAMESPACE)
-def dfp_inference(builder: mrc.Builder):
-    # Setup and config parsing
-    ...
-```
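The `on_data` flow described above reduces to a few lines. A loose sketch assuming the `dfencoder` `AutoEncoder` API that DFP uses (argument values are illustrative):

```python
import pandas as pd

from morpheus.models.dfencoder import AutoEncoder

def train_user_model(train_df: pd.DataFrame, model_kwargs: dict, epochs: int = 30) -> AutoEncoder:
    model = AutoEncoder(**model_kwargs)  # for example, progress_bar=False
    model.fit(train_df, epochs=epochs)   # train on the user's feature DataFrame
    return model                         # attached to the outgoing ControlMessage
```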
-```
 
 For a complete reference, refer to: [Filter Detections](../../modules/core/filter_detections.md)
 
 ### DFP Post Processing
 
-Source: `examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py`
+Source: `python/morpheus_dfp/morpheus_dfp/modules/dfp_postprocessing.py`
 
 The `dfp_postprocessing` module function performs post-processing tasks on the input data.
 
-```python
-@register_module(DFP_POST_PROCESSING, MORPHEUS_MODULE_NAMESPACE)
-def dfp_postprocessing(builder: mrc.Builder):
-    # Setup and config parsing
-    ...
-```
 
 For a complete reference, refer to: [DFP Post Processing](../../modules/examples/digital_fingerprinting/dfp_postprocessing.md)
 
@@ -466,12 +390,6 @@ The `convert_to_df` function converts a DataFrame to JSON lines. It takes a `Con
 The module function compiles the include and exclude patterns into regular expressions. It then creates a node using the `convert_to_df` function with the compiled include and exclude patterns and the specified columns.
 
-```python
-@register_module(SERIALIZE, MORPHEUS_MODULE_NAMESPACE)
-def serialize(builder: mrc.Builder):
-    # Setup and config parsing
-    ...
-```
 
 For a complete reference, refer to: [Serialize](../../modules/core/serialize.md)
 
@@ -483,12 +401,6 @@ The `write_to_file` module function writes all messages to a file.
 
 The `convert_to_strings` function takes a DataFrame (either pandas or cuDF) and converts it into the appropriate string format based on the file type (JSON or CSV). It checks whether to include the index column or not.
 
-```python
-@register_module(WRITE_TO_FILE, MORPHEUS_MODULE_NAMESPACE)
-def write_to_file(builder: mrc.Builder):
-    # Setup and config parsing
-    ...
-```
 
 For a complete reference, refer to: [Write to File](../../modules/core/write_to_file.md)
 
diff --git a/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md b/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md
index d60f64f19e..2bba0ded89 100644
--- a/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md
+++ b/docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md
@@ -174,7 +174,7 @@ Subclass of `DateTimeColumn`, counts the unique occurrences of a value in `group
 
 ![Input Stages](img/dfp_input_config.png)
 
 #### Source Stage (`MultiFileSource`)
-The `MultiFileSource` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py`) receives a path or list of paths (`filenames`), and will collectively be emitted into the pipeline as an [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) object. The paths may include wildcards `*` as well as URLs (ex: `s3://path`) to remote storage providers such as S3, FTP, GCP, Azure, Databricks and others as defined by [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files). In addition to this paths can be cached locally by prefixing them with `filecache::` (ex: `filecache::s3://bucket-name/key-name`).
+The `MultiFileSource` (`python/morpheus/morpheus/modules/input/multi_file_source.py`) receives a path or list of paths (`filenames`), which are collectively emitted into the pipeline as an [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) object. The paths may include wildcards `*` as well as URLs (ex: `s3://path`) to remote storage providers such as S3, FTP, GCP, Azure, Databricks and others as defined by [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files). In addition to this, paths can be cached locally by prefixing them with `filecache::` (ex: `filecache::s3://bucket-name/key-name`).
 
 > **Note:** This stage does not actually download the data files, allowing the file list to be filtered and batched prior to being downloaded.
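The `fsspec` semantics described above can be exercised directly; the bucket and key names below are made up for the example:

```python
import fsspec

# Wildcard listing from a remote store, with local caching via the
# "filecache::" prefix; per-protocol options are passed as keyword arguments.
files = fsspec.open_files("filecache::s3://example-bucket/AUTH_LOG-*.json",
                          s3={"anon": True})
print([f.path for f in files])  # an fsspec.core.OpenFiles collection
```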
@@ -187,7 +187,7 @@
 
 #### File Batcher Stage (`DFPFileBatcherStage`)
-The `DFPFileBatcherStage` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py`) groups data in the incoming `DataFrame` in batches of a time period (per day default), and optionally filtering incoming data to a specific time window. This stage can potentially improve performance by combining multiple small files into a single batch. This stage assumes that the date of the logs can be easily inferred such as encoding the creation time in the file name (for example, `AUTH_LOG-2022-08-21T22.05.23Z.json`), or using the modification time as reported by the file system. The actual method for extracting the date is encoded in a user-supplied `date_conversion_func` function (more on this later).
+The `DFPFileBatcherStage` (`python/morpheus_dfp/morpheus_dfp/stages/dfp_file_batcher_stage.py`) groups data in the incoming `DataFrame` into batches of a time period (one day by default), optionally filtering incoming data to a specific time window. This stage can potentially improve performance by combining multiple small files into a single batch. This stage assumes that the date of the logs can be easily inferred, such as encoding the creation time in the file name (for example, `AUTH_LOG-2022-08-21T22.05.23Z.json`), or using the modification time as reported by the file system. The actual method for extracting the date is encoded in a user-supplied `date_conversion_func` function (more on this later).
 
 | Argument | Type | Description |
 | -------- | ---- | ----------- |
@@ -219,7 +219,7 @@ pipeline.add_stage(
 
 > **Note:** If `date_conversion_func` returns time-zone aware timestamps, then `start_time` and `end_time`, if not `None`, need to also be timezone aware `datetime` objects.
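A typical `date_conversion_func` pulls the timestamp out of the file name and falls back to the file system's modification time. This sketch is illustrative, not the helper Morpheus ships:

```python
import re
from datetime import datetime, timezone

import fsspec

# Matches names like AUTH_LOG-2022-08-21T22.05.23Z.json
ISO_DATE_RE = re.compile(r"(\d{4})-(\d{2})-(\d{2})T(\d{2})\.(\d{2})\.(\d{2})Z")

def date_conversion_func(file_object: fsspec.core.OpenFile) -> datetime:
    match = ISO_DATE_RE.search(file_object.path)
    if match:
        return datetime(*(int(g) for g in match.groups()), tzinfo=timezone.utc)
    # Fall back to the modification time reported by the file system.
    return file_object.fs.modified(file_object.path).replace(tzinfo=timezone.utc)
```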
 #### File to DataFrame Stage (`DFPFileToDataFrameStage`)
-The `DFPFileToDataFrameStage` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py`) stage receives a `list` of an [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) and loads them into a single `DataFrame` which is then emitted into the pipeline. When the parent stage is `DFPFileBatcherStage` each batch (typically one day) is concatenated into a single `DataFrame`. If the parent was `MultiFileSource` the entire dataset is loaded into a single `DataFrame`. Because of this, it is important to choose a `period` argument for `DFPFileBatcherStage` small enough such that each batch can fit into memory.
+The `DFPFileToDataFrameStage` (`python/morpheus_dfp/morpheus_dfp/stages/dfp_file_to_df.py`) stage receives a `list` of [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) objects and loads them into a single `DataFrame`, which is then emitted into the pipeline. When the parent stage is `DFPFileBatcherStage`, each batch (typically one day) is concatenated into a single `DataFrame`. If the parent was `MultiFileSource`, the entire dataset is loaded into a single `DataFrame`. Because of this, it is important to choose a `period` argument for `DFPFileBatcherStage` small enough that each batch can fit into memory.
 
 | Argument | Type | Description |
 | -------- | ---- | ----------- |
@@ -251,7 +251,7 @@ This final stage will write all received messages to a single output file in eit
 | `overwrite` | `bool` | Optional, defaults to `False`. If the file specified in `filename` already exists, it will be overwritten if this option is set to `True` |
 
 #### Write to S3 Stage (`WriteToS3Stage`)
-The {py:obj}`~dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resulting anomaly detections to S3. The `WriteToS3Stage` decouples the S3 specific operations from the Morpheus stage, and as such receives an `s3_writer` argument.
+The {py:obj}`~morpheus_dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resulting anomaly detections to S3. The `WriteToS3Stage` decouples the S3 specific operations from the Morpheus stage, and as such receives an `s3_writer` argument.
 
 | Argument | Type | Description |
 | -------- | ---- | ----------- |
@@ -262,7 +262,7 @@
 These stages are common to both the training and inference pipelines; unlike the input and output stages, these are specific to the DFP pipeline and intended to be configured but not replaceable.
 
 #### Split Users Stage (`DFPSplitUsersStage`)
-The {py:obj}`~dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receives an incoming `DataFrame` and emits a `list` of `DFPMessageMeta` where each `DFPMessageMeta` represents the records associated for a given user. This allows for downstream stages to perform all necessary operations on a per user basis.
+The {py:obj}`~morpheus_dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receives an incoming `DataFrame` and emits a `list` of `DFPMessageMeta` where each `DFPMessageMeta` represents the records associated with a given user. This allows downstream stages to perform all necessary operations on a per-user basis.
 
 | Argument | Type | Description |
 | -------- | ---- | ----------- |
@@ -273,7 +273,7 @@
 | `only_users` | `List[str]` or `None` | Limit records to a specific list of users, when `include_generic` is `True` the generic user's records will also be limited to the users in this list. Mutually exclusive with `skip_users`. |
 
 #### Rolling Window Stage (`DFPRollingWindowStage`)
-The {py:obj}`~dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage performs several key pieces of functionality for DFP.
+The {py:obj}`~morpheus_dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage performs several key pieces of functionality for DFP.
 
 1. This stage keeps a moving window of logs on a per user basis
 
@@ -299,7 +299,7 @@
 
 > **Note:** this stage computes a row hash for the first and last rows of the incoming `DataFrame`; as such, all data contained must be hashable. Any non-hashable values such as `lists` should be dropped or converted into hashable types in the `DFPFileToDataFrameStage`.
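In practice that means flattening any list-valued columns while the `DataFrame` is being assembled. One way to do it (an illustrative helper, applied in the loader or schema rather than part of the stage itself):

```python
import pandas as pd

def make_rows_hashable(df: pd.DataFrame) -> pd.DataFrame:
    """Convert list values to tuples so row hashing in the rolling window works."""
    for col in df.columns:
        if df[col].map(lambda value: isinstance(value, list)).any():
            df[col] = df[col].map(lambda value: tuple(value) if isinstance(value, list) else value)
    return df
```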
 #### Preprocessing Stage (`DFPPreprocessingStage`)
-The {py:obj}`~dfp.stages.dfp_preprocessing_stage.DFPPreprocessingStage` stage, the actual logic of preprocessing is defined in the `input_schema` argument. Since this stage occurs in the pipeline after the `DFPFileBatcherStage` and `DFPSplitUsersStage` stages all records in the incoming `DataFrame` correspond to only a single user within a specific time period allowing for columns to be computer on a per-user per-time period basis such as the `logcount` and `locincrement` features mentioned above. Making the type of processing performed in this stage different from those performed in the `DFPFileToDataFrameStage`.
+In the {py:obj}`~morpheus_dfp.stages.dfp_preprocessing_stage.DFPPreprocessingStage` stage, the actual logic of preprocessing is defined in the `input_schema` argument. Since this stage occurs in the pipeline after the `DFPFileBatcherStage` and `DFPSplitUsersStage` stages, all records in the incoming `DataFrame` correspond to only a single user within a specific time period, allowing for columns to be computed on a per-user, per-time-period basis, such as the `logcount` and `locincrement` features mentioned above. This makes the type of processing performed in this stage different from that performed in the `DFPFileToDataFrameStage`.
 
 | Argument | Type | Description |
 | -------- | ---- | ----------- |
@@ -316,7 +316,7 @@ After training the generic model, individual user models can be trained. Individ
 
 ### Training Stages
 
 #### Training Stage (`DFPTraining`)
-The {py:obj}`~dfp.stages.dfp_training.DFPTraining` trains a model for each incoming `DataFrame` and emits an instance of `morpheus.messages.ControlMessage` containing the trained model.
+The {py:obj}`~morpheus_dfp.stages.dfp_training.DFPTraining` trains a model for each incoming `DataFrame` and emits an instance of `morpheus.messages.ControlMessage` containing the trained model.
 
 | Argument | Type | Description |
 | -------- | ---- | ----------- |
@@ -326,7 +326,7 @@
 | `epochs` | `int` | Number of training epochs. Default is 30.|
 | `max_history` | `int` | Maximum amount of rows that will be retained in history. Default is 1000.|
 | `validation_size` | `float` | Proportion of the input dataset to use for training validation. Should be between 0.0 and 1.0. Default is 0.0.|
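The `validation_size` split amounts to holding out a fraction of each user's rows before fitting. A sketch of the idea (sklearn is used here purely for illustration):

```python
import pandas as pd
from sklearn.model_selection import train_test_split

def split_for_validation(df: pd.DataFrame, validation_size: float = 0.1):
    # Hold out the trailing fraction of rows for validation; no shuffle,
    # mirroring time-ordered log data.
    return train_test_split(df, test_size=validation_size, shuffle=False)
```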
 #### MLflow Model Writer Stage (`DFPMLFlowModelWriterStage`)
-The {py:obj}`~dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stage publishes trained models into MLflow, skipping any model which lacked sufficient training data (current required minimum is 300 log records).
+The {py:obj}`~morpheus_dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stage publishes trained models into MLflow, skipping any model which lacked sufficient training data (current required minimum is 300 log records).
 
 | Argument | Type | Description |
 | -------- | ---- | ----------- |
@@ -343,7 +343,7 @@
 
 ### Inference Stages
 
 #### Inference Stage (`DFPInferenceStage`)
-The {py:obj}`~dfp.stages.dfp_inference_stage.DFPInferenceStage` stage loads models from MLflow and performs inferences against those models. This stage emits a message containing the original `DataFrame` along with new columns containing the z score (`mean_abs_z`), as well as the name and version of the model that generated that score (`model_version`). For each feature in the model, three additional columns will also be added:
+The {py:obj}`~morpheus_dfp.stages.dfp_inference_stage.DFPInferenceStage` stage loads models from MLflow and performs inferences against those models. This stage emits a message containing the original `DataFrame` along with new columns containing the z score (`mean_abs_z`), as well as the name and version of the model that generated that score (`model_version`). For each feature in the model, three additional columns will also be added:
 * `_loss` : The loss
 * `_z_loss` : The loss z-score
 * `_pred` : The predicted value
 
@@ -370,4 +370,4 @@ The {py:obj}`~morpheus.stages.postprocess.filter_detections_stage.FilterDetectio
 | `field_name` | `str` | `probs` | Name of the tensor (`filter_source=FilterSource.TENSOR`) or DataFrame column (`filter_source=FilterSource.DATAFRAME`) to use as the filter criteria. |
 
 #### Post Processing Stage (`DFPPostprocessingStage`)
-The {py:obj}`~dfp.stages.dfp_postprocessing_stage.DFPPostprocessingStage` stage adds a new `event_time` column to the DataFrame indicating the time which Morpheus detected the anomalous messages, and replaces any `NAN` values with the a string value of `'NaN'`.
+The {py:obj}`~morpheus_dfp.stages.dfp_postprocessing_stage.DFPPostprocessingStage` stage adds a new `event_time` column to the DataFrame indicating the time at which Morpheus detected the anomalous messages, and replaces any `NAN` values with a string value of `'NaN'`.
 
diff --git a/docs/source/extra_info/troubleshooting.md b/docs/source/extra_info/troubleshooting.md
index 5f3b14be20..4f25115ca8 100644
--- a/docs/source/extra_info/troubleshooting.md
+++ b/docs/source/extra_info/troubleshooting.md
@@ -48,7 +48,7 @@ Error trying to get model
 
 Traceback (most recent call last):
 
-File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_inference_stage.py", line 101, in on_data
+File "/workspace/python/morpheus_dfp/morpheus_dfp/stages/dfp_inference_stage.py", line 101, in on_data
 
 loaded_model = model_cache.load_model(self._client)
 ```
 
diff --git a/examples/developer_guide/3_simple_cpp_stage/CMakeLists.txt b/examples/developer_guide/3_simple_cpp_stage/CMakeLists.txt
index 97c7cc554e..4dc2b0875b 100644
--- a/examples/developer_guide/3_simple_cpp_stage/CMakeLists.txt
+++ b/examples/developer_guide/3_simple_cpp_stage/CMakeLists.txt
@@ -38,6 +38,10 @@ set(CMAKE_INSTALL_RPATH "$ORIGIN")
 
 # Set the option prefix to match the outer project before including. Must be before find_package(morpheus)
 set(OPTION_PREFIX "MORPHEUS")
+
+# Set the policy to allow for CMP0144, avoids warning about MORPHEUS_ROOT being set
+cmake_policy(SET CMP0144 NEW)
+
 find_package(morpheus REQUIRED)
 
 morpheus_utils_initialize_cpm(MORPHEUS_CACHE_DIR)
diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/CMakeLists.txt b/examples/developer_guide/4_rabbitmq_cpp_stage/CMakeLists.txt
index 4d50e40eb2..cef843fc80 100644
--- a/examples/developer_guide/4_rabbitmq_cpp_stage/CMakeLists.txt
+++ b/examples/developer_guide/4_rabbitmq_cpp_stage/CMakeLists.txt
@@ -38,6 +38,10 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
 
 # Set the option prefix to match the outer project before including. Must be before find_package(morpheus)
 set(OPTION_PREFIX "MORPHEUS")
+
+# Set the policy to allow for CMP0144, avoids warning about MORPHEUS_ROOT being set
+cmake_policy(SET CMP0144 NEW)
+
 find_package(morpheus REQUIRED)
 
 morpheus_utils_initialize_cpm(MORPHEUS_CACHE_DIR)
 
diff --git a/examples/digital_fingerprinting/production/dfp_integrated_training_batch_pipeline.py b/examples/digital_fingerprinting/production/dfp_integrated_training_batch_pipeline.py
index 7782961760..fa8f9c7189 100644
--- a/examples/digital_fingerprinting/production/dfp_integrated_training_batch_pipeline.py
+++ b/examples/digital_fingerprinting/production/dfp_integrated_training_batch_pipeline.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import logging
-import typing
 from datetime import datetime
 
 import click
@@ -120,17 +119,17 @@
 @click.option('--silence_monitors', flag_value=True, help='Controls whether monitors will be verbose.')
 def run_pipeline(source: str,
                  train_users: str,
-                 skip_user: typing.Tuple[str],
-                 only_user: typing.Tuple[str],
-                 start_time: datetime,
+                 skip_user: tuple[str],
+                 only_user: tuple[str],
+                 start_time: datetime | None,
                  duration: str,
                  cache_dir: str,
                  log_level: int,
                  sample_rate_s: int,
-                 tracking_uri,
-                 silence_monitors,
-                 mlflow_experiment_name_template,
-                 mlflow_model_name_template,
+                 tracking_uri: str,
+                 silence_monitors: bool,
+                 mlflow_experiment_name_template: str | None,
+                 mlflow_model_name_template: str | None,
                  **kwargs):
     if (skip_user and only_user):
         logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")
 
diff --git a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake
index 7d07b41bbd..2aa8a46c68 100644
--- a/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake
+++ b/python/morpheus/morpheus/_lib/cmake/libmorpheus.cmake
@@ -112,6 +112,9 @@ add_dependencies(morpheus ${cudf_helpers_target})
 
 # In debug mode, dont allow missing symbols
 target_link_options(morpheus PUBLIC "$<$<CONFIG:Debug>:-Wl,--no-allow-shlib-undefined>")
 
+# Avoid warning from the lto-wrapper about serial compilation
+target_link_options(morpheus PUBLIC "-flto=auto")
+
 # Generates an include file for specifying external linkage since everything is hidden by default
 generate_export_header(morpheus
   NO_EXPORT_MACRO_NAME
 
diff --git a/python/morpheus/morpheus/utils/compare_df.py b/python/morpheus/morpheus/utils/compare_df.py
index 605b515edf..5541e0ecd4 100755
--- a/python/morpheus/morpheus/utils/compare_df.py
+++ b/python/morpheus/morpheus/utils/compare_df.py
@@ -130,6 +130,7 @@ def compare_df(df_a: pd.DataFrame,
 
     total_rows = len(df_a_filtered)
     diff_rows = len(df_a_filtered) - int(comparison.count_matching_rows())
+    diff_cols = len(extra_columns) + len(missing_columns)
 
     if (comparison.matches()):
         logger.info("Results match validation dataset")
@@ -141,7 +142,7 @@ def compare_df(df_a: pd.DataFrame,
 
         mismatch_df = merged.loc[mismatched_idx]
 
-        if diff_rows > 0:
+        if diff_rows > 0 or diff_cols > 0:
             logger.debug("Results do not match. Diff %d/%d (%f %%). First 10 mismatched rows:",
                          diff_rows,
                          total_rows,
@@ -160,5 +161,5 @@ def compare_df(df_a: pd.DataFrame,
         "matching_cols": list(same_columns),
         "extra_cols": list(extra_columns),
         "missing_cols": list(missing_columns),
-        "diff_cols": len(extra_columns) + len(missing_columns)
+        "diff_cols": diff_cols
     }
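With this change a pure column mismatch (no row differences) is also reported. Since `compare_df` returns a results dict, the effect can be seen directly; the two-positional-DataFrame call below is an assumption based on the signature shown in this diff:

```python
import pandas as pd

from morpheus.utils.compare_df import compare_df

df_a = pd.DataFrame({"a": [1, 2]})
df_b = pd.DataFrame({"a": [1, 2], "b": [3, 4]})  # one extra column, same rows

results = compare_df(df_a, df_b)
assert results["diff_rows"] == 0 and results["diff_cols"] == 1
# scripts/compare_data_files.py now exits non-zero for this case as well.
```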
First 10 mismatched rows:", diff_rows, total_rows, @@ -160,5 +161,5 @@ def compare_df(df_a: pd.DataFrame, "matching_cols": list(same_columns), "extra_cols": list(extra_columns), "missing_cols": list(missing_columns), - "diff_cols": len(extra_columns) + len(missing_columns) + "diff_cols": diff_cols } diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py index 21a0cfb96d..0566aaa0d7 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py @@ -15,8 +15,9 @@ import logging import mrc -from mrc.core.node import Broadcast +from mrc.core.node import Router +from morpheus.messages import ControlMessage from morpheus.utils.loader_ids import FSSPEC_LOADER from morpheus.utils.module_ids import DATA_LOADER from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE @@ -165,7 +166,7 @@ def dfp_deployment(builder: mrc.Builder): # | # v # +-------------------------------------+ - # | broadcast | + # | router | # +-------------------------------------+ # / \ # / \ @@ -205,13 +206,21 @@ def dfp_deployment(builder: mrc.Builder): "dfp_inference_pipe", dfp_inference_pipe_conf) - # Create broadcast node to fork the pipeline. - broadcast = Broadcast(builder, "broadcast") + def router_key_fn(cm: ControlMessage) -> str: + if cm.has_task("training"): + return "training" + if cm.has_task("inference"): + return "inference" + + raise ValueError("Control message does not have a valid task.") + + # Create router node to fork the pipeline. + router = Router(builder, "router", router_keys=["training", "inference"], key_fn=router_key_fn) # Make an edge between modules - builder.make_edge(fsspec_dataloader_module.output_port("output"), broadcast) - builder.make_edge(broadcast, dfp_training_pipe_module.input_port("input")) - builder.make_edge(broadcast, dfp_inference_pipe_module.input_port("input")) + builder.make_edge(fsspec_dataloader_module.output_port("output"), router) + builder.make_edge(router.get_source("training"), dfp_training_pipe_module.input_port("input")) + builder.make_edge(router.get_source("inference"), dfp_inference_pipe_module.input_port("input")) out_nodes = [dfp_training_pipe_module.output_port("output"), dfp_inference_pipe_module.output_port("output")] diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py index d18809cb2b..455c7b9552 100644 --- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py +++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_inference_pipe.py @@ -211,13 +211,14 @@ def dfp_inference_pipe(builder: mrc.Builder): ts_column_name = config.get("timestamp_column_name") monitor_options = config.get("monitor_options", {}) + preproc_monitor_options = monitor_options.copy() + if "name_postfix" not in preproc_monitor_options: + preproc_monitor_options["name_postfix"] = "[inference_pipe]" + preproc_options = { "batching_options": config.get("batching_options", {}), "cache_dir": cache_dir, - "monitor_options": monitor_options, - "pre_filter_options": { - "enable_task_filtering": True, "filter_task_type": "inference" - }, + "monitor_options": preproc_monitor_options, "timestamp_column_name": ts_column_name, "user_splitting_options": config.get("user_splitting_options", {}), } diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py index 54f934495f..eec215393f 100644 --- 
diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py
index 54f934495f..eec215393f 100644
--- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py
+++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_preproc.py
@@ -20,7 +20,6 @@
 from morpheus.utils.loader_ids import FILE_TO_DF_LOADER
 from morpheus.utils.module_ids import DATA_LOADER
 from morpheus.utils.module_ids import FILE_BATCHER
-from morpheus.utils.module_ids import FILTER_CONTROL_MESSAGE
 from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
 from morpheus.utils.module_utils import merge_dictionaries
 from morpheus.utils.module_utils import register_module
@@ -46,7 +45,6 @@ def dfp_preproc(builder: mrc.Builder):
     Configurable parameters:
         - cache_dir (str): Directory for caching intermediate results
         - timestamp_column_name (str): Name of the column containing timestamps
-        - pre_filter_options (dict): Options for pre-filtering control messages
         - batching_options (dict): Options for batching files
         - user_splitting_options (dict): Options for splitting data by user
         - supported_loaders (dict): Supported data loaders for different file types
@@ -57,11 +55,6 @@ def dfp_preproc(builder: mrc.Builder):
     #                 |
     #                 v
     #  +-------------------------------+
-    #  | filter_control_message_module |
-    #  +-------------------------------+
-    #                 |
-    #                 v
-    #  +-------------------------------+
    #  |      file_batcher_module      |
     #  +-------------------------------+
     #                 |
@@ -94,8 +87,7 @@ def dfp_preproc(builder: mrc.Builder):
     ts_column_name = config.get("timestamp_column_name", None)
 
     monitor_options = config.get("monitor_options", {})
-    pre_filter_options = config.get("pre_filter_options", {})
-    task_type = pre_filter_options.get("filter_task_type")
+    monitor_name_postfix = monitor_options.get("name_postfix", "")
 
     batching_opts = config.get("batching_options", {})
     batching_opts["cache_dir"] = cache_dir
@@ -107,12 +99,9 @@ def dfp_preproc(builder: mrc.Builder):
 
     supported_loaders = config.get("supported_loaders", {})
 
-    file_to_df_monitor_default = {"description": f"FileToDF [{task_type}_pipe]"}
+    file_to_df_monitor_default = {"description": f"FileToDF {monitor_name_postfix}"}
     file_to_df_monitor_conf = merge_dictionaries(monitor_options, file_to_df_monitor_default)
 
-    pre_filter_default = {}
-    pre_filter_conf = merge_dictionaries(pre_filter_options, pre_filter_default)
-
     # Double check on how 'batcher_config' is used in the file_batcher module.
     batching_opts_default = {
         "file_type": "JSON",
@@ -133,13 +122,9 @@ def dfp_preproc(builder: mrc.Builder):
     dfp_split_users_default = {"fallback_username": config.get("fallback_username", "generic_user")}
     dfp_split_users_conf = merge_dictionaries(splitting_opts, dfp_split_users_default)
 
-    dfp_split_users_monitor_default = {"description": f"SplitUsers [{task_type}_pipe]"}
+    dfp_split_users_monitor_default = {"description": f"SplitUsers {monitor_name_postfix}"}
     dfp_split_users_monitor_conf = merge_dictionaries(monitor_options, dfp_split_users_monitor_default)
 
-    filter_control_message_module = builder.load_module(FILTER_CONTROL_MESSAGE,
-                                                        "morpheus",
-                                                        "filter_control_message",
-                                                        pre_filter_conf)
     file_batcher_module = builder.load_module(FILE_BATCHER, "morpheus", "file_batcher", file_batcher_conf)
     file_to_df_dataloader_module = builder.load_module(DATA_LOADER,
                                                        "morpheus",
@@ -156,12 +141,11 @@ def dfp_preproc(builder: mrc.Builder):
     dfp_split_users_monitor_module = dfp_split_users_monitor_loader.load(builder=builder)
 
     # Make an edge between the modules.
-    builder.make_edge(filter_control_message_module.output_port("output"), file_batcher_module.input_port("input"))
     builder.make_edge(file_batcher_module.output_port("output"), file_to_df_dataloader_module.input_port("input"))
     builder.make_edge(file_to_df_dataloader_module.output_port("output"), file_to_df_monitor_module.input_port("input"))
     builder.make_edge(file_to_df_monitor_module.output_port("output"), dfp_split_users_module.input_port("input"))
     builder.make_edge(dfp_split_users_module.output_port("output"), dfp_split_users_monitor_module.input_port("input"))
 
     # Register input and output port for a module.
-    builder.register_module_input("input", filter_control_message_module.input_port("input"))
+    builder.register_module_input("input", file_batcher_module.input_port("input"))
     builder.register_module_output("output", dfp_split_users_monitor_module.output_port("output"))
 
diff --git a/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py b/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py
index 394ae0a12b..92df9fadf3 100644
--- a/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py
+++ b/python/morpheus_dfp/morpheus_dfp/modules/dfp_training_pipe.py
@@ -182,13 +182,14 @@ def dfp_training_pipe(builder: mrc.Builder):
     ts_column_name = config.get("timestamp_column_name")
     monitor_options = config.get("monitor_options", {})
 
+    preproc_monitor_options = monitor_options.copy()
+    if "name_postfix" not in preproc_monitor_options:
+        preproc_monitor_options["name_postfix"] = "[training_pipe]"
+
     preproc_options = {
         "batching_options": config.get("batching_options", {}),
         "cache_dir": cache_dir,
-        "monitor_options": monitor_options,
-        "pre_filter_options": {
-            "enable_task_filtering": True, "filter_task_type": "training"
-        },
+        "monitor_options": preproc_monitor_options,
         "timestamp_column_name": ts_column_name,
         "user_splitting_options": config.get("user_splitting_options", {}),
     }
 
diff --git a/python/morpheus_dfp/morpheus_dfp/utils/dfp_arg_parser.py b/python/morpheus_dfp/morpheus_dfp/utils/dfp_arg_parser.py
index 9fb64481e2..46d223b1a0 100644
--- a/python/morpheus_dfp/morpheus_dfp/utils/dfp_arg_parser.py
+++ b/python/morpheus_dfp/morpheus_dfp/utils/dfp_arg_parser.py
@@ -36,12 +36,12 @@ class TimeFields:
 class DFPArgParser:
 
     def __init__(self,
-                 skip_user: str,
-                 only_user: str,
-                 start_time: str,
+                 skip_user: tuple[str],
+                 only_user: tuple[str],
+                 start_time: datetime | None,
                  log_level: int,
                  cache_dir: str,
-                 sample_rate_s: str,
+                 sample_rate_s: int,
                  duration: str,
                  source: str,
                  tracking_uri: str,
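One footnote on the modernized hints above: `click` collects multi-value options into tuples, and the precise builtin-generics spelling for a variable-length tuple is `tuple[str, ...]` (`tuple[str]` denotes a 1-tuple). A sketch of the PEP 585/604 forms used here:

```python
from datetime import datetime

# Variable-length tuple of strings and an optional datetime, without typing imports.
def run(skip_user: tuple[str, ...] = (), start_time: datetime | None = None) -> None:
    ...
```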
diff --git a/python/morpheus_llm/morpheus_llm/_lib/cmake/libmorpheus_llm.cmake b/python/morpheus_llm/morpheus_llm/_lib/cmake/libmorpheus_llm.cmake
index 83ba243398..539696fc38 100644
--- a/python/morpheus_llm/morpheus_llm/_lib/cmake/libmorpheus_llm.cmake
+++ b/python/morpheus_llm/morpheus_llm/_lib/cmake/libmorpheus_llm.cmake
@@ -60,6 +60,9 @@ target_include_directories(morpheus_llm
 
 # In debug mode, dont allow missing symbols
 target_link_options(morpheus_llm PUBLIC "$<$<CONFIG:Debug>:-Wl,--no-allow-shlib-undefined>")
 
+# Avoid warning from the lto-wrapper about serial compilation
+target_link_options(morpheus_llm PUBLIC "-flto=auto")
+
 # Ideally, we dont use glob here. But there is no good way to guarantee you dont miss anything like *.cpp
 file(GLOB_RECURSE morpheus_llm_public_headers
   LIST_DIRECTORIES FALSE
 
diff --git a/scripts/compare_data_files.py b/scripts/compare_data_files.py
index b1a53f4fa1..3250d9d65b 100755
--- a/scripts/compare_data_files.py
+++ b/scripts/compare_data_files.py
@@ -66,7 +66,7 @@ def main():
                              abs_tol=args.abs_tol,
                              rel_tol=args.rel_tol)
 
-    if results['diff_rows'] > 0:
+    if results['diff_rows'] > 0 or results['diff_cols'] > 0:
        sys.exit(1)