Skip to content

Commit

Permalink
[DRAFT] Add model and experiment template 'click' options to dfp exam…
Browse files Browse the repository at this point in the history
…ple pipelines, and make model names Databricks compatible. (#1245)

Closes issue #1244

Authors:
  - Devin Robison (https://github.com/drobison00)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1245
  • Loading branch information
drobison00 authored Oct 5, 2023
1 parent f1bb834 commit c521cd4
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def __init__(self,
source: str,
tracking_uri: str,
silence_monitors: bool,
mlflow_experiment_name_formatter: str,
mlflow_model_name_formatter: str,
train_users: str = None):

self._skip_users = list(skip_user)
Expand All @@ -65,8 +67,8 @@ def __init__(self,
self._time_fields: TimeFields = None
self._silence_monitors = silence_monitors

self._model_name_formatter = f"DFP-{source}-" + "{user_id}"
self._experiment_name_formatter = f"dfp/{source}/training/" + "{reg_model_name}"
self._model_name_formatter = mlflow_model_name_formatter
self._experiment_name_formatter = mlflow_experiment_name_formatter

@staticmethod
def verify_init(func):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/azure/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-azure-{user_id}",
help="The MLflow model name template to use when logging models. ")
def run_pipeline(train_users,
skip_user: typing.Tuple[str],
only_user: typing.Tuple[str],
Expand All @@ -149,6 +157,8 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
filter_threshold,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
Expand Down Expand Up @@ -311,8 +321,8 @@ def run_pipeline(train_users,
# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

model_name_formatter = "DFP-azure-{user_id}"
experiment_name_formatter = "dfp/azure/training/{reg_model_name}"
model_name_formatter = mlflow_model_name_template
experiment_name_formatter = mlflow_experiment_name_template

if (is_training):
# Finally, perform training which will output a model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/duo/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-duo-{user_id}",
help="The MLflow model name template to use when logging models. ")
def run_pipeline(train_users,
skip_user: typing.Tuple[str],
only_user: typing.Tuple[str],
Expand All @@ -150,6 +158,8 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
filter_threshold,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
Expand Down Expand Up @@ -306,8 +316,8 @@ def run_pipeline(train_users,
# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

model_name_formatter = "DFP-duo-{user_id}"
experiment_name_formatter = "dfp/duo/training/{reg_model_name}"
model_name_formatter = mlflow_model_name_template
experiment_name_formatter = mlflow_experiment_name_template

if (is_training):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/{source}/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-{source}-{user_id}",
help="The MLflow model name template to use when logging models. ")
@click.option("--disable_pre_filtering",
is_flag=True,
help=("Enabling this option will skip pre-filtering of json messages. "
Expand All @@ -126,6 +134,8 @@ def run_pipeline(source: str,
tracking_uri,
silence_monitors,
use_cpp,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
if (skip_user and only_user):
logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")
Expand All @@ -140,6 +150,8 @@ def run_pipeline(source: str,
source,
tracking_uri,
silence_monitors,
mlflow_experiment_name_template,
mlflow_model_name_template,
train_users)

dfp_arg_parser.init()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/{source}/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-{source}-{user_id}",
help="The MLflow model name template to use when logging models. ")
@click.option('--bootstrap_servers',
type=str,
default="localhost:9092",
Expand Down Expand Up @@ -138,6 +146,8 @@ def run_pipeline(source: str,
tracking_uri,
silence_monitors,
use_cpp,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
if (skip_user and only_user):
logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")
Expand All @@ -152,6 +162,8 @@ def run_pipeline(source: str,
source,
tracking_uri,
silence_monitors,
mlflow_experiment_name_template,
mlflow_model_name_template,
train_users)

dfp_arg_parser.init()
Expand Down
29 changes: 26 additions & 3 deletions morpheus/controllers/mlflow_model_writer_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ def experiment_name_formatter(self):
def databricks_permissions(self):
return self._databricks_permissions

def _create_safe_user_id(self, user_id: str):
"""
Creates a safe user ID for use in MLflow model names and experiment names.
Parameters
----------
user_id : str
The user ID.
Returns
-------
str
The generated safe user ID.
"""

safe_user_id = user_id.replace('.', '_dot_')
safe_user_id = safe_user_id.replace('/', '_slash_')
safe_user_id = safe_user_id.replace(':', '_colon_')

return safe_user_id

def user_id_to_model(self, user_id: str):
"""
Converts a user ID to an model name
Expand All @@ -102,7 +123,7 @@ def user_id_to_model(self, user_id: str):
"""

kwargs = {
"user_id": user_id,
"user_id": self._create_safe_user_id(user_id),
"user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(),
}

Expand All @@ -123,9 +144,11 @@ def user_id_to_experiment(self, user_id: str) -> str:
The generated experiment name.
"""

safe_user_id = self._create_safe_user_id(user_id)

kwargs = {
"user_id": user_id,
"user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(),
"user_id": safe_user_id,
"user_md5": hashlib.md5(safe_user_id.encode('utf-8')).hexdigest(),
"reg_model_name": self.user_id_to_model(user_id=user_id)
}

Expand Down

0 comments on commit c521cd4

Please sign in to comment.