Prior to running the GNN fraud detection pipeline, additional requirements must be installed into your Conda environment. A supplemental requirements file has been provided in this example directory.
export CUDA_VER=12.1
mamba env update \
-n ${CONDA_DEFAULT_ENV} \
--file ./conda/environments/examples_cuda-121_arch-x86_64.yaml
export MORPHEUS_ROOT=$(pwd)
Use Morpheus to run the GNN fraud detection pipeline with the transaction data. A pipeline has been configured in run.py with several command line options:
cd ${MORPHEUS_ROOT}/examples/gnn_fraud_detection_pipeline
python run.py --help
Usage: run.py [OPTIONS]
Options:
--num_threads INTEGER RANGE Number of internal pipeline threads to use.
[x>=1]
--pipeline_batch_size INTEGER RANGE
Internal batch size for the pipeline. Can be
much larger than the model batch size. Also
used for Kafka consumers. [x>=1]
--model_max_batch_size INTEGER RANGE
Max batch size to use for the model. [x>=1]
--model_fea_length INTEGER RANGE
Features length to use for the model.
[x>=1]
--input_file PATH Input data filepath. [required]
--training_file PATH Training data filepath. [required]
--model_dir PATH Trained model directory path [required]
--output_file TEXT The path to the file where the inference
output will be saved.
--help Show this message and exit.
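For scripted runs, the options listed above can be assembled programmatically. The following is a minimal sketch, not part of the example itself: `build_run_command` is a hypothetical helper, and the values in the usage lines are placeholders chosen for illustration. It only uses flags shown in the help text and applies the same `x>=1` constraint before building the argument list.

```python
import sys

# Flags taken from the `run.py --help` output; these four expect integers >= 1.
RANGE_FLAGS = ("num_threads", "pipeline_batch_size", "model_max_batch_size", "model_fea_length")
PATH_FLAGS = ("input_file", "training_file", "model_dir", "output_file")


def build_run_command(**options):
    """Return an argv list for run.py (hypothetical helper, for illustration only)."""
    cmd = [sys.executable, "run.py"]
    for name, value in options.items():
        if name in RANGE_FLAGS:
            if not isinstance(value, int) or value < 1:
                raise ValueError(f"--{name} must be an integer >= 1, got {value!r}")
        elif name not in PATH_FLAGS:
            raise ValueError(f"unknown option: --{name}")
        cmd += [f"--{name}", str(value)]
    return cmd


# Placeholder values; the resulting list can be handed to subprocess.run(cmd, check=True).
cmd = build_run_command(input_file="validation.csv", training_file="training.csv",
                        model_dir="model", num_threads=4)
print(" ".join(cmd[1:]))
```

Building the command as a list rather than a single string avoids shell quoting issues when paths contain spaces.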
To launch the configured Morpheus pipeline with the sample data provided in $MORPHEUS_ROOT/models/dataset, run the following:
cd ${MORPHEUS_ROOT}/examples/gnn_fraud_detection_pipeline
python run.py
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-file-0; FileSourceStage(filename=validation.csv, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=False)>
└─> morpheus.MessageMeta
Added stage: <deserialize-1; DeserializeStage(ensure_sliceable_index=True)>
└─ morpheus.MessageMeta -> morpheus.MultiMessage
Added stage: <fraud-graph-construction-2; FraudGraphConstructionStage(training_file=training.csv)>
└─ morpheus.MultiMessage -> stages.FraudGraphMultiMessage
Added stage: <monitor-3; MonitorStage(description=Graph construction rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ stages.FraudGraphMultiMessage -> stages.FraudGraphMultiMessage
Added stage: <gnn-fraud-sage-4; GraphSAGEStage(model_dir=model, batch_size=100, record_id=index, target_node=transaction)>
└─ stages.FraudGraphMultiMessage -> stages.GraphSAGEMultiMessage
Added stage: <monitor-5; MonitorStage(description=Inference rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ stages.GraphSAGEMultiMessage -> stages.GraphSAGEMultiMessage
Added stage: <gnn-fraud-classification-6; ClassificationStage(model_xgb_file=model/xgb.pt)>
└─ stages.GraphSAGEMultiMessage -> morpheus.MultiMessage
Added stage: <monitor-7; MonitorStage(description=Add classification rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MultiMessage -> morpheus.MultiMessage
Added stage: <serialize-8; SerializeStage(include=[], exclude=['^ID$', '^_ts_'], fixed_columns=True)>
└─ morpheus.MultiMessage -> morpheus.MessageMeta
Added stage: <monitor-9; MonitorStage(description=Serialize rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None, log_level=LogLevels.INFO)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <to-file-10; WriteToFileStage(filename=output.csv, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
Graph construction rate[Complete]: 265 messages [00:00, 1218.88 messages/s]
Inference rate[Complete]: 265 messages [00:01, 174.04 messages/s]
Add classification rate[Complete]: 265 messages [00:01, 170.69 messages/s]
Serialize rate[Complete]: 265 messages [00:01, 166.36 messages/s]
====Pipeline Complete====
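The final monitor lines in the output above report a message count and a throughput for each monitored stage. If those numbers need to be compared across runs, they can be extracted from a captured log. The sketch below is an illustration only: `parse_monitor_lines` is a hypothetical helper, and the regular expression assumes the exact line format shown in this sample output, which may differ in other Morpheus versions.

```python
import re

# Matches completed monitor lines such as:
# "Inference rate[Complete]: 265 messages [00:01, 174.04 messages/s]"
MONITOR_LINE = re.compile(
    r"^(?P<description>.+?)\[Complete\]: (?P<count>\d+) messages "
    r"\[(?P<elapsed>\d+:\d+), (?P<rate>[\d.]+) messages/s\]$"
)


def parse_monitor_lines(log_text):
    """Return {description: (message_count, messages_per_second)} for completed monitors."""
    results = {}
    for line in log_text.splitlines():
        match = MONITOR_LINE.match(line.strip())
        if match:
            results[match["description"]] = (int(match["count"]), float(match["rate"]))
    return results


# The four summary lines copied from the sample output above.
sample_log = """\
Graph construction rate[Complete]: 265 messages [00:00, 1218.88 messages/s]
Inference rate[Complete]: 265 messages [00:01, 174.04 messages/s]
Add classification rate[Complete]: 265 messages [00:01, 170.69 messages/s]
Serialize rate[Complete]: 265 messages [00:01, 166.36 messages/s]
====Pipeline Complete====
"""

rates = parse_monitor_lines(sample_log)
for description, (count, rate) in rates.items():
    print(f"{description}: {count} messages at {rate} messages/s")
```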
The above example illustrates using the Python API to build a custom Morpheus pipeline. Alternatively, the Morpheus command line could have been used to accomplish the same goal. To do this, we must ensure that the examples directory is available in the PYTHONPATH and that each of the custom stages is registered as a plugin.
Note: Since the gnn_fraud_detection_pipeline module is visible to Python, we can specify the plugins by their module name rather than the more verbose file path.
From the root of the Morpheus repo, run:
PYTHONPATH="examples" \
morpheus --log_level INFO \
--plugin "gnn_fraud_detection_pipeline" \
run --use_cpp False --pipeline_batch_size 1024 --model_max_batch_size 32 --edge_buffer_size 4 \
pipeline-other --model_fea_length 70 --label=probs \
from-file --filename examples/gnn_fraud_detection_pipeline/validation.csv --filter_null False \
deserialize \
fraud-graph-construction --training_file examples/gnn_fraud_detection_pipeline/training.csv \
monitor --description "Graph construction rate" \
gnn-fraud-sage --model_dir examples/gnn_fraud_detection_pipeline/model/ \
monitor --description "Inference rate" \
gnn-fraud-classification --model_xgb_file examples/gnn_fraud_detection_pipeline/model/xgb.pt \
monitor --description "Add classification rate" \
serialize \
to-file --filename "output.csv" --overwrite
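The command above relies on the gnn_fraud_detection_pipeline module being importable once the examples directory is on the PYTHONPATH. A quick way to check that kind of visibility before launching is sketched below. This is an illustration under stated assumptions: `is_importable` is a hypothetical helper, and the demonstration uses a throwaway package named `demo_plugin_pkg` so that it runs anywhere, not the real Morpheus module.

```python
import importlib
import importlib.util
import os
import sys
import tempfile


def is_importable(module_name, extra_path=None):
    """Return True if `module_name` can be located, optionally searching `extra_path` first."""
    original_path = list(sys.path)
    try:
        if extra_path is not None:
            sys.path.insert(0, os.path.abspath(extra_path))
        importlib.invalidate_caches()
        return importlib.util.find_spec(module_name) is not None
    finally:
        # Restore sys.path so the check has no lasting side effects.
        sys.path[:] = original_path


# Self-contained demonstration with a temporary package (hypothetical name).
with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "demo_plugin_pkg"))
    with open(os.path.join(root, "demo_plugin_pkg", "__init__.py"), "w"):
        pass
    found_with_path = is_importable("demo_plugin_pkg", extra_path=root)
    found_without_path = is_importable("demo_plugin_pkg")

print(found_with_path, found_without_path)
```

From the root of the Morpheus repo, the analogous call would be `is_importable("gnn_fraud_detection_pipeline", extra_path="examples")`, which mirrors what setting `PYTHONPATH="examples"` does for the CLI.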