Commit 9670c97

Merge remote-tracking branch 'upstream/branch-23.11' into 1177-fea-add-milvus-vector-db-service-and-sink
bsuryadevara committed Oct 11, 2023
2 parents 2f24cc2 + 0772f87 commit 9670c97
Showing 41 changed files with 692 additions and 177 deletions.
2 changes: 1 addition & 1 deletion ci/conda/recipes/morpheus/conda_build_config.yaml
@@ -29,7 +29,7 @@ python:
- 3.10

boost:
- 1.74
- 1.82

rapids_version:
- 23.06
3 changes: 2 additions & 1 deletion ci/conda/recipes/morpheus/meta.yaml
@@ -79,7 +79,8 @@ outputs:
- docker-py 5.0.*
- grpcio # Version determined from cudf
- libmrc
- mlflow >=2.2.1,<3
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- mlflow>=2.2.1,<3
- mrc
- networkx 3.1.*
- numpydoc 1.4.*
4 changes: 2 additions & 2 deletions ci/scripts/common.sh
@@ -43,8 +43,8 @@ export SKIP_YAPF=${SKIP_YAPF:-""}
# Set BUILD_DIR to use a different build folder
export BUILD_DIR=${BUILD_DIR:-"${MORPHEUS_ROOT}/build"}

# Specify the clang-tools version to use. Default 14
export CLANG_TOOLS_VERSION=${CLANG_TOOLS_VERSION:-14}
# Specify the clang-tools version to use. Default 16
export CLANG_TOOLS_VERSION=${CLANG_TOOLS_VERSION:-16}

# Returns the `branch-YY.MM` that is used as the base for merging
function get_base_branch() {
4 changes: 4 additions & 0 deletions ci/scripts/github/common.sh
@@ -93,6 +93,10 @@ function update_conda_env() {

if [[ "${SKIP_CONDA_ENV_UPDATE}" == "" ]]; then
rapids-logger "Checking for updates to conda env"

# Remove default/conflicting channels from base image
rm /opt/conda/.condarc

# Update the packages
rapids-mamba-retry env update -n morpheus --prune -q --file ${ENV_YAML}
fi
9 changes: 9 additions & 0 deletions dependencies.yaml
@@ -22,6 +22,7 @@ files:
arch: [x86_64]
includes:
- build_cpp
- development_cpp
- run_morpheus
- test_python_morpheus
- cudatoolkit
@@ -56,6 +57,14 @@ dependencies:
- scikit-build=0.17.1
- tritonclient=2.26 # Required by NvTabular, force the version, so we get protobufs compatible with 4.21
- ucx=1.14
development_cpp:
common:
- output_types: [conda]
packages:
- clangdev=14
- include-what-you-use=0.18
- isort
- yapf=0.40.1
run_morpheus:
common:
- output_types: [conda]
1 change: 1 addition & 0 deletions docker/Dockerfile
@@ -85,6 +85,7 @@ RUN --mount=type=cache,id=apt,target=/var/cache/apt \
libcurand-${CUDA_MAJOR_VER}-${CUDA_MINOR_VER} \
libcusolver-${CUDA_MAJOR_VER}-${CUDA_MINOR_VER} \
libnuma1 \
openjdk-11-jre-headless \
openssh-client \
pkg-config \
tar \
10 changes: 6 additions & 4 deletions docker/conda/environments/cuda11.8_dev.yml
@@ -25,10 +25,10 @@ dependencies:
####### Morpheus Dependencies (keep sorted!) #######
- automake=1.16.5
- benchmark=1.6.1
- boost-cpp=1.74
- boost-cpp=1.82
- cachetools=5.0.0
- ccache>=3.7
- clangdev=14
- clangdev=16
- click >=8
- cmake=3.24
- configargparse=1.5
@@ -58,14 +58,15 @@ dependencies:
- grpcio
- gtest>=1.13.0
- gxx_linux-64=11.2
- include-what-you-use=0.18
- include-what-you-use=0.20
- ipywidgets
- isort
- jupyter_core>=4.11.2,<5.0
- jupyterlab
- libgrpc>=1.49
- librdkafka=1.9.2
- mlflow>=2.2.1,<2.7
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- mlflow>=2.2.1,<3
- mrc=23.11
- networkx=3.1
- ninja=1.10
@@ -107,6 +108,7 @@ dependencies:
####### Morpheus Pip Dependencies (keep sorted!) #######
- pip:
# Add additional dev dependencies here
- databricks-connect
- pytest-kafka==0.6.0
- pymilvus==2.3.1
- milvus
1 change: 1 addition & 0 deletions docker/conda/environments/cuda11.8_examples.yml
@@ -31,6 +31,7 @@ dependencies:
- dgl=1.0.2
- dill=0.3.6
- distributed>=2023.1.1
- libwebp>=1.3.2 # Required for CVE mitigation: https://nvd.nist.gov/vuln/detail/CVE-2023-4863
- mlflow>=2.2.1,<3
- papermill=2.3.4
- s3fs>=2023.6
1 change: 1 addition & 0 deletions docs/source/conf.py
@@ -164,6 +164,7 @@
autodoc_mock_imports = [
"cudf", # Avoid loading GPU libraries during the documentation build
"cupy", # Avoid loading GPU libraries during the documentation build
"databricks.connect",
"merlin",
"morpheus.cli.commands", # Dont document the CLI in Sphinx
"nvtabular",
20 changes: 12 additions & 8 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
@@ -406,7 +408,8 @@ Note that the tokenizer parameters and vocabulary hash file should exactly match

At this point, we have a pipeline that reads in a set of records and pre-processes them with the metadata required for our classifier to make predictions. Our next step is to define a stage that applies a machine learning model to our `MessageMeta` object. To accomplish this, we will be using Morpheus' `TritonInferenceStage`. This stage will handle communication with the `phishing-bert-onnx` model, which we provided to the Triton Docker container via the `models` directory mount.
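
As a sketch (the model name, server URL, and input-conversion flag mirror the CLI invocation at the end of this guide; the exact keyword arguments are assumptions), the inference stage might be configured like this:

```python
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage

# Send the pre-processed messages to the phishing-bert-onnx model hosted by Triton;
# `pipeline` and `config` are the objects built earlier in this guide.
pipeline.add_stage(
    TritonInferenceStage(config,
                         model_name="phishing-bert-onnx",
                         server_url="localhost:8001",
                         force_convert_inputs=True))
```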

Next we will add a monitor stage to measure the inference rate as well as a filter stage to filter out any results below a probability threshold of `0.9`.
Next we will add a monitor stage to measure the inference rate:

```python
# Add an inference stage
pipeline.add_stage(
@@ -418,14 +419,17 @@ pipeline.add_stage(
))

pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
```

# Filter values lower than 0.9
pipeline.add_stage(FilterDetectionsStage(config, threshold=0.9))
Here we add a postprocessing stage that adds the probability score for `is_phishing`:

```python
pipeline.add_stage(AddScoresStage(config, labels=["is_phishing"]))
```

Lastly, we will save our results to disk. For this purpose, we are using two stages that are often used in conjunction with each other: `SerializeStage` and `WriteToFileStage`.

The `SerializeStage` is used to include and exclude columns as desired in the output. Importantly, it also handles conversion from the `MultiMessage`-derived output type that is used by the `FilterDetectionsStage` to the `MessageMeta` class that is expected as input by the `WriteToFileStage`.
The `SerializeStage` is used to include and exclude columns as desired in the output. Importantly, it also handles conversion from the `MultiMessage`-derived output type to the `MessageMeta` class that is expected as input by the `WriteToFileStage`.

The `WriteToFileStage` will append message data to the output file as messages are received. Note however that for performance reasons the `WriteToFileStage` does not flush its contents out to disk every time a message is received. Instead, it relies on the underlying [buffered output stream](https://gcc.gnu.org/onlinedocs/libstdc++/manual/streambufs.html) to flush as needed, and then will close the file handle on shutdown.
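
As a rough sketch (the output path and `overwrite` flag mirror the CLI example at the end of this guide; the exact keyword arguments are assumptions), these two stages might be added as follows:

```python
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage

# Convert the MultiMessage-derived payloads back into MessageMeta objects
pipeline.add_stage(SerializeStage(config))

# Append the serialized records to a JSON-lines file on disk
pipeline.add_stage(WriteToFileStage(config, filename="/tmp/detections.jsonlines", overwrite=True))
```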

@@ -456,7 +460,7 @@ from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage
from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage
@@ -522,8 +526,8 @@ def run_pipeline():
# Monitor the inference rate
pipeline.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))

# Filter values lower than 0.9
pipeline.add_stage(FilterDetectionsStage(config, threshold=0.9))
# Add probability score for is_phishing
pipeline.add_stage(AddScoresStage(config, labels=["is_phishing"]))

# Write the results to the output file
pipeline.add_stage(SerializeStage(config))
@@ -550,7 +554,7 @@ morpheus --log_level=debug --plugin examples/developer_guide/2_1_real_world_phis
preprocess --vocab_hash_file=data/bert-base-uncased-hash.txt --truncation=true --do_lower_case=true --add_special_tokens=false \
inf-triton --model_name=phishing-bert-onnx --server_url=localhost:8001 --force_convert_inputs=true \
monitor --description="Inference Rate" --smoothing=0.001 --unit=inf \
filter --threshold=0.9 --filter_source=TENSOR \
add-scores --label=is_phishing \
serialize \
to-file --filename=/tmp/detections.jsonlines --overwrite
```
@@ -233,7 +233,7 @@ The `DFPFileToDataFrameStage` (examples/digital_fingerprinting/production/morphe
| `parser_kwargs` | `dict` or `None` | Optional: additional keyword arguments passed into the `DataFrame` parser, which is currently one of [`pandas.read_csv`](https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html), [`pandas.read_json`](https://pandas.pydata.org/docs/reference/api/pandas.read_json.html) or [`pandas.read_parquet`](https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html) |
| `cache_dir` | `str` | Optional: path to cache location, defaults to `./.cache/dfp` |

This stage is able to download and load data files concurrently using one of several methods. Currently supported methods are: `single_thread`, `multiprocess`, `dask`, and `dask_thread`. The method is chosen by setting the {envvar}`MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable; `dask_thread` is used by default, and `single_thread` effectively disables concurrent loading.
This stage is able to download and load data files concurrently using one of several methods. Currently supported methods are: `single_thread`, `dask`, and `dask_thread`. The method is chosen by setting the {envvar}`MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable; `dask_thread` is used by default, and `single_thread` effectively disables concurrent loading.

This stage will cache the resulting `DataFrame` in `cache_dir`; since we are caching the `DataFrame`s and not the source files, a cache hit avoids the cost of parsing the incoming data. In the case of remote storage systems, such as S3, this avoids both parsing and a download on a cache hit. One consequence of this is that any change to the `schema` will require purging cached files in the `cache_dir` before those changes are visible.
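
As a minimal illustration (the variable name, accepted values, and default cache location are those described above; driving this from Python rather than the shell is just one option), the download method can be selected before the pipeline starts:

```python
import os
import shutil

# Select how DFPFileToDataFrameStage downloads and loads files;
# "dask_thread" is the default and "single_thread" disables concurrent loading.
os.environ["MORPHEUS_FILE_DOWNLOAD_TYPE"] = "dask_thread"

# After a schema change, purge the cached DataFrames so the change becomes visible.
shutil.rmtree("./.cache/dfp", ignore_errors=True)
```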

2 changes: 1 addition & 1 deletion docs/source/loaders/core/file_to_df_loader.md
@@ -17,7 +17,7 @@ limitations under the License.

## File to DataFrame Loader

The [DataLoader](../../modules/core/data_loader.md) module is used to load the contents of data files into a DataFrame using a custom loader function. This loader function can be configured to use different processing methods, such as single-threaded, multiprocess, dask, or dask_thread, as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When the download method starts with "dask", a dask client is created to process the files; otherwise, a single thread or multiprocess is used.
The [DataLoader](../../modules/core/data_loader.md) module is used to load the contents of data files into a DataFrame using a custom loader function. This loader function can be configured to use different processing methods, such as single-threaded, dask, or dask_thread, as determined by the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable. When the download method starts with "dask", a dask client is created to process the files; otherwise, a single thread is used.

After processing, the resulting dataframe is cached using a hash of the file paths. This loader also has the ability to load file content from S3 buckets, in addition to loading data from the disk.

7 changes: 3 additions & 4 deletions examples/abp_pcap_detection/README.md
@@ -27,14 +27,13 @@ docker pull nvcr.io/nvidia/tritonserver:23.06-py3
```

##### Deploy Triton Inference Server

Bind the provided `abp-pcap-xgb` directory to the docker container model repo at `/models`.

From the root of the Morpheus repo, navigate to the anomalous behavior profiling example directory:
```bash
cd examples/abp_pcap_detection
```

# Launch the container
The following creates the Triton container, mounts the `abp-pcap-xgb` directory to `/models/abp-pcap-xgb` in the Triton container, and starts the Triton server:
```bash
docker run --rm --gpus=all -p 8000:8000 -p 8001:8001 -p 8002:8002 -v $PWD/abp-pcap-xgb:/models/abp-pcap-xgb --name tritonserver nvcr.io/nvidia/tritonserver:23.06-py3 tritonserver --model-repository=/models --exit-on-error=false
```

@@ -41,8 +41,9 @@ function(find_and_configure_SimpleAmqpClient version)
# Needed to pick up the generated export.h
target_include_directories(SimpleAmqpClient PUBLIC "${rabbitmq_BINARY_DIR}/include")

# Suppress #warning deprecation messages from rabbitmq
target_compile_options(SimpleAmqpClient PRIVATE -Wno-cpp)
# Suppress #warning deprecation messages from rabbitmq and SimpleAmqpClient
# https://github.com/nv-morpheus/Morpheus/issues/1255
target_compile_options(SimpleAmqpClient PRIVATE -Wno-cpp -DBOOST_DISABLE_PRAGMA_MESSAGE)

endfunction()

@@ -93,7 +93,7 @@ Morpheus pipeline configurations for each workflow are managed using [pipelines_

When using the MRC SegmentModule in a pipeline, a module configuration is also required; it is generated within the test. Additional information is included in [Morpheus Pipeline with Modules](../../../../../docs/source/developer_guide/guides/6_digital_fingerprinting_reference.md#morpheus-pipeline-with-modules).

To ensure [file_to_df_loader.py](../../../../../morpheus/loaders/file_to_df_loader.py) utilizes the same type of downloading mechanism, set the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable to any one of the given choices (`multiprocess`, `dask`, `dask thread`, `single thread`).
To ensure [file_to_df_loader.py](../../../../../morpheus/loaders/file_to_df_loader.py) utilizes the same type of downloading mechanism, set the `MORPHEUS_FILE_DOWNLOAD_TYPE` environment variable to any one of the given choices (`dask`, `dask thread`, `single thread`).

```
export MORPHEUS_FILE_DOWNLOAD_TYPE=dask
@@ -230,9 +230,9 @@ def dfp_training_pipe(builder: mrc.Builder):
"encoder_layers": [512, 500], # layers of the encoding part
"decoder_layers": [512], # layers of the decoding part
"activation": 'relu', # activation function
"swap_p": 0.2, # noise parameter
"lr": 0.001, # learning rate
"lr_decay": 0.99, # learning decay
"swap_probability": 0.2, # noise parameter
"learning_rate": 0.001, # learning rate
"learning_rate_decay": 0.99, # learning decay
"batch_size": 512,
"verbose": False,
"optimizer": 'sgd', # SGD optimizer is selected(Stochastic gradient descent)
@@ -57,9 +57,9 @@ def __init__(self, c: Config, model_kwargs: dict = None, epochs=30, validation_s
"encoder_layers": [512, 500], # layers of the encoding part
"decoder_layers": [512], # layers of the decoding part
"activation": 'relu', # activation function
"swap_p": 0.2, # noise parameter
"lr": 0.001, # learning rate
"lr_decay": .99, # learning decay
"swap_probability": 0.2, # noise parameter
"learning_rate": 0.001, # learning rate
"learning_rate_decay": .99, # learning decay
"batch_size": 512,
"verbose": False,
"optimizer": 'sgd', # SGD optimizer is selected(Stochastic gradient descent)
@@ -46,6 +46,8 @@ def __init__(self,
source: str,
tracking_uri: str,
silence_monitors: bool,
mlflow_experiment_name_formatter: str,
mlflow_model_name_formatter: str,
train_users: str = None):

self._skip_users = list(skip_user)
@@ -65,8 +67,8 @@
self._time_fields: TimeFields = None
self._silence_monitors = silence_monitors

self._model_name_formatter = f"DFP-{source}-" + "{user_id}"
self._experiment_name_formatter = f"dfp/{source}/training/" + "{reg_model_name}"
self._model_name_formatter = mlflow_model_name_formatter
self._experiment_name_formatter = mlflow_experiment_name_formatter

@staticmethod
def verify_init(func):
@@ -140,6 +140,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/azure/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-azure-{user_id}",
help="The MLflow model name template to use when logging models. ")
def run_pipeline(train_users,
skip_user: typing.Tuple[str],
only_user: typing.Tuple[str],
@@ -149,6 +157,8 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
filter_threshold,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
@@ -311,8 +321,8 @@ def run_pipeline(train_users,
# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

model_name_formatter = "DFP-azure-{user_id}"
experiment_name_formatter = "dfp/azure/training/{reg_model_name}"
model_name_formatter = mlflow_model_name_template
experiment_name_formatter = mlflow_experiment_name_template

if (is_training):
# Finally, perform training which will output a model
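
For illustration only (the templates are the defaults added above; the user and registered model names are invented), the two new options are ordinary Python format strings that get filled in per user:

```python
# Hypothetical expansion of the default templates for a single user
model_name_formatter = "DFP-azure-{user_id}"
experiment_name_formatter = "dfp/azure/training/{reg_model_name}"

model_name = model_name_formatter.format(user_id="some_user")
experiment_name = experiment_name_formatter.format(reg_model_name=model_name)

print(model_name)        # DFP-azure-some_user
print(experiment_name)   # dfp/azure/training/DFP-azure-some_user
```
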
@@ -141,6 +141,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/duo/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-duo-{user_id}",
help="The MLflow model name template to use when logging models. ")
def run_pipeline(train_users,
skip_user: typing.Tuple[str],
only_user: typing.Tuple[str],
@@ -150,6 +158,8 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
filter_threshold,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
@@ -306,8 +316,8 @@ def run_pipeline(train_users,
# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

model_name_formatter = "DFP-duo-{user_id}"
experiment_name_formatter = "dfp/duo/training/{reg_model_name}"
model_name_formatter = mlflow_model_name_template
experiment_name_formatter = mlflow_experiment_name_template

if (is_training):
