# Fix tests to detect issue #1626 (#1629)
* PR #659 inadvertently excluded the monitor stage from several of the end-to-end pipeline tests.
* Adds an environment variable `MORPHEUS_MONITOR_ALWAYS_ENABLED` which, when set, forces the monitor stage to always be enabled.
* Adds an auto-use fixture `monitor_stage_always_enabled` which ensures the environment variable is set for the duration of each test (a sketch of this approach appears below).

Requires nv-morpheus/MRC#473 to be merged first
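
For illustration, a minimal sketch of how such an auto-use fixture might look. The fixture and variable names are taken from this PR's description; the actual implementation in `tests/conftest.py` may differ:

```python
import pytest


@pytest.fixture(autouse=True)
def monitor_stage_always_enabled(monkeypatch: pytest.MonkeyPatch):
    """Keep MORPHEUS_MONITOR_ALWAYS_ENABLED set for the duration of each test."""
    # monkeypatch.setenv restores (or removes) the variable automatically when
    # the test finishes, so no state leaks between tests.
    monkeypatch.setenv("MORPHEUS_MONITOR_ALWAYS_ENABLED", "1")
```

Outside of pytest, the same effect can be had by exporting the variable manually, e.g. `MORPHEUS_MONITOR_ALWAYS_ENABLED=1 pytest ...`.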

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - Christopher Harris (https://github.com/cwharris)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1629
dagardner-nv authored Apr 24, 2024
1 parent cbfea7d commit ec18300
Showing 16 changed files with 137 additions and 86 deletions.
6 changes: 3 additions & 3 deletions tests/benchmarks/test_bench_e2e_pipelines.py
@@ -67,7 +67,7 @@ def nlp_pipeline(config: Config, input_file, repeat, vocab_hash_file, output_fil
server_url=E2E_TEST_CONFIGS["triton_server_url"],
force_convert_inputs=True))
pipeline.add_stage(AddClassificationsStage(config, threshold=0.5, prefix=""))
-pipeline.add_stage(MonitorStage(config))
+pipeline.add_stage(MonitorStage(config, log_level=logging.INFO))
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))

@@ -89,7 +89,7 @@ def fil_pipeline(config: Config, input_file, repeat, output_file, model_name):
server_url=E2E_TEST_CONFIGS["triton_server_url"],
force_convert_inputs=True))
pipeline.add_stage(AddClassificationsStage(config, threshold=0.5, prefix=""))
-pipeline.add_stage(MonitorStage(config))
+pipeline.add_stage(MonitorStage(config, log_level=logging.INFO))
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))

@@ -111,7 +111,7 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file
pipeline.add_stage(PreprocessAEStage(config))
pipeline.add_stage(AutoEncoderInferenceStage(config))
pipeline.add_stage(AddScoresStage(config))
-pipeline.add_stage(MonitorStage(config))
+pipeline.add_stage(MonitorStage(config, log_level=logging.INFO))
pipeline.add_stage(SerializeStage(config))
pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))

9 changes: 5 additions & 4 deletions tests/benchmarks/test_bench_monitor_stage.py
@@ -14,6 +14,7 @@
# limitations under the License.

import logging
+import typing

import pytest
from static_message_source import StaticMessageSource
@@ -29,7 +30,7 @@
from morpheus.utils.logger import configure_logging


-def build_and_run_pipeline(config: Config, df: cudf.DataFrame):
+def build_and_run_pipeline(*, config: Config, df: cudf.DataFrame, morpheus_log_level: int):

# Pipeline
pipeline = LinearPipeline(config)
@@ -39,15 +40,15 @@ def build_and_run_pipeline(config: Config, df: cudf.DataFrame):
pipeline.add_stage(DeserializeStage(config))

# Stage we want to benchmark
-pipeline.add_stage(MonitorStage(config))
+pipeline.add_stage(MonitorStage(config, log_level=morpheus_log_level))

pipeline.build()
pipeline.run()


@pytest.mark.benchmark
@pytest.mark.parametrize("num_messages", [1, 100, 10000, 1000000])
-def test_monitor_stage(benchmark, num_messages):
+def test_monitor_stage(benchmark: typing.Callable, num_messages: int, morpheus_log_level: int):

# Test Data

@@ -70,4 +71,4 @@ def test_monitor_stage(benchmark, num_messages):
config.edge_buffer_size = 4

# would prefer to benchmark just pipeline.run, but it asserts when called multiple times
-benchmark(build_and_run_pipeline, config, df)
+benchmark(build_and_run_pipeline, config=config, df=df, morpheus_log_level=morpheus_log_level)
9 changes: 9 additions & 0 deletions tests/conftest.py
@@ -861,6 +861,15 @@ def loglevel_fatal():
_wrap_set_log_level(logging.FATAL)


+@pytest.fixture(scope="function")
+def morpheus_log_level():
+    """
+    Returns the log level of the morpheus logger
+    """
+    logger = logging.getLogger("morpheus")
+    yield logger.getEffectiveLevel()


# ==== DataFrame Fixtures ====
@pytest.fixture(scope="function")
def dataset(df_type: typing.Literal['cudf', 'pandas']):
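Any test can then opt in to the new `morpheus_log_level` fixture simply by naming it as a parameter, as the remaining diffs show. A distilled, hypothetical example:

```python
def test_my_pipeline(config: Config, morpheus_log_level: int):
    pipe = LinearPipeline(config)
    # ... source and processing stages ...
    # The fixture yields the effective level of the "morpheus" logger, so the
    # MonitorStage reports at whatever level the test session configured
    # rather than at a hard-coded default.
    pipe.add_stage(MonitorStage(config, log_level=morpheus_log_level))
```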
4 changes: 2 additions & 2 deletions tests/examples/developer_guide/test_python_modules.py
@@ -38,7 +38,7 @@
os.path.join(EXAMPLES_DIR, "my_compound_module_consumer_stage.py"),
os.path.join(EXAMPLES_DIR, "my_test_module_consumer_stage.py")
])
-def test_pipeline(config: Config, import_mod: list[types.ModuleType]):
+def test_pipeline(config: Config, import_mod: list[types.ModuleType], morpheus_log_level: int):
my_compound_module_consumer_stage = import_mod[-2]
my_test_module_consumer_stage = import_mod[-1]

@@ -72,7 +72,7 @@ def test_pipeline(config: Config, import_mod: list[types.ModuleType]):

pipeline.add_stage(my_test_module_consumer_stage.MyPassthroughModuleWrapper(config))
pipeline.add_stage(my_compound_module_consumer_stage.MyCompoundOpModuleWrapper(config))
-pipeline.add_stage(MonitorStage(config))
+pipeline.add_stage(MonitorStage(config, log_level=morpheus_log_level))
comp_stage = pipeline.add_stage(CompareDataFrameStage(config, expected_df))

pipeline.run()
@@ -71,19 +71,18 @@ def test_get_model(config: Config, mock_mlflow_client: mock.MagicMock, mock_mode


@pytest.mark.usefixtures("reset_loglevel")
-@pytest.mark.parametrize('morpheus_log_level',
-[logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
+@pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
def test_on_data(
config: Config,
mock_mlflow_client: mock.MagicMock, # pylint: disable=unused-argument
mock_model_manager: mock.MagicMock,
dfp_multi_message: "MultiDFPMessage", # noqa: F821
-morpheus_log_level: int,
+log_level: int,
dataset_pandas: DatasetManager):
from dfp.messages.multi_dfp_message import MultiDFPMessage
from dfp.stages.dfp_inference_stage import DFPInferenceStage

-set_log_level(morpheus_log_level)
+set_log_level(log_level)

expected_results = list(range(1000, dfp_multi_message.mess_count + 1000))

@@ -35,14 +35,13 @@ def test_constructor(config: Config):

@pytest.mark.usefixtures("reset_loglevel")
@pytest.mark.parametrize('use_on_data', [True, False])
-@pytest.mark.parametrize('morpheus_log_level',
-[logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
+@pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
@mock.patch('dfp.stages.dfp_postprocessing_stage.datetime')
def test_process_events_on_data(mock_datetime: mock.MagicMock,
config: Config,
dfp_multi_ae_message: MultiAEMessage,
use_on_data: bool,
-morpheus_log_level: int):
+log_level: int):
from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage

mock_dt_obj = mock.MagicMock()
@@ -54,7 +53,7 @@ def test_process_events_on_data(mock_datetime: mock.MagicMock,
df.loc[10, 'v2'] = np.nan
df['event_time'] = ''

-set_log_level(morpheus_log_level)
+set_log_level(log_level)
stage = DFPPostprocessingStage(config)

# on_data is a thin wrapper around process_events, tests should be the same for non-empty messages
@@ -36,17 +36,16 @@ def test_constructor(config: Config):


@pytest.mark.usefixtures("reset_loglevel")
-@pytest.mark.parametrize('morpheus_log_level',
-[logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
+@pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
def test_process_features(
config: Config,
dfp_multi_message: "MultiDFPMessage", # noqa: F821
dataset_pandas: DatasetManager,
-morpheus_log_level: int):
+log_level: int):
from dfp.messages.multi_dfp_message import MultiDFPMessage
from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage

-set_log_level(morpheus_log_level)
+set_log_level(log_level)

expected_df = dfp_multi_message.get_meta_dataframe().copy(deep=True)
expected_df['v210'] = expected_df['v2'] + 10
18 changes: 12 additions & 6 deletions tests/test_abp.py
@@ -52,7 +52,7 @@
@pytest.mark.slow
@pytest.mark.use_python
@mock.patch('tritonclient.grpc.InferenceServerClient')
-def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path):
+def test_abp_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
mock_metadata = {
"inputs": [{
'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH]
@@ -98,7 +98,8 @@ def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path):
pipe.add_stage(PreprocessFILStage(config))
pipe.add_stage(
TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='test:0000', force_convert_inputs=True))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(AddClassificationsStage(config))
pipe.add_stage(AddScoresStage(config, prefix="score_"))
pipe.add_stage(
@@ -115,7 +116,7 @@ def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path):
@pytest.mark.slow
@pytest.mark.use_cpp
@pytest.mark.usefixtures("launch_mock_triton")
-def test_abp_cpp(config, tmp_path):
+def test_abp_cpp(config: Config, tmp_path: str, morpheus_log_level: int):
config.mode = PipelineModes.FIL
config.class_labels = ["mining"]
config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
@@ -141,7 +142,8 @@ def test_abp_cpp(config, tmp_path):
pipe.add_stage(
TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001',
force_convert_inputs=True))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(AddClassificationsStage(config))
pipe.add_stage(AddScoresStage(config, prefix="score_"))
pipe.add_stage(
@@ -158,7 +160,10 @@
@pytest.mark.slow
@pytest.mark.use_python
@mock.patch('tritonclient.grpc.InferenceServerClient')
-def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path):
+def test_abp_multi_segment_no_cpp(mock_triton_client: mock.MagicMock,
+config: Config,
+tmp_path: str,
+morpheus_log_level: int):
mock_metadata = {
"inputs": [{
'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH]
@@ -213,7 +218,8 @@ def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path):

pipe.add_segment_boundary(MultiResponseMessage) # Boundary 3

-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(AddClassificationsStage(config))

pipe.add_segment_boundary(MultiResponseMessage) # Boundary 4
12 changes: 8 additions & 4 deletions tests/test_abp_kafka.py
@@ -61,7 +61,8 @@ def test_abp_no_cpp(mock_triton_client: mock.MagicMock,
config: Config,
kafka_bootstrap_servers: str,
kafka_topics: KafkaTopics,
kafka_consumer: "KafkaConsumer"):
kafka_consumer: "KafkaConsumer",
morpheus_log_level: int):
mock_metadata = {
"inputs": [{
'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH]
@@ -115,7 +116,8 @@ def test_abp_no_cpp(mock_triton_client: mock.MagicMock,
pipe.add_stage(PreprocessFILStage(config))
pipe.add_stage(
TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='test:0000', force_convert_inputs=True))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(AddClassificationsStage(config))
pipe.add_stage(SerializeStage(config))
pipe.add_stage(
@@ -151,7 +153,8 @@ def test_abp_cpp(config: Config,
dataset_pandas: DatasetManager,
kafka_bootstrap_servers: str,
kafka_topics: KafkaTopics,
kafka_consumer: "KafkaConsumer"):
kafka_consumer: "KafkaConsumer",
morpheus_log_level: int):
config.mode = PipelineModes.FIL
config.class_labels = ["mining"]
config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
@@ -183,7 +186,8 @@ def test_abp_cpp(config: Config,
pipe.add_stage(
TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001',
force_convert_inputs=True))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(AddClassificationsStage(config))
pipe.add_stage(SerializeStage(config))
pipe.add_stage(
16 changes: 10 additions & 6 deletions tests/test_dfp.py
@@ -23,6 +23,7 @@

from _utils import TEST_DIRS
from _utils import calc_error_val
+from morpheus.config import Config
from morpheus.config import ConfigAutoEncoder
from morpheus.config import PipelineModes
from morpheus.messages.message_meta import MessageMeta
@@ -50,7 +51,7 @@
@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
@pytest.mark.usefixtures("reload_modules")
@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_roleg(mock_ae, config, tmp_path):
+def test_dfp_roleg(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_tensor.csv'), delimiter=',')
anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_anomaly_score.csv'), delimiter=',')
exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_exp_results.csv'))
@@ -107,7 +108,8 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
cold_end=False,
filter_percent=90.0,
zscore_threshold=8.0))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(
ValidationStage(config,
val_file_name=val_file_name,
@@ -135,7 +137,7 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
@pytest.mark.usefixtures("reload_modules")
@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_user123(mock_ae, config, tmp_path):
+def test_dfp_user123(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
@@ -190,7 +192,8 @@ def test_dfp_user123(mock_ae, config, tmp_path):
cold_end=False,
filter_percent=90.0,
zscore_threshold=8.0))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(
ValidationStage(config,
val_file_name=val_file_name,
@@ -217,7 +220,7 @@ def test_dfp_user123(mock_ae, config, tmp_path):
@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
@pytest.mark.usefixtures("reload_modules")
@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
+def test_dfp_user123_multi_segment(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
@@ -278,7 +281,8 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
filter_percent=90.0,
zscore_threshold=8.0))
pipe.add_segment_boundary(MultiResponseMessage) # Boundary 6
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(
ValidationStage(config,
val_file_name=val_file_name,
12 changes: 8 additions & 4 deletions tests/test_dfp_kafka.py
@@ -64,7 +64,8 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
config: Config,
kafka_bootstrap_servers: str,
kafka_topics: KafkaTopics,
kafka_consumer: "KafkaConsumer"):
kafka_consumer: "KafkaConsumer",
morpheus_log_level: int):
tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_tensor.csv'), delimiter=',')
anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_anomaly_score.csv'), delimiter=',')
exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_exp_results.csv'))
@@ -116,7 +117,8 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
cold_end=False,
filter_percent=90.0,
zscore_threshold=8.0))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(SerializeStage(config, include=[]))
pipe.add_stage(
WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic))
@@ -166,7 +168,8 @@ def test_dfp_user123(mock_ae: mock.MagicMock,
config: Config,
kafka_bootstrap_servers: str,
kafka_topics: KafkaTopics,
kafka_consumer: "KafkaConsumer"):
kafka_consumer: "KafkaConsumer",
morpheus_log_level: int):
tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
@@ -217,7 +220,8 @@ def test_dfp_user123(mock_ae: mock.MagicMock,
cold_end=False,
filter_percent=90.0,
zscore_threshold=8.0))
-pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+pipe.add_stage(
+MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
pipe.add_stage(SerializeStage(config, include=[]))
pipe.add_stage(
WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic))