Skip to content

Commit

Permalink
Improve the logging tests and add support for resetting the logger (#…
Browse files Browse the repository at this point in the history
…1716)

- Allows the logger to be "reset" which is necessary to avoid duplicate logging handlers if `configure_logging` is called multiple times
- Updates all of the logging tests to parameterize the log level and better check if the handlers are actually being called.

- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)

URL: #1716
  • Loading branch information
mdemoret-nv authored and dagardner-nv committed Jun 4, 2024
1 parent 67982a7 commit 3672282
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 95 deletions.
40 changes: 36 additions & 4 deletions morpheus/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import logging.handlers
import multiprocessing
import os
import re
import warnings
import weakref
from enum import Enum

import appdirs
Expand Down Expand Up @@ -153,12 +156,41 @@ def _configure_from_log_level(*extra_handlers: logging.Handler, log_level: int):
queue_listener.start()
queue_listener._thread.name = "Logging Thread"

# Register a function to kill the listener thread before shutting down. prevents error on intpreter close
def stop_queue_listener():
queue_listener.stop()
# Register a function to kill the listener thread when the queue_handler is removed.
weakref.finalize(morpheus_queue_handler, queue_listener.stop)

# Register a handler before shutting down to remove all log handlers, this ensures that the weakref.finalize
# handler we just defined is called at exit.
import atexit
atexit.register(stop_queue_listener)
atexit.register(reset_logging)
else:
raise RuntimeError("Logging has already been configured. Use `set_log_level` to change the log level or reset "
"the logging system by calling `reset_logging`.")


def reset_logging(logger_name: str = "morpheus"):
"""
Resets the Morpheus logging system. This will remove all handlers from the Morpheus logger and stop the queue
listener. This is useful for testing where the logging system needs to be reconfigured multiple times or
reconfigured with different settings.
"""

morpheus_logger = logging.getLogger(logger_name)

for handler in morpheus_logger.handlers.copy():
# Copied from `logging.shutdown`.
try:
handler.acquire()
handler.flush()
handler.close()
except (OSError, ValueError):
pass
finally:
handler.release()
morpheus_logger.removeHandler(handler)

if hasattr(morpheus_logger, "_configured_by_morpheus"):
delattr(morpheus_logger, "_configured_by_morpheus")


def configure_logging(*extra_handlers: logging.Handler, log_level: int = None, log_config_file: str = None):
Expand Down
5 changes: 5 additions & 0 deletions tests/benchmarks/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
from test_bench_e2e_pipelines import E2E_TEST_CONFIGS


@pytest.fixture(autouse=True)
def reset_logging_fixture(reset_logging): # pylint: disable=unused-argument
yield


# pylint: disable=unused-argument
def pytest_benchmark_update_json(config, benchmarks, output_json):

Expand Down
16 changes: 8 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,18 @@ def should_filter_test(item: pytest.Item):
items[:] = [x for x in items if should_filter_test(x)]


def clear_handlers(logger):
handlers = logger.handlers.copy()
for handler in handlers:
logger.removeHandler(handler)
@pytest.fixture(scope="function", name="reset_logging")
def reset_logging_fixture():
from morpheus.utils.logger import reset_logging
reset_logging()
yield


@pytest.hookimpl(trylast=True)
def pytest_runtest_teardown(item, nextitem):
morpheus_logger = logging.getLogger("morpheus")
clear_handlers(morpheus_logger)
clear_handlers(logging.getLogger())
setattr(morpheus_logger, "_configured_by_morpheus", False)
from morpheus.utils.logger import reset_logging
reset_logging(logger_name="morpheus")
reset_logging(logger_name=None) # Reset the root logger as well


# This fixture will be used by all tests.
Expand Down
27 changes: 9 additions & 18 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,28 +144,19 @@ def config_warning_fixture():
@pytest.mark.use_python
class TestCLI:

def test_help(self):
@pytest.mark.parametrize('cmd',
[[], ['tools'], ['run'], ['run', 'pipeline-ae'], ['run', 'pipeline-fil'],
['run', 'pipeline-nlp'], ['run', 'pipeline-other']])
def test_help(self, cmd: list[str]):
runner = CliRunner()
result = runner.invoke(commands.cli, ['--help'])
result = runner.invoke(commands.cli, cmd + ['--help'])
assert result.exit_code == 0, result.output

result = runner.invoke(commands.cli, ['tools', '--help'])
assert result.exit_code == 0, result.output

result = runner.invoke(commands.cli, ['run', '--help'])
assert result.exit_code == 0, result.output

result = runner.invoke(commands.cli, ['run', 'pipeline-ae', '--help'])
assert result.exit_code == 0, result.output

def test_autocomplete(self, tmp_path):
@pytest.mark.parametrize('cmd',
[['tools', 'autocomplete', 'show'], ['tools', 'autocomplete', 'install', '--shell=bash']])
def test_autocomplete(self, tmp_path, cmd: list[str]):
runner = CliRunner()
result = runner.invoke(commands.cli, ['tools', 'autocomplete', 'show'], env={'HOME': str(tmp_path)})
assert result.exit_code == 0, result.output

# The actual results of this are specific to the implementation of click_completion
result = runner.invoke(commands.cli, ['tools', 'autocomplete', 'install', '--shell=bash'],
env={'HOME': str(tmp_path)})
result = runner.invoke(commands.cli, cmd, env={'HOME': str(tmp_path)})
assert result.exit_code == 0, result.output

@pytest.mark.usefixtures("restore_environ")
Expand Down
9 changes: 2 additions & 7 deletions tests/test_dfp_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import typing
from io import StringIO
Expand Down Expand Up @@ -44,20 +43,16 @@
from morpheus.stages.preprocess import train_ae_stage
from morpheus.utils.compare_df import compare_df
from morpheus.utils.file_utils import load_labels_file
from morpheus.utils.logger import configure_logging

if (typing.TYPE_CHECKING):
from kafka import KafkaConsumer

configure_logging(log_level=logging.DEBUG)
# End-to-end test intended to imitate the dfp validation test


@pytest.mark.kafka
@pytest.mark.slow
@pytest.mark.use_python
@pytest.mark.reload_modules([commands, preprocess_ae_stage, train_ae_stage])
@pytest.mark.usefixtures("reload_modules")
@pytest.mark.usefixtures("reload_modules", "loglevel_debug")
@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
def test_dfp_roleg(mock_ae: mock.MagicMock,
dataset_pandas: DatasetManager,
Expand Down Expand Up @@ -159,7 +154,7 @@ def test_dfp_roleg(mock_ae: mock.MagicMock,
@pytest.mark.slow
@pytest.mark.use_python
@pytest.mark.reload_modules([preprocess_ae_stage, train_ae_stage])
@pytest.mark.usefixtures("reload_modules")
@pytest.mark.usefixtures("reload_modules", "loglevel_debug")
@mock.patch('morpheus.stages.preprocess.train_ae_stage.AutoEncoder')
def test_dfp_user123(mock_ae: mock.MagicMock,
dataset_pandas: DatasetManager,
Expand Down
157 changes: 101 additions & 56 deletions tests/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,115 +13,160 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import io
import logging
import multiprocessing
import logging.handlers
import os
import re
import time
from unittest.mock import patch

import pytest

from _utils import TEST_DIRS
from morpheus.utils.logger import TqdmLoggingHandler
from morpheus.utils.logger import LogLevels
from morpheus.utils.logger import configure_logging
from morpheus.utils.logger import deprecated_message_warning
from morpheus.utils.logger import deprecated_stage_warning
from morpheus.utils.logger import reset_logging
from morpheus.utils.logger import set_log_level


def _flush_logging_queue(logger: logging.Logger):
for handler in logger.handlers:
if isinstance(handler, logging.handlers.QueueHandler):
while (handler.queue.qsize() != 0):
time.sleep(0.01)


@pytest.fixture(autouse=True)
def reset_logger():
def reset_logging_fixture(reset_logging): # pylint: disable=unused-argument, redefined-outer-name
yield


@patch('logging.handlers.QueueListener.stop')
def test_reset_logging(queue_listener_stop_mock):

configure_logging(log_level=logging.INFO)

morpheus_logger = logging.getLogger("morpheus")
setattr(morpheus_logger, "_configured_by_morpheus", False)

assert len(morpheus_logger.handlers) > 0

reset_logging()

assert len(morpheus_logger.handlers) == 0

# Force garbage collection to ensure the QueueListener is stopped by the reset_logging function
gc.collect()

queue_listener_stop_mock.assert_called()


@patch('logging.handlers.QueueListener')
@patch('logging.handlers.QueueHandler.emit')
def test_configure_logging_from_level_default_handlers(queue_handler, queue_listener):
configure_logging(log_level=logging.DEBUG)
@pytest.mark.parametrize("log_level", LogLevels)
def test_configure_logging_from_level_default_handlers(queue_handler, log_level: type[LogLevels]):
configure_logging(log_level=log_level.value)

morpheus_logger = logging.getLogger("morpheus")
assert morpheus_logger.level == logging.DEBUG

assert morpheus_logger.level == log_level.value
assert morpheus_logger.propagate is False
pos_args = queue_listener.call_args[0]
assert len(pos_args) == 3
assert isinstance(pos_args[0], multiprocessing.queues.Queue)
assert isinstance(pos_args[1], TqdmLoggingHandler)
assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler)
assert pos_args[2].baseFilename.endswith("morpheus.log")
morpheus_logger.debug("test")
queue_handler.assert_called()

morpheus_logger.info("test")

if (log_level.value <= logging.INFO and log_level.value != logging.NOTSET):
queue_handler.assert_called()
else:
queue_handler.assert_not_called()

def test_configure_logging__no_args():
with pytest.raises(Exception) as excinfo:

def test_configure_logging_no_args():
with pytest.raises(Exception, match="log_level must be specified"):
configure_logging()
assert "log_level must be specified" in str(excinfo.value)


@patch('logging.handlers.RotatingFileHandler.emit')
@patch('morpheus.utils.logger.TqdmLoggingHandler.emit')
def test_configure_logging_from_file(console_handler, file_handler):

log_config_file = os.path.join(TEST_DIRS.tests_data_dir, "logging.json")

configure_logging(log_config_file=log_config_file)

morpheus_logger = logging.getLogger("morpheus")
assert morpheus_logger.level == logging.DEBUG

assert morpheus_logger.level == logging.WARNING
assert morpheus_logger.propagate is False

morpheus_logger.debug("test")

console_handler.assert_not_called()
file_handler.assert_not_called()

morpheus_logger.warning("test")

console_handler.assert_called_once()
file_handler.assert_called_once()


def test_configure_logging_multiple_times():
configure_logging(log_level=logging.INFO)

morpheus_logger = logging.getLogger("morpheus")

assert morpheus_logger.level == logging.INFO

# Call configure_logging again without resetting
with pytest.raises(Exception, match="Logging has already been configured"):
configure_logging(log_level=logging.DEBUG)

assert morpheus_logger.level == logging.INFO


def test_configure_logging_from_file_filenotfound():
with pytest.raises(FileNotFoundError):
configure_logging(log_config_file="does_not_exist.json")


@patch('logging.handlers.QueueListener')
@patch('logging.handlers.QueueHandler.emit')
def test_configure_logging_add_one_handler(queue_handler, queue_listener):
new_handler = logging.StreamHandler()
configure_logging(new_handler, log_level=logging.DEBUG)
morpheus_logger = logging.getLogger("morpheus")
assert morpheus_logger.level == logging.DEBUG
assert morpheus_logger.propagate is False
pos_args = queue_listener.call_args[0]
assert len(pos_args) == 4
assert isinstance(pos_args[0], multiprocessing.queues.Queue)
assert isinstance(pos_args[1], TqdmLoggingHandler)
assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler)
assert isinstance(pos_args[3], logging.StreamHandler)
morpheus_logger.debug("test")
queue_handler.assert_called()
def test_configure_logging_custom_handlers():
# Create a string stream for the handler
string_stream_1 = io.StringIO()
string_stream_2 = io.StringIO()

new_handler_1 = logging.StreamHandler(string_stream_1)
new_handler_2 = logging.StreamHandler(string_stream_2)

@patch('logging.handlers.QueueListener')
@patch('logging.handlers.QueueHandler.emit')
def test_configure_logging_add_two_handlers(queue_handler, queue_listener):
new_handler_1 = logging.StreamHandler()
new_handler_2 = logging.StreamHandler()
configure_logging(new_handler_1, new_handler_2, log_level=logging.DEBUG)

morpheus_logger = logging.getLogger("morpheus")
assert morpheus_logger.level == logging.DEBUG
assert morpheus_logger.propagate is False
pos_args = queue_listener.call_args[0]
assert len(pos_args) == 5
assert isinstance(pos_args[0], multiprocessing.queues.Queue)
assert isinstance(pos_args[1], TqdmLoggingHandler)
assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler)
assert isinstance(pos_args[3], logging.StreamHandler)
assert isinstance(pos_args[4], logging.StreamHandler)

morpheus_logger.debug("test")
queue_handler.assert_called()

_flush_logging_queue(morpheus_logger)

string_stream_1.seek(0)
string_stream_2.seek(0)

def test_set_log_level():
assert string_stream_1.getvalue() == "test\n"
assert string_stream_2.getvalue() == "test\n"


@pytest.mark.parametrize("log_level", LogLevels)
def test_set_log_level(log_level: type[LogLevels]):
configure_logging(log_level=logging.INFO)

morpheus_logger = logging.getLogger("morpheus")

assert morpheus_logger.level == logging.INFO
set_log_level(logging.DEBUG)
assert morpheus_logger.level == logging.DEBUG

set_log_level(log_level.value)

assert morpheus_logger.level == log_level.value


def test_deprecated_stage_warning(caplog):
def test_deprecated_stage_warning(caplog: pytest.LogCaptureFixture):

class DummyStage():
pass
Expand All @@ -134,7 +179,7 @@ class DummyStage():
assert "The 'DummyStage' stage ('dummy_stage') has been deprecated" in caplog.text


def test_deprecated_stage_warning_with_reason(caplog):
def test_deprecated_stage_warning_with_reason(caplog: pytest.LogCaptureFixture):

class DummyStage():
pass
Expand Down
Loading

0 comments on commit 3672282

Please sign in to comment.