-
Notifications
You must be signed in to change notification settings - Fork 64
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add errors processor for async operations (#1539)
Co-authored-by: Artsiom Tserashkovich <[email protected]>
- Loading branch information
1 parent
b17d986
commit 79f6ef7
Showing
5 changed files
with
116 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
src/neptune/internal/operation_processors/operations_errors_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import os | ||
import re | ||
from typing import ( | ||
List, | ||
Set, | ||
) | ||
|
||
from neptune.common.exceptions import NeptuneException | ||
from neptune.envs import NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS | ||
from neptune.exceptions import MetadataInconsistency | ||
from neptune.internal.utils.logger import logger | ||
|
||
|
||
class OperationsErrorsProcessor: | ||
def __init__(self) -> None: | ||
self._sampling_enabled = os.getenv(NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS, "false").lower() in ("true", "1", "t") | ||
self._error_sampling_exp = re.compile( | ||
r"X-coordinates \(step\) must be strictly increasing for series attribute: (.*)\. Invalid point: (.*)" | ||
) | ||
self._logged_steps: Set[str] = set() | ||
|
||
def handle(self, errors: List[NeptuneException]) -> None: | ||
for error in errors: | ||
if self._sampling_enabled and isinstance(error, MetadataInconsistency): | ||
match_exp = self._error_sampling_exp.match(str(error)) | ||
if match_exp: | ||
self._handle_not_increased_error_for_step(error, match_exp.group(2)) | ||
continue | ||
|
||
logger.error("Error occurred during asynchronous operation processing: %s", str(error)) | ||
|
||
def _handle_not_increased_error_for_step(self, error: MetadataInconsistency, step: str) -> None: | ||
if step not in self._logged_steps: | ||
self._logged_steps.add(step) | ||
logger.error( | ||
f"Error occurred during asynchronous operation processing: {str(error)}. " | ||
+ f"Suppressing other errors for step: {step}." | ||
) |
71 changes: 71 additions & 0 deletions
71
tests/unit/neptune/new/internal/utils/test_operations_errors_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import os | ||
from unittest.mock import patch | ||
|
||
from neptune.common.exceptions import NeptuneException | ||
from neptune.envs import NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS | ||
from neptune.exceptions import MetadataInconsistency | ||
from neptune.internal.operation_processors.operations_errors_processor import OperationsErrorsProcessor | ||
|
||
|
||
class TestOperationsErrorsProcessor: | ||
@patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "True"}) | ||
def test_sample_only_repeated_steps(self, capsys): | ||
processor = OperationsErrorsProcessor() | ||
duplicated_errors = [ | ||
MetadataInconsistency( | ||
"X-coordinates (step) must be strictly increasing for series attribute: a. Invalid point: 2.0" | ||
), | ||
MetadataInconsistency( | ||
"X-coordinates (step) must be strictly increasing for series attribute: b. Invalid point: 2.0" | ||
), | ||
MetadataInconsistency( | ||
"X-coordinates (step) must be strictly increasing for series attribute: c. Invalid point: 2.0" | ||
), | ||
] | ||
|
||
processor.handle(errors=duplicated_errors) | ||
|
||
captured = capsys.readouterr() | ||
assert str(duplicated_errors[0]) in captured.out | ||
assert str(duplicated_errors[1]) not in captured.out | ||
assert str(duplicated_errors[2]) not in captured.out | ||
|
||
@patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "True"}) | ||
def test_not_affect_other_errors(self, capsys): | ||
processor = OperationsErrorsProcessor() | ||
duplicated_errors = list( | ||
[ | ||
MetadataInconsistency("X-coordinates (step) must be strictly increasing for series attribute: a."), | ||
NeptuneException("General error"), | ||
MetadataInconsistency("X-coordinates (step) must be strictly increasing for series attribute: a."), | ||
] | ||
) | ||
|
||
processor.handle(errors=duplicated_errors) | ||
|
||
captured = capsys.readouterr() | ||
assert str(duplicated_errors[0]) in captured.out | ||
assert str(duplicated_errors[1]) in captured.out | ||
assert str(duplicated_errors[2]) in captured.out | ||
|
||
@patch.dict(os.environ, {NEPTUNE_SAMPLE_SERIES_STEPS_ERRORS: "False"}) | ||
def test_not_sample_when_disabled(self, capsys): | ||
processor = OperationsErrorsProcessor() | ||
duplicated_errors = [ | ||
MetadataInconsistency( | ||
"X-coordinates (step) must be strictly increasing for series attribute: a. Invalid point: 2.0" | ||
), | ||
MetadataInconsistency( | ||
"X-coordinates (step) must be strictly increasing for series attribute: b. Invalid point: 2.0" | ||
), | ||
MetadataInconsistency( | ||
"X-coordinates (step) must be strictly increasing for series attribute: c. Invalid point: 2.0" | ||
), | ||
] | ||
|
||
processor.handle(errors=duplicated_errors) | ||
|
||
captured = capsys.readouterr() | ||
assert str(duplicated_errors[0]) in captured.out | ||
assert str(duplicated_errors[1]) in captured.out | ||
assert str(duplicated_errors[2]) in captured.out |