From 79f6ef7d47c547af787415c1164e81bbab00fdc6 Mon Sep 17 00:00:00 2001 From: Artsiom Tserashkovich Date: Thu, 19 Oct 2023 22:44:10 +0200 Subject: [PATCH] Add errors processor for async operations (#1539) Co-authored-by: Artsiom Tserashkovich --- CHANGELOG.md | 1 + src/neptune/envs.py | 3 + .../async_operation_processor.py | 8 +-- .../operations_errors_processor.py | 38 ++++++++++ .../utils/test_operations_errors_processor.py | 71 +++++++++++++++++++ 5 files changed, 116 insertions(+), 5 deletions(-) create mode 100644 src/neptune/internal/operation_processors/operations_errors_processor.py create mode 100644 tests/unit/neptune/new/internal/utils/test_operations_errors_processor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 82091a045..ffadeba6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Run async callback in new daemon thread ([#1521](https://github.com/neptune-ai/neptune-client/pull/1521)) - Better handle bool values of `git_ref` param in `init_run` ([#1525](https://github.com/neptune-ai/neptune-client/pull/1525)) - Updated management docstrings ([#1500](https://github.com/neptune-ai/neptune-client/pull/1500)) +- Sample logging for series errors ([#1539](https://github.com/neptune-ai/neptune-client/pull/1539)) ### Changes - Safety (errors suppressing) execution mode ([#1503](https://github.com/neptune-ai/neptune-client/pull/1503)) diff --git a/src/neptune/envs.py b/src/neptune/envs.py index 5ad693578..d915c4f46 100644 --- a/src/neptune/envs.py +++ b/src/neptune/envs.py @@ -35,6 +35,7 @@ "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK", "NEPTUNE_ENABLE_DEFAULT_ASYNC_NO_PROGRESS_CALLBACK", "NEPTUNE_DISABLE_PARENT_DIR_DELETION", + "NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS", ] from neptune.common.envs import ( @@ -80,4 +81,6 @@ NEPTUNE_DISABLE_PARENT_DIR_DELETION = "NEPTUNE_DISABLE_PARENT_DIR_DELETION" +NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS = "NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS" + S3_ENDPOINT_URL = "S3_ENDPOINT_URL" diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 62a9f76cc..77729a39f 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -48,6 +48,7 @@ OperationStorage, get_container_dir, ) +from neptune.internal.operation_processors.operations_errors_processor import OperationsErrorsProcessor from neptune.internal.operation_processors.utils import common_metadata from neptune.internal.threading.daemon import Daemon from neptune.internal.utils.disk_full import ensure_disk_not_full @@ -280,6 +281,7 @@ def __init__( ): super().__init__(sleep_time=sleep_time, name="NeptuneAsyncOpProcessor") self._processor: "AsyncOperationProcessor" = processor + self._errors_processor: OperationsErrorsProcessor = OperationsErrorsProcessor() self._batch_size: int = batch_size self._last_flush: float = 0.0 self._no_progress_exceeded: bool = False @@ -346,11 +348,7 @@ def process_batch(self, batch: List[Operation], version: int) -> None: self._processor._last_ack = monotonic() self._processor._lag_exceeded = False - for error in errors: - logger.error( - "Error occurred during asynchronous operation processing: %s", - error, - ) + self._errors_processor.handle(errors) self._processor._consumed_version = version_to_ack diff --git a/src/neptune/internal/operation_processors/operations_errors_processor.py b/src/neptune/internal/operation_processors/operations_errors_processor.py new file mode 100644 index 000000000..4bcdfc24f --- /dev/null +++ b/src/neptune/internal/operation_processors/operations_errors_processor.py @@ -0,0 +1,38 @@ +import os +import re +from typing import ( + List, + Set, +) + +from neptune.common.exceptions import NeptuneException +from neptune.envs import NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS +from neptune.exceptions import MetadataInconsistency +from neptune.internal.utils.logger import logger + + +class OperationsErrorsProcessor: + def __init__(self) -> None: + self._sampling_enabled = os.getenv(NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS, "false").lower() in ("true", "1", "t") + self._error_sampling_exp = re.compile( + r"X-coordinates \(step\) must be strictly increasing for series attribute: (.*)\. Invalid point: (.*)" + ) + self._logged_steps: Set[str] = set() + + def handle(self, errors: List[NeptuneException]) -> None: + for error in errors: + if self._sampling_enabled and isinstance(error, MetadataInconsistency): + match_exp = self._error_sampling_exp.match(str(error)) + if match_exp: + self._handle_not_increased_error_for_step(error, match_exp.group(2)) + continue + + logger.error("Error occurred during asynchronous operation processing: %s", str(error)) + + def _handle_not_increased_error_for_step(self, error: MetadataInconsistency, step: str) -> None: + if step not in self._logged_steps: + self._logged_steps.add(step) + logger.error( + f"Error occurred during asynchronous operation processing: {str(error)}. " + + f"Suppressing other errors for step: {step}." + ) diff --git a/tests/unit/neptune/new/internal/utils/test_operations_errors_processor.py b/tests/unit/neptune/new/internal/utils/test_operations_errors_processor.py new file mode 100644 index 000000000..900f66dac --- /dev/null +++ b/tests/unit/neptune/new/internal/utils/test_operations_errors_processor.py @@ -0,0 +1,71 @@ +import os +from unittest.mock import patch + +from neptune.common.exceptions import NeptuneException +from neptune.envs import NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS +from neptune.exceptions import MetadataInconsistency +from neptune.internal.operation_processors.operations_errors_processor import OperationsErrorsProcessor + + +class TestOperationsErrorsProcessor: + @patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "True"}) + def test_sample_only_repeated_steps(self, capsys): + processor = OperationsErrorsProcessor() + duplicated_errors = [ + MetadataInconsistency( + "X-coordinates (step) must be strictly increasing for series attribute: a. Invalid point: 2.0" + ), + MetadataInconsistency( + "X-coordinates (step) must be strictly increasing for series attribute: b. Invalid point: 2.0" + ), + MetadataInconsistency( + "X-coordinates (step) must be strictly increasing for series attribute: c. Invalid point: 2.0" + ), + ] + + processor.handle(errors=duplicated_errors) + + captured = capsys.readouterr() + assert str(duplicated_errors[0]) in captured.out + assert str(duplicated_errors[1]) not in captured.out + assert str(duplicated_errors[2]) not in captured.out + + @patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "True"}) + def test_not_affect_other_errors(self, capsys): + processor = OperationsErrorsProcessor() + duplicated_errors = list( + [ + MetadataInconsistency("X-coordinates (step) must be strictly increasing for series attribute: a."), + NeptuneException("General error"), + MetadataInconsistency("X-coordinates (step) must be strictly increasing for series attribute: a."), + ] + ) + + processor.handle(errors=duplicated_errors) + + captured = capsys.readouterr() + assert str(duplicated_errors[0]) in captured.out + assert str(duplicated_errors[1]) in captured.out + assert str(duplicated_errors[2]) in captured.out + + @patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "False"}) + def test_not_sample_when_disabled(self, capsys): + processor = OperationsErrorsProcessor() + duplicated_errors = [ + MetadataInconsistency( + "X-coordinates (step) must be strictly increasing for series attribute: a. Invalid point: 2.0" + ), + MetadataInconsistency( + "X-coordinates (step) must be strictly increasing for series attribute: b. Invalid point: 2.0" + ), + MetadataInconsistency( + "X-coordinates (step) must be strictly increasing for series attribute: c. Invalid point: 2.0" + ), + ] + + processor.handle(errors=duplicated_errors) + + captured = capsys.readouterr() + assert str(duplicated_errors[0]) in captured.out + assert str(duplicated_errors[1]) in captured.out + assert str(duplicated_errors[2]) in captured.out