diff --git a/tests/benchmarks/test_bench_e2e_pipelines.py b/tests/benchmarks/test_bench_e2e_pipelines.py
index 14283cf154..e99e7bbc07 100644
--- a/tests/benchmarks/test_bench_e2e_pipelines.py
+++ b/tests/benchmarks/test_bench_e2e_pipelines.py
@@ -67,7 +67,7 @@ def nlp_pipeline(config: Config, input_file, repeat, vocab_hash_file, output_fil
                              server_url=E2E_TEST_CONFIGS["triton_server_url"],
                              force_convert_inputs=True))
     pipeline.add_stage(AddClassificationsStage(config, threshold=0.5, prefix=""))
-    pipeline.add_stage(MonitorStage(config))
+    pipeline.add_stage(MonitorStage(config, log_level=logging.INFO))
     pipeline.add_stage(SerializeStage(config))
     pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
 
@@ -89,7 +89,7 @@ def fil_pipeline(config: Config, input_file, repeat, output_file, model_name):
                              server_url=E2E_TEST_CONFIGS["triton_server_url"],
                              force_convert_inputs=True))
     pipeline.add_stage(AddClassificationsStage(config, threshold=0.5, prefix=""))
-    pipeline.add_stage(MonitorStage(config))
+    pipeline.add_stage(MonitorStage(config, log_level=logging.INFO))
     pipeline.add_stage(SerializeStage(config))
     pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
 
@@ -111,7 +111,7 @@ def ae_pipeline(config: Config, input_glob, repeat, train_data_glob, output_file
     pipeline.add_stage(PreprocessAEStage(config))
     pipeline.add_stage(AutoEncoderInferenceStage(config))
     pipeline.add_stage(AddScoresStage(config))
-    pipeline.add_stage(MonitorStage(config))
+    pipeline.add_stage(MonitorStage(config, log_level=logging.INFO))
     pipeline.add_stage(SerializeStage(config))
     pipeline.add_stage(WriteToFileStage(config, filename=output_file, overwrite=True))
 
diff --git a/tests/benchmarks/test_bench_monitor_stage.py b/tests/benchmarks/test_bench_monitor_stage.py
index 7af2406acc..5ddbdef42d 100644
--- a/tests/benchmarks/test_bench_monitor_stage.py
+++ b/tests/benchmarks/test_bench_monitor_stage.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import typing
 
 import pytest
 from static_message_source import StaticMessageSource
@@ -29,7 +30,7 @@
 from morpheus.utils.logger import configure_logging
 
 
-def build_and_run_pipeline(config: Config, df: cudf.DataFrame):
+def build_and_run_pipeline(*, config: Config, df: cudf.DataFrame, morpheus_log_level: int):
     # Pipeline
     pipeline = LinearPipeline(config)
 
@@ -39,7 +40,7 @@ def build_and_run_pipeline(config: Config, df: cudf.DataFrame):
     pipeline.add_stage(DeserializeStage(config))
 
     # Stage we want to benchmark
-    pipeline.add_stage(MonitorStage(config))
+    pipeline.add_stage(MonitorStage(config, log_level=morpheus_log_level))
 
     pipeline.build()
     pipeline.run()
@@ -47,7 +48,7 @@
 
 @pytest.mark.benchmark
 @pytest.mark.parametrize("num_messages", [1, 100, 10000, 1000000])
-def test_monitor_stage(benchmark, num_messages):
+def test_monitor_stage(benchmark: typing.Callable, num_messages: int, morpheus_log_level: int):
 
     # Test Data
 
@@ -70,4 +71,4 @@
     config.edge_buffer_size = 4
 
     # would prefer to benchmark just pipeline.run, but it asserts when called multiple times
-    benchmark(build_and_run_pipeline, config, df)
+    benchmark(build_and_run_pipeline, config=config, df=df, morpheus_log_level=morpheus_log_level)
diff --git a/tests/conftest.py b/tests/conftest.py
index 0a33fa7891..1f8f0ef425 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -861,6 +861,15 @@ def loglevel_fatal():
     _wrap_set_log_level(logging.FATAL)
 
 
+@pytest.fixture(scope="function")
+def morpheus_log_level():
+    """
+    Returns the log level of the morpheus logger
+    """
+    logger = logging.getLogger("morpheus")
+    yield logger.getEffectiveLevel()
+
+
 # ==== DataFrame Fixtures ====
 @pytest.fixture(scope="function")
 def dataset(df_type: typing.Literal['cudf', 'pandas']):
diff --git a/tests/examples/developer_guide/test_python_modules.py b/tests/examples/developer_guide/test_python_modules.py
index 1c433d6f78..aad7333ce7 100644
--- a/tests/examples/developer_guide/test_python_modules.py
+++ b/tests/examples/developer_guide/test_python_modules.py
@@ -38,7 +38,7 @@
     os.path.join(EXAMPLES_DIR, "my_compound_module_consumer_stage.py"),
     os.path.join(EXAMPLES_DIR, "my_test_module_consumer_stage.py")
 ])
-def test_pipeline(config: Config, import_mod: list[types.ModuleType]):
+def test_pipeline(config: Config, import_mod: list[types.ModuleType], morpheus_log_level: int):
     my_compound_module_consumer_stage = import_mod[-2]
     my_test_module_consumer_stage = import_mod[-1]
 
@@ -72,7 +72,7 @@ def test_pipeline(config: Config, import_mod: list[types.ModuleType]):
 
     pipeline.add_stage(my_test_module_consumer_stage.MyPassthroughModuleWrapper(config))
     pipeline.add_stage(my_compound_module_consumer_stage.MyCompoundOpModuleWrapper(config))
-    pipeline.add_stage(MonitorStage(config))
+    pipeline.add_stage(MonitorStage(config, log_level=morpheus_log_level))
     comp_stage = pipeline.add_stage(CompareDataFrameStage(config, expected_df))
 
     pipeline.run()
diff --git a/tests/examples/digital_fingerprinting/test_dfp_inference_stage.py b/tests/examples/digital_fingerprinting/test_dfp_inference_stage.py
index f4dda7c815..46defbbbee 100644
--- a/tests/examples/digital_fingerprinting/test_dfp_inference_stage.py
+++ b/tests/examples/digital_fingerprinting/test_dfp_inference_stage.py
@@ -71,19 +71,18 @@ def test_get_model(config: Config, mock_mlflow_client: mock.MagicMock, mock_mode
 
 
 @pytest.mark.usefixtures("reset_loglevel")
-@pytest.mark.parametrize('morpheus_log_level',
-                         [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
+@pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
 def test_on_data(
         config: Config,
         mock_mlflow_client: mock.MagicMock,  # pylint: disable=unused-argument
         mock_model_manager: mock.MagicMock,
         dfp_multi_message: "MultiDFPMessage",  # noqa: F821
-        morpheus_log_level: int,
+        log_level: int,
         dataset_pandas: DatasetManager):
     from dfp.messages.multi_dfp_message import MultiDFPMessage
     from dfp.stages.dfp_inference_stage import DFPInferenceStage
 
-    set_log_level(morpheus_log_level)
+    set_log_level(log_level)
 
     expected_results = list(range(1000, dfp_multi_message.mess_count + 1000))
 
diff --git a/tests/examples/digital_fingerprinting/test_dfp_postprocessing_stage.py b/tests/examples/digital_fingerprinting/test_dfp_postprocessing_stage.py
index 4b13bacde5..6eed4c0d9e 100644
--- a/tests/examples/digital_fingerprinting/test_dfp_postprocessing_stage.py
+++ b/tests/examples/digital_fingerprinting/test_dfp_postprocessing_stage.py
@@ -35,14 +35,13 @@ def test_constructor(config: Config):
 
 @pytest.mark.usefixtures("reset_loglevel")
 @pytest.mark.parametrize('use_on_data', [True, False])
-@pytest.mark.parametrize('morpheus_log_level',
-                         [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
+@pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
 @mock.patch('dfp.stages.dfp_postprocessing_stage.datetime')
 def test_process_events_on_data(mock_datetime: mock.MagicMock,
                                 config: Config,
                                 dfp_multi_ae_message: MultiAEMessage,
                                 use_on_data: bool,
-                                morpheus_log_level: int):
+                                log_level: int):
     from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage
 
     mock_dt_obj = mock.MagicMock()
@@ -54,7 +53,7 @@ def test_process_events_on_data(mock_datetime: mock.MagicMock,
     df.loc[10, 'v2'] = np.nan
     df['event_time'] = ''
 
-    set_log_level(morpheus_log_level)
+    set_log_level(log_level)
     stage = DFPPostprocessingStage(config)
 
     # on_data is a thin wrapper around process_events, tests should be the same for non-empty messages
diff --git a/tests/examples/digital_fingerprinting/test_dfp_preprocessing_stage.py b/tests/examples/digital_fingerprinting/test_dfp_preprocessing_stage.py
index bf82381879..c7859cd90c 100644
--- a/tests/examples/digital_fingerprinting/test_dfp_preprocessing_stage.py
+++ b/tests/examples/digital_fingerprinting/test_dfp_preprocessing_stage.py
@@ -36,17 +36,16 @@ def test_constructor(config: Config):
 
 
 @pytest.mark.usefixtures("reset_loglevel")
-@pytest.mark.parametrize('morpheus_log_level',
-                         [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
+@pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG])
 def test_process_features(
         config: Config,
         dfp_multi_message: "MultiDFPMessage",  # noqa: F821
         dataset_pandas: DatasetManager,
-        morpheus_log_level: int):
+        log_level: int):
     from dfp.messages.multi_dfp_message import MultiDFPMessage
     from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
 
-    set_log_level(morpheus_log_level)
+    set_log_level(log_level)
 
     expected_df = dfp_multi_message.get_meta_dataframe().copy(deep=True)
     expected_df['v210'] = expected_df['v2'] + 10
 
diff --git a/tests/test_abp.py b/tests/test_abp.py
index 86778bfdb6..a3248deb7e 100755
--- a/tests/test_abp.py
+++ b/tests/test_abp.py
@@ -52,7 +52,7 @@
 @pytest.mark.slow
 @pytest.mark.use_python
 @mock.patch('tritonclient.grpc.InferenceServerClient')
-def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path):
+def test_abp_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
     mock_metadata = {
         "inputs": [{
             'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH]
@@ -98,7 +98,8 @@ def test_abp_no_cpp(mock_triton_client, config: Config, tmp_path):
     pipe.add_stage(PreprocessFILStage(config))
     pipe.add_stage(
         TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='test:0000', force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config))
     pipe.add_stage(AddScoresStage(config, prefix="score_"))
     pipe.add_stage(
@@ -115,7 +116,7 @@
 @pytest.mark.slow
 @pytest.mark.use_cpp
 @pytest.mark.usefixtures("launch_mock_triton")
-def test_abp_cpp(config, tmp_path):
+def test_abp_cpp(config: Config, tmp_path: str, morpheus_log_level: int):
     config.mode = PipelineModes.FIL
     config.class_labels = ["mining"]
     config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
@@ -141,7 +142,8 @@ def test_abp_cpp(config, tmp_path):
     pipe.add_stage(
         TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001',
                              force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config))
     pipe.add_stage(AddScoresStage(config, prefix="score_"))
     pipe.add_stage(
@@ -158,7 +160,10 @@
 @pytest.mark.slow
 @pytest.mark.use_python
 @mock.patch('tritonclient.grpc.InferenceServerClient')
-def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path):
+def test_abp_multi_segment_no_cpp(mock_triton_client: mock.MagicMock,
+                                  config: Config,
+                                  tmp_path: str,
+                                  morpheus_log_level: int):
     mock_metadata = {
         "inputs": [{
             'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH]
@@ -213,7 +218,8 @@ def test_abp_multi_segment_no_cpp(mock_triton_client, config: Config, tmp_path):
 
     pipe.add_segment_boundary(MultiResponseMessage)  # Boundary 3
 
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config))
 
     pipe.add_segment_boundary(MultiResponseMessage)  # Boundary 4
diff --git a/tests/test_abp_kafka.py b/tests/test_abp_kafka.py
index 0e1f040612..46306ff29c 100755
--- a/tests/test_abp_kafka.py
+++ b/tests/test_abp_kafka.py
@@ -61,7 +61,8 @@ def test_abp_no_cpp(mock_triton_client: mock.MagicMock,
                     config: Config,
                     kafka_bootstrap_servers: str,
                     kafka_topics: KafkaTopics,
-                    kafka_consumer: "KafkaConsumer"):
+                    kafka_consumer: "KafkaConsumer",
+                    morpheus_log_level: int):
     mock_metadata = {
         "inputs": [{
             'name': 'input__0', 'datatype': 'FP32', "shape": [-1, FEATURE_LENGTH]
@@ -115,7 +116,8 @@ def test_abp_no_cpp(mock_triton_client: mock.MagicMock,
     pipe.add_stage(PreprocessFILStage(config))
     pipe.add_stage(
         TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='test:0000', force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config))
     pipe.add_stage(SerializeStage(config))
     pipe.add_stage(
@@ -151,7 +153,8 @@ def test_abp_cpp(config: Config,
                  dataset_pandas: DatasetManager,
                  kafka_bootstrap_servers: str,
                  kafka_topics: KafkaTopics,
-                 kafka_consumer: "KafkaConsumer"):
+                 kafka_consumer: "KafkaConsumer",
+                 morpheus_log_level: int):
     config.mode = PipelineModes.FIL
     config.class_labels = ["mining"]
     config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
@@ -183,7 +186,8 @@ def test_abp_cpp(config: Config,
     pipe.add_stage(
         TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='localhost:8001',
                              force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config))
     pipe.add_stage(SerializeStage(config))
     pipe.add_stage(
diff --git a/tests/test_dfp.py b/tests/test_dfp.py
index d32ad3c1e8..2f3bacbdae 100755
--- a/tests/test_dfp.py
+++ b/tests/test_dfp.py
@@ -23,6 +23,7 @@
 
 from _utils import TEST_DIRS
 from _utils import calc_error_val
+from morpheus.config import Config
 from morpheus.config import ConfigAutoEncoder
 from morpheus.config import PipelineModes
 from morpheus.messages.message_meta import MessageMeta
@@ -50,7 +51,7 @@
 @pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
 @pytest.mark.usefixtures("reload_modules")
 @mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_roleg(mock_ae, config, tmp_path):
+def test_dfp_roleg(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
     tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_tensor.csv'), delimiter=',')
     anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_anomaly_score.csv'), delimiter=',')
     exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_exp_results.csv'))
@@ -107,7 +108,8 @@ def test_dfp_roleg(mock_ae, config, tmp_path):
                         cold_end=False,
                         filter_percent=90.0,
                         zscore_threshold=8.0))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(
         ValidationStage(config,
                         val_file_name=val_file_name,
@@ -135,7 +137,7 @@
 @pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
 @pytest.mark.usefixtures("reload_modules")
 @mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_user123(mock_ae, config, tmp_path):
+def test_dfp_user123(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
     tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
     anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
     exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
@@ -190,7 +192,8 @@
                         cold_end=False,
                         filter_percent=90.0,
                         zscore_threshold=8.0))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(
         ValidationStage(config,
                         val_file_name=val_file_name,
@@ -217,7 +220,7 @@
 @pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
 @pytest.mark.usefixtures("reload_modules")
 @mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
-def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
+def test_dfp_user123_multi_segment(mock_ae: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
     tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
     anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
     exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
@@ -278,7 +281,8 @@ def test_dfp_user123_multi_segment(mock_ae, config, tmp_path):
                         filter_percent=90.0,
                         zscore_threshold=8.0))
     pipe.add_segment_boundary(MultiResponseMessage)  # Boundary 6
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(
         ValidationStage(config,
                         val_file_name=val_file_name,
diff --git a/tests/test_dfp_kafka.py b/tests/test_dfp_kafka.py
index 5b28ae6f7c..8bd4900b96 100755
--- a/tests/test_dfp_kafka.py
+++ b/tests/test_dfp_kafka.py
@@ -64,7 +64,8 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
                    config: Config,
                    kafka_bootstrap_servers: str,
                    kafka_topics: KafkaTopics,
-                   kafka_consumer: "KafkaConsumer"):
+                   kafka_consumer: "KafkaConsumer",
+                   morpheus_log_level: int):
     tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_tensor.csv'), delimiter=',')
     anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_anomaly_score.csv'), delimiter=',')
     exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_roleg_exp_results.csv'))
@@ -116,7 +117,8 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
                         cold_end=False,
                         filter_percent=90.0,
                         zscore_threshold=8.0))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(SerializeStage(config, include=[]))
     pipe.add_stage(
         WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic))
@@ -166,7 +168,8 @@ def test_dfp_user123(mock_ae: mock.MagicMock,
                      config: Config,
                      kafka_bootstrap_servers: str,
                      kafka_topics: KafkaTopics,
-                     kafka_consumer: "KafkaConsumer"):
+                     kafka_consumer: "KafkaConsumer",
+                     morpheus_log_level: int):
     tensor_data = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_tensor.csv'), delimiter=',')
     anomaly_score = np.loadtxt(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_anomaly_score.csv'), delimiter=',')
     exp_results = pd.read_csv(os.path.join(TEST_DIRS.tests_data_dir, 'dfp_user123_exp_results.csv'))
@@ -217,7 +220,8 @@ def test_dfp_user123(mock_ae: mock.MagicMock,
                         cold_end=False,
                         filter_percent=90.0,
                         zscore_threshold=8.0))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
description="Inference Rate", smoothing=0.001, unit="inf")) + pipe.add_stage( + MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level)) pipe.add_stage(SerializeStage(config, include=[])) pipe.add_stage( WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic)) diff --git a/tests/test_monitor_stage.py b/tests/test_monitor_stage.py index e023f159b3..68b1b35ca7 100755 --- a/tests/test_monitor_stage.py +++ b/tests/test_monitor_stage.py @@ -151,23 +151,22 @@ def test_progress_sink(mock_morph_tqdm: mock.MagicMock, config: Config): @pytest.mark.usefixtures("reset_loglevel") -@pytest.mark.parametrize('morpheus_log_level', - [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]) +@pytest.mark.parametrize('log_level', [logging.CRITICAL, logging.ERROR, logging.WARNING, logging.INFO, logging.DEBUG]) @mock.patch('morpheus.stages.general.monitor_stage.MonitorController.sink_on_completed', autospec=True) @mock.patch('morpheus.stages.general.monitor_stage.MonitorController.progress_sink', autospec=True) def test_log_level(mock_progress_sink: mock.MagicMock, mock_sink_on_completed: mock.MagicMock, config: Config, - morpheus_log_level: int): + log_level: int): """ Test ensures the monitor stage doesn't add itself to the MRC pipeline if not configured for the current log-level """ input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") - set_log_level(morpheus_log_level) + set_log_level(log_level) monitor_stage_level = logging.INFO - should_be_included = (morpheus_log_level <= monitor_stage_level) + should_be_included = (log_level <= monitor_stage_level) pipe = LinearPipeline(config) pipe.set_source(FileSourceStage(config, filename=input_file)) @@ -179,16 +178,13 @@ def test_log_level(mock_progress_sink: mock.MagicMock, assert mock_sink_on_completed.call_count == expected_call_count -@pytest.mark.usefixtures("reset_loglevel") @pytest.mark.use_python -def test_thread(config: Config): +def test_thread(config: Config, morpheus_log_level: int): """ - Test ensures the monitor stage doesn't add itself to the MRC pipeline if not configured for the current log-level + Test ensures the monitor stage executes on the same thread as the parent stage """ input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") - set_log_level(log_level=logging.INFO) - monitor_thread_id = None # Create a dummy count function where we can save the thread id from the monitor stage @@ -202,8 +198,9 @@ def fake_determine_count_fn(x): pipe = LinearPipeline(config) pipe.set_source(FileSourceStage(config, filename=input_file)) dummy_stage = pipe.add_stage(RecordThreadIdStage(config)) - pipe.add_stage(MonitorStage(config, determine_count_fn=fake_determine_count_fn)) + pipe.add_stage(MonitorStage(config, determine_count_fn=fake_determine_count_fn, log_level=morpheus_log_level)) pipe.run() # Check that the thread ids are the same + assert monitor_thread_id is not None assert dummy_stage.thread_id == monitor_thread_id diff --git a/tests/test_phishing.py b/tests/test_phishing.py index 4f434e993e..77e752ef3f 100755 --- a/tests/test_phishing.py +++ b/tests/test_phishing.py @@ -23,6 +23,7 @@ from _utils import TEST_DIRS from _utils import calc_error_val from _utils import mk_async_infer +from morpheus.config import Config from morpheus.config import PipelineModes from morpheus.pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage @@ -44,7 +45,7 @@ 
 @pytest.mark.slow
 @pytest.mark.use_python
 @mock.patch('tritonclient.grpc.InferenceServerClient')
-def test_email_no_cpp(mock_triton_client, config, tmp_path):
+def test_email_no_cpp(mock_triton_client: mock.MagicMock, config: Config, tmp_path: str, morpheus_log_level: int):
     mock_metadata = {
         "inputs": [{
             "name": "input_ids", "datatype": "INT64", "shape": [-1, FEATURE_LENGTH]
@@ -96,7 +97,8 @@ def test_email_no_cpp(mock_triton_client, config, tmp_path):
     pipe.add_stage(
         TritonInferenceStage(config, model_name='phishing-bert-onnx', server_url='test:0000',
                              force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config, labels=["is_phishing"], threshold=0.7))
     pipe.add_stage(
         ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05))
@@ -111,7 +113,7 @@
 @pytest.mark.slow
 @pytest.mark.use_cpp
 @pytest.mark.usefixtures("launch_mock_triton")
-def test_email_cpp(config, tmp_path):
+def test_email_cpp(config: Config, tmp_path: str, morpheus_log_level: int):
     config.mode = PipelineModes.NLP
     config.class_labels = load_labels_file(os.path.join(TEST_DIRS.data_dir, "labels_phishing.txt"))
     config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
@@ -139,7 +141,8 @@ def test_email_cpp(config, tmp_path):
                              model_name='phishing-bert-onnx',
                              server_url='localhost:8001',
                              force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config, labels=["is_phishing"], threshold=0.7))
     pipe.add_stage(
         ValidationStage(config, val_file_name=val_file_name, results_file_name=results_file_name, rel_tol=0.05))
diff --git a/tests/test_phishing_kafka.py b/tests/test_phishing_kafka.py
index ba8fa1a14f..1a04061cc9 100755
--- a/tests/test_phishing_kafka.py
+++ b/tests/test_phishing_kafka.py
@@ -60,7 +60,8 @@ def test_email_no_cpp(mock_triton_client: mock.MagicMock,
                       config: Config,
                       kafka_bootstrap_servers: str,
                       kafka_topics: KafkaTopics,
-                      kafka_consumer: "KafkaConsumer"):
+                      kafka_consumer: "KafkaConsumer",
+                      morpheus_log_level: int):
     mock_metadata = {
         "inputs": [{
             "name": "input_ids", "datatype": "INT64", "shape": [-1, FEATURE_LENGTH]
@@ -120,7 +121,8 @@ def test_email_no_cpp(mock_triton_client: mock.MagicMock,
     pipe.add_stage(
         TritonInferenceStage(config, model_name='phishing-bert-onnx', server_url='test:0000',
                              force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config, labels=["is_phishing"], threshold=0.7))
     pipe.add_stage(SerializeStage(config))
     pipe.add_stage(
@@ -153,7 +155,8 @@ def test_email_cpp(dataset_pandas: DatasetManager,
                    config: Config,
                    kafka_bootstrap_servers: str,
                    kafka_topics: KafkaTopics,
-                   kafka_consumer: "KafkaConsumer"):
+                   kafka_consumer: "KafkaConsumer",
+                   morpheus_log_level: int):
     config.mode = PipelineModes.NLP
     config.class_labels = load_labels_file(os.path.join(TEST_DIRS.data_dir, "labels_phishing.txt"))
     config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
@@ -187,7 +190,8 @@ def test_email_cpp(dataset_pandas: DatasetManager,
                              model_name='phishing-bert-onnx',
                              server_url='localhost:8001',
                              force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config, labels=["is_phishing"], threshold=0.7))
     pipe.add_stage(SerializeStage(config))
     pipe.add_stage(
diff --git a/tests/test_sid.py b/tests/test_sid.py
index 67ca36161c..2221abe930 100755
--- a/tests/test_sid.py
+++ b/tests/test_sid.py
@@ -25,6 +25,7 @@
 from _utils import calc_error_val
 from _utils import compare_class_to_scores
 from _utils import mk_async_infer
+from morpheus.config import Config
 from morpheus.config import CppConfig
 from morpheus.config import PipelineModes
 from morpheus.pipeline import LinearPipeline
@@ -44,7 +45,15 @@
 MODEL_MAX_BATCH_SIZE = 32
 
 
-def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_name: str = "data"):
+def _run_minibert_pipeline(
+        *,
+        config: Config,
+        tmp_path: str,
+        model_name: str,
+        truncated: bool,
+        morpheus_log_level: int,
+        data_col_name: str = "data",
+):
     """
     Runs just the Minibert Pipeline
     """
@@ -100,7 +109,8 @@ def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_nam
                            column=data_col_name))
     pipe.add_stage(
         TritonInferenceStage(config, model_name=model_name, server_url='localhost:8001', force_convert_inputs=True))
-    pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf"))
+    pipe.add_stage(
+        MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level))
     pipe.add_stage(AddClassificationsStage(config, threshold=0.5, prefix="si_"))
     pipe.add_stage(AddScoresStage(config, prefix="score_"))
     pipe.add_stage(
@@ -113,7 +123,13 @@ def _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_nam
     return calc_error_val(results_file_name)
 
 
-def _run_minibert(config, tmp_path, model_name, truncated, data_col_name: str = "data"):
+def _run_minibert(*,
+                  config: Config,
+                  tmp_path: str,
+                  model_name: str,
+                  truncated: bool,
+                  morpheus_log_level: int,
+                  data_col_name: str = "data"):
     """
     Runs the minibert pipeline and mocks the Triton Python interface
     """
@@ -145,15 +161,24 @@ def _run_minibert(config, tmp_path, model_name, truncated, data_col_name: str =
     async_infer = mk_async_infer(inf_results)
     mock_triton_client.async_infer.side_effect = async_infer
 
-    return _run_minibert_pipeline(config, tmp_path, model_name, truncated, data_col_name)
+    return _run_minibert_pipeline(config=config,
+                                  tmp_path=tmp_path,
+                                  model_name=model_name,
+                                  truncated=truncated,
+                                  data_col_name=data_col_name,
+                                  morpheus_log_level=morpheus_log_level)
 
 
 @pytest.mark.slow
 @pytest.mark.use_cpp
 @pytest.mark.usefixtures("launch_mock_triton")
-def test_minibert_no_trunc(config, tmp_path):
+def test_minibert_no_trunc(config: Config, tmp_path: str, morpheus_log_level: int):
 
-    results = _run_minibert(config, tmp_path, "sid-minibert-onnx-no-trunc", False)
+    results = _run_minibert(config=config,
+                            tmp_path=tmp_path,
+                            model_name="sid-minibert-onnx-no-trunc",
+                            truncated=False,
+                            morpheus_log_level=morpheus_log_level)
 
     # Not sure why these are different
     if (CppConfig.get_should_use_cpp()):
@@ -164,22 +189,15 @@
 
 @pytest.mark.slow
@pytest.mark.usefixtures("launch_mock_triton") -def test_minibert_truncated(config, tmp_path): - - results = _run_minibert(config, tmp_path, 'sid-minibert-onnx', True) - - # Not sure why these are different - if (CppConfig.get_should_use_cpp()): - assert results.diff_rows == 1204 - else: - assert results.diff_rows == 1333 - - -@pytest.mark.slow -@pytest.mark.usefixtures("launch_mock_triton") -def test_minibert_data_col_name(config, tmp_path): - - results = _run_minibert(config, tmp_path, 'sid-minibert-onnx', True, "definitely_not_data") +@pytest.mark.parametrize("data_col_name", ["data", "definitely_not_data"]) +def test_minibert_truncated(config: Config, tmp_path: str, morpheus_log_level: int, data_col_name: str): + + results = _run_minibert(config=config, + tmp_path=tmp_path, + model_name='sid-minibert-onnx', + truncated=True, + data_col_name=data_col_name, + morpheus_log_level=morpheus_log_level) # Not sure why these are different if (CppConfig.get_should_use_cpp()): diff --git a/tests/test_sid_kafka.py b/tests/test_sid_kafka.py index ecc87de4b3..a50544c9c9 100755 --- a/tests/test_sid_kafka.py +++ b/tests/test_sid_kafka.py @@ -58,7 +58,8 @@ def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, config: Config, kafka_bootstrap_servers: str, kafka_topics: KafkaTopics, - kafka_consumer: "KafkaConsumer"): + kafka_consumer: "KafkaConsumer", + morpheus_log_level: int): mock_metadata = { "inputs": [{ "name": "input_ids", "datatype": "INT32", "shape": [-1, FEATURE_LENGTH] @@ -117,7 +118,8 @@ def test_minibert_no_cpp(mock_triton_client: mock.MagicMock, add_special_tokens=False)) pipe.add_stage( TritonInferenceStage(config, model_name='sid-minibert-onnx', server_url='fake:001', force_convert_inputs=True)) - pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) + pipe.add_stage( + MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level)) pipe.add_stage(AddClassificationsStage(config, threshold=0.5, prefix="si_")) pipe.add_stage(SerializeStage(config)) pipe.add_stage( @@ -150,7 +152,8 @@ def test_minibert_cpp(dataset_pandas: DatasetManager, config: Config, kafka_bootstrap_servers: str, kafka_topics: KafkaTopics, - kafka_consumer: "KafkaConsumer"): + kafka_consumer: "KafkaConsumer", + morpheus_log_level: int): config.mode = PipelineModes.NLP config.class_labels = [ "address", @@ -187,7 +190,8 @@ def test_minibert_cpp(dataset_pandas: DatasetManager, model_name='sid-minibert-onnx', server_url='localhost:8001', force_convert_inputs=True)) - pipe.add_stage(MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf")) + pipe.add_stage( + MonitorStage(config, description="Inference Rate", smoothing=0.001, unit="inf", log_level=morpheus_log_level)) pipe.add_stage(AddClassificationsStage(config, threshold=0.5, prefix="si_")) pipe.add_stage(SerializeStage(config)) pipe.add_stage(