From c189f42e7a7ac6043e0c3e7c088bfd3c4a157b1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Jankowski?= Date: Tue, 21 Nov 2023 13:54:12 +0100 Subject: [PATCH] No synchronization callbacks reworked (#1567) --- CHANGELOG.md | 3 + src/neptune/internal/init/parameters.py | 2 + .../async_operation_processor.py | 97 ++-- .../internal/operation_processors/factory.py | 33 +- .../internal/signals_processing/__init__.py | 15 + .../signals_processing/background_job.py | 79 +++ .../internal/signals_processing/signals.py | 68 +++ .../signals_processing/signals_processor.py | 127 ++++ .../internal/signals_processing/utils.py | 53 ++ .../metadata_containers/metadata_container.py | 63 +- src/neptune/metadata_containers/model.py | 10 +- .../metadata_containers/model_version.py | 14 +- src/neptune/metadata_containers/run.py | 9 +- .../test_async_operation_processor.py | 18 +- .../internal/signals_processing/__init__.py | 15 + .../test_signals_processor.py | 546 ++++++++++++++++++ 16 files changed, 1031 insertions(+), 121 deletions(-) create mode 100644 src/neptune/internal/signals_processing/__init__.py create mode 100644 src/neptune/internal/signals_processing/background_job.py create mode 100644 src/neptune/internal/signals_processing/signals.py create mode 100644 src/neptune/internal/signals_processing/signals_processor.py create mode 100644 src/neptune/internal/signals_processing/utils.py create mode 100644 tests/unit/neptune/new/internal/signals_processing/__init__.py create mode 100644 tests/unit/neptune/new/internal/signals_processing/test_signals_processor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d3fc6444..e570cfd87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## [UNRELEASED] neptune 1.8.5 +### Fixes +- Fixed no synchronization callbacks behaviour ([#1567](https://github.com/neptune-ai/neptune-client/pull/1567)) + ### Changes - Enabled hooks for internal downloading functions used by the hosted backend ([#1571](https://github.com/neptune-ai/neptune-client/pull/1571)) - Added timestamp of operation put to disk queue ([#1569](https://github.com/neptune-ai/neptune-client/pull/1569)) diff --git a/src/neptune/internal/init/parameters.py b/src/neptune/internal/init/parameters.py index 4e897957b..0c0389305 100644 --- a/src/neptune/internal/init/parameters.py +++ b/src/neptune/internal/init/parameters.py @@ -20,6 +20,7 @@ "ASYNC_LAG_THRESHOLD", "ASYNC_NO_PROGRESS_THRESHOLD", "DEFAULT_STOP_TIMEOUT", + "IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL", ] DEFAULT_FLUSH_PERIOD = 5 @@ -28,3 +29,4 @@ ASYNC_LAG_THRESHOLD = 1800.0 ASYNC_NO_PROGRESS_THRESHOLD = 300.0 DEFAULT_STOP_TIMEOUT = 60.0 +IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL = 300.0 diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 48b66a9e1..b99f5d3e2 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -19,6 +19,7 @@ import threading from datetime import datetime from pathlib import Path +from queue import Queue from time import ( monotonic, time, @@ -33,15 +34,15 @@ ) from neptune.common.exceptions import NeptuneException +from neptune.common.warnings import ( + NeptuneWarning, + warn_once, +) from neptune.constants import ASYNC_DIRECTORY from neptune.envs import NEPTUNE_SYNC_AFTER_STOP_TIMEOUT from neptune.exceptions import NeptuneSynchronizationAlreadyStoppedException from neptune.internal.disk_queue import DiskQueue -from neptune.internal.init.parameters import ( - ASYNC_LAG_THRESHOLD, - ASYNC_NO_PROGRESS_THRESHOLD, - DEFAULT_STOP_TIMEOUT, -) +from neptune.internal.init.parameters import DEFAULT_STOP_TIMEOUT from neptune.internal.metadata_file import MetadataFile from neptune.internal.operation import Operation from neptune.internal.operation_processors.operation_processor import OperationProcessor @@ -50,6 +51,11 @@ get_container_dir, ) from neptune.internal.operation_processors.utils import common_metadata +from neptune.internal.signals_processing.utils import ( + signal_batch_lag, + signal_batch_processed, + signal_batch_started, +) from neptune.internal.threading.daemon import Daemon from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize from neptune.internal.utils.logger import logger @@ -58,6 +64,7 @@ from neptune.internal.backends.neptune_backend import NeptuneBackend from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import UniqueId + from neptune.internal.signals_processing.signals import Signal class AsyncOperationProcessor(OperationProcessor): @@ -70,12 +77,9 @@ def __init__( container_type: "ContainerType", backend: "NeptuneBackend", lock: threading.RLock, + queue: "Queue[Signal]", sleep_time: float = 5, batch_size: int = 1000, - async_lag_callback: Optional[Callable[[], None]] = None, - async_lag_threshold: float = ASYNC_LAG_THRESHOLD, - async_no_progress_callback: Optional[Callable[[], None]] = None, - async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, data_path: Optional[Path] = None, should_print_logs: bool = True, ): @@ -99,17 +103,12 @@ def __init__( self._container_type: "ContainerType" = container_type self._backend: "NeptuneBackend" = backend self._batch_size: int = batch_size - self._async_lag_callback: Callable[[], None] = async_lag_callback or (lambda: None) - self._async_lag_threshold: float = async_lag_threshold - self._async_no_progress_callback: Callable[[], None] = async_no_progress_callback or (lambda: None) - self._async_no_progress_threshold: float = async_no_progress_threshold self._last_version: int = 0 self._consumed_version: int = 0 self._consumer: Daemon = self.ConsumerThread(self, sleep_time, batch_size) self._lock: threading.RLock = lock - self._last_ack: Optional[float] = None - self._lag_exceeded: bool = False - self._should_call_no_progress_callback: bool = False + self._signals_queue: "Queue[Signal]" = queue + self._accepts_operations: bool = True # Caller is responsible for taking this lock self._waiting_cond = threading.Condition(lock=lock) @@ -122,10 +121,11 @@ def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") - @ensure_disk_not_overutilize def enqueue_operation(self, op: Operation, *, wait: bool) -> None: - self._last_version = self._queue.put(op) + if not self._accepts_operations: + warn_once("Not accepting operations", exception=NeptuneWarning) + return - self._check_lag() - self._check_no_progress() + self._last_version = self._queue.put(op) if self._check_queue_size(): self._consumer.wake_up() @@ -158,24 +158,6 @@ def wait(self) -> None: if not self._consumer.is_running(): raise NeptuneSynchronizationAlreadyStoppedException() - def _check_lag(self) -> None: - if self._lag_exceeded or not self._last_ack or monotonic() - self._last_ack <= self._async_lag_threshold: - return - - with self._lock: - if not self._lag_exceeded: - threading.Thread(target=self._async_lag_callback, daemon=True).start() - self._lag_exceeded = True - - def _check_no_progress(self) -> None: - if not self._should_call_no_progress_callback: - return - - with self._lock: - if self._should_call_no_progress_callback: - threading.Thread(target=self._async_no_progress_callback, daemon=True).start() - self._should_call_no_progress_callback = False - def _check_queue_size(self) -> bool: return self._queue.size() > self._batch_size / 2 @@ -276,6 +258,7 @@ def stop(self, seconds: Optional[float] = None) -> None: self._queue.cleanup_if_empty() def close(self) -> None: + self._accepts_operations = False self._queue.close() self._metadata_file.close() @@ -290,7 +273,6 @@ def __init__( self._processor: "AsyncOperationProcessor" = processor self._batch_size: int = batch_size self._last_flush: float = 0.0 - self._no_progress_exceeded: bool = False def run(self) -> None: try: @@ -310,17 +292,11 @@ def work(self) -> None: batch = self._processor._queue.get_batch(self._batch_size) if not batch: return - self.process_batch([element.obj for element in batch], batch[-1].ver) - def _check_no_progress(self) -> None: - if not self._no_progress_exceeded: - if ( - self._processor._last_ack - and monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold - ): - self._no_progress_exceeded = True - self._processor._should_call_no_progress_callback = True + signal_batch_started(queue=self._processor._signals_queue) + self.process_batch([element.obj for element in batch], batch[-1].ver, batch[-1].at) + # WARNING: Be careful when changing this function. It is used in the experimental package def _handle_errors(self, errors: List[NeptuneException]) -> None: for error in errors: logger.error( @@ -334,32 +310,27 @@ def _handle_errors(self, errors: List[NeptuneException]) -> None: " synced manually using `neptune sync` command." ) ) - def process_batch(self, batch: List[Operation], version: int) -> None: + def process_batch(self, batch: List[Operation], version: int, occurred_at: Optional[float] = None) -> None: + if occurred_at is not None: + signal_batch_lag(queue=self._processor._signals_queue, lag=time() - occurred_at) + expected_count = len(batch) version_to_ack = version - expected_count while True: # TODO: Handle Metadata errors - try: - processed_count, errors = self._processor._backend.execute_operations( - container_id=self._processor._container_id, - container_type=self._processor._container_type, - operations=batch, - operation_storage=self._processor._operation_storage, - ) - except Exception as e: - self._check_no_progress() - # Let default retry logic handle this - raise e from e - - self._no_progress_exceeded = False + processed_count, errors = self._processor._backend.execute_operations( + container_id=self._processor._container_id, + container_type=self._processor._container_type, + operations=batch, + operation_storage=self._processor._operation_storage, + ) + signal_batch_processed(queue=self._processor._signals_queue) version_to_ack += processed_count batch = batch[processed_count:] with self._processor._waiting_cond: self._processor._queue.ack(version_to_ack) - self._processor._last_ack = monotonic() - self._processor._lag_exceeded = False self._handle_errors(errors) diff --git a/src/neptune/internal/operation_processors/factory.py b/src/neptune/internal/operation_processors/factory.py index 1bdab4d95..8e847c14f 100644 --- a/src/neptune/internal/operation_processors/factory.py +++ b/src/neptune/internal/operation_processors/factory.py @@ -18,19 +18,13 @@ import os import threading -from typing import ( - Callable, - Optional, -) +from queue import Queue +from typing import TYPE_CHECKING from neptune.envs import NEPTUNE_ASYNC_BATCH_SIZE from neptune.internal.backends.neptune_backend import NeptuneBackend from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import UniqueId -from neptune.internal.init.parameters import ( - ASYNC_LAG_THRESHOLD, - ASYNC_NO_PROGRESS_THRESHOLD, -) from neptune.types.mode import Mode from .async_operation_processor import AsyncOperationProcessor @@ -39,6 +33,9 @@ from .read_only_operation_processor import ReadOnlyOperationProcessor from .sync_operation_processor import SyncOperationProcessor +if TYPE_CHECKING: + from neptune.internal.signals_processing.signals import Signal + # WARNING: Be careful when changing this function. It is used in the experimental package def build_async_operation_processor( @@ -47,10 +44,7 @@ def build_async_operation_processor( backend: NeptuneBackend, lock: threading.RLock, sleep_time: float, - async_lag_callback: Optional[Callable[[], None]], - async_lag_threshold: float, - async_no_progress_callback: Optional[Callable[[], None]], - async_no_progress_threshold: float, + queue: "Queue[Signal]", ) -> OperationProcessor: return AsyncOperationProcessor( container_id=container_id, @@ -59,10 +53,7 @@ def build_async_operation_processor( lock=lock, sleep_time=sleep_time, batch_size=int(os.environ.get(NEPTUNE_ASYNC_BATCH_SIZE) or "1000"), - async_lag_callback=async_lag_callback, - async_lag_threshold=async_lag_threshold, - async_no_progress_callback=async_no_progress_callback, - async_no_progress_threshold=async_no_progress_threshold, + queue=queue, ) @@ -73,10 +64,7 @@ def get_operation_processor( backend: NeptuneBackend, lock: threading.RLock, flush_period: float, - async_lag_callback: Optional[Callable[[], None]] = None, - async_lag_threshold: float = ASYNC_LAG_THRESHOLD, - async_no_progress_callback: Optional[Callable[[], None]] = None, - async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD, + queue: "Queue[Signal]", ) -> OperationProcessor: if mode == Mode.ASYNC: return build_async_operation_processor( @@ -85,10 +73,7 @@ def get_operation_processor( backend=backend, lock=lock, sleep_time=flush_period, - async_lag_callback=async_lag_callback, - async_lag_threshold=async_lag_threshold, - async_no_progress_callback=async_no_progress_callback, - async_no_progress_threshold=async_no_progress_threshold, + queue=queue, ) elif mode == Mode.SYNC: return SyncOperationProcessor(container_id, container_type, backend) diff --git a/src/neptune/internal/signals_processing/__init__.py b/src/neptune/internal/signals_processing/__init__.py new file mode 100644 index 000000000..8d06af532 --- /dev/null +++ b/src/neptune/internal/signals_processing/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/src/neptune/internal/signals_processing/background_job.py b/src/neptune/internal/signals_processing/background_job.py new file mode 100644 index 000000000..4d98d5bd4 --- /dev/null +++ b/src/neptune/internal/signals_processing/background_job.py @@ -0,0 +1,79 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +__all__ = ["CallbacksMonitor"] + +from queue import Queue +from typing import ( + TYPE_CHECKING, + Callable, + Optional, +) + +from neptune.internal.background_job import BackgroundJob +from neptune.internal.signals_processing.signals_processor import SignalsProcessor + +if TYPE_CHECKING: + from neptune.internal.signals_processing.signals import Signal + from neptune.metadata_containers import MetadataContainer + + +class CallbacksMonitor(BackgroundJob): + def __init__( + self, + queue: "Queue[Signal]", + async_lag_threshold: float, + async_no_progress_threshold: float, + async_lag_callback: Optional[Callable[["MetadataContainer"], None]] = None, + async_no_progress_callback: Optional[Callable[["MetadataContainer"], None]] = None, + period: float = 10, + ) -> None: + self._period: float = period + self._queue: "Queue[Signal]" = queue + self._thread: Optional["SignalsProcessor"] = None + self._started: bool = False + self._async_lag_threshold: float = async_lag_threshold + self._async_no_progress_threshold: float = async_no_progress_threshold + self._async_lag_callback: Optional[Callable[["MetadataContainer"], None]] = async_lag_callback + self._async_no_progress_callback: Optional[Callable[["MetadataContainer"], None]] = async_no_progress_callback + + def start(self, container: "MetadataContainer") -> None: + self._thread = SignalsProcessor( + period=self._period, + container=container, + queue=self._queue, + async_lag_threshold=self._async_lag_threshold, + async_no_progress_threshold=self._async_no_progress_threshold, + async_lag_callback=self._async_lag_callback, + async_no_progress_callback=self._async_no_progress_callback, + ) + self._thread.start() + self._started = True + + def stop(self) -> None: + if self._thread and self._started: + self._thread.interrupt() + + def join(self, seconds: Optional[float] = None) -> None: + if self._thread and self._started: + self._thread.join(seconds) + + def pause(self) -> None: + if self._thread: + self._thread.pause() + + def resume(self) -> None: + if self._thread: + self._thread.resume() diff --git a/src/neptune/internal/signals_processing/signals.py b/src/neptune/internal/signals_processing/signals.py new file mode 100644 index 000000000..74004df0a --- /dev/null +++ b/src/neptune/internal/signals_processing/signals.py @@ -0,0 +1,68 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +__all__ = [ + "Signal", + "SignalsVisitor", + "BatchStartedSignal", + "BatchProcessedSignal", + "BatchLagSignal", +] + +from abc import abstractmethod +from dataclasses import dataclass + + +@dataclass +class Signal: + occured_at: float + + @abstractmethod + def accept(self, visitor: "SignalsVisitor") -> None: + ... + + +@dataclass +class BatchStartedSignal(Signal): + def accept(self, visitor: "SignalsVisitor") -> None: + visitor.visit_batch_started(signal=self) + + +@dataclass +class BatchProcessedSignal(Signal): + def accept(self, visitor: "SignalsVisitor") -> None: + visitor.visit_batch_processed(signal=self) + + +@dataclass +class BatchLagSignal(Signal): + lag: float + + def accept(self, visitor: "SignalsVisitor") -> None: + visitor.visit_batch_lag(signal=self) + + +class SignalsVisitor: + @abstractmethod + def visit_batch_started(self, signal: Signal) -> None: + ... + + @abstractmethod + def visit_batch_processed(self, signal: Signal) -> None: + ... + + @abstractmethod + def visit_batch_lag(self, signal: Signal) -> None: + ... diff --git a/src/neptune/internal/signals_processing/signals_processor.py b/src/neptune/internal/signals_processing/signals_processor.py new file mode 100644 index 000000000..b3c91ab9d --- /dev/null +++ b/src/neptune/internal/signals_processing/signals_processor.py @@ -0,0 +1,127 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +__all__ = ["SignalsProcessor"] + +from queue import ( + Empty, + Queue, +) +from threading import Thread +from time import monotonic +from typing import ( + TYPE_CHECKING, + Callable, + Optional, +) + +from neptune.internal.init.parameters import IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL +from neptune.internal.signals_processing.signals import ( + BatchLagSignal, + SignalsVisitor, +) +from neptune.internal.threading.daemon import Daemon + +if TYPE_CHECKING: + from neptune.internal.signals_processing.signals import Signal + from neptune.metadata_containers import MetadataContainer + + +class SignalsProcessor(Daemon, SignalsVisitor): + def __init__( + self, + *, + period: float, + container: "MetadataContainer", + queue: "Queue[Signal]", + async_lag_threshold: float, + async_no_progress_threshold: float, + async_lag_callback: Optional[Callable[["MetadataContainer"], None]] = None, + async_no_progress_callback: Optional[Callable[["MetadataContainer"], None]] = None, + callbacks_interval: float = IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL, + in_async: bool = True, + ) -> None: + super().__init__(sleep_time=period, name="CallbacksMonitor") + + self._container: "MetadataContainer" = container + self._queue: "Queue[Signal]" = queue + self._async_lag_threshold: float = async_lag_threshold + self._async_no_progress_threshold: float = async_no_progress_threshold + self._async_lag_callback: Optional[Callable[["MetadataContainer"], None]] = async_lag_callback + self._async_no_progress_callback: Optional[Callable[["MetadataContainer"], None]] = async_no_progress_callback + self._callbacks_interval: float = callbacks_interval + self._in_async: bool = in_async + + self._last_batch_started_at: Optional[float] = None + self._last_no_progress_callback_at: Optional[float] = None + self._last_lag_callback_at: Optional[float] = None + + def visit_batch_started(self, signal: "Signal") -> None: + if self._last_batch_started_at is None: + self._last_batch_started_at = signal.occured_at + + def visit_batch_processed(self, signal: "Signal") -> None: + if self._last_batch_started_at is not None: + self._check_no_progress(at_timestamp=signal.occured_at) + self._last_batch_started_at = None + + def visit_batch_lag(self, signal: "Signal") -> None: + if self._async_lag_callback is None or not isinstance(signal, BatchLagSignal): + return + + if signal.lag > self._async_lag_threshold: + current_time = monotonic() + if ( + self._last_lag_callback_at is None + or current_time - self._last_lag_callback_at > self._callbacks_interval + ): + execute_callback(callback=self._async_lag_callback, container=self._container, in_async=self._in_async) + self._last_lag_callback_at = current_time + + def _check_callbacks(self) -> None: + self._check_no_progress(at_timestamp=monotonic()) + + def _check_no_progress(self, at_timestamp: float) -> None: + if self._async_no_progress_callback is None: + return + + if self._last_batch_started_at is not None: + if at_timestamp - self._last_batch_started_at > self._async_no_progress_threshold: + if ( + self._last_no_progress_callback_at is None + or at_timestamp - self._last_no_progress_callback_at > self._callbacks_interval + ): + execute_callback( + callback=self._async_no_progress_callback, container=self._container, in_async=self._in_async + ) + self._last_no_progress_callback_at = monotonic() + + def work(self) -> None: + try: + while not self._queue.empty(): + signal = self._queue.get_nowait() + signal.accept(self) + self._check_callbacks() + except Empty: + pass + + +def execute_callback( + *, callback: Callable[["MetadataContainer"], None], container: "MetadataContainer", in_async: bool +) -> None: + if in_async: + Thread(target=callback, name="CallbackExecution", args=(container,), daemon=True).start() + else: + callback(container) diff --git a/src/neptune/internal/signals_processing/utils.py b/src/neptune/internal/signals_processing/utils.py new file mode 100644 index 000000000..9d8d18d4c --- /dev/null +++ b/src/neptune/internal/signals_processing/utils.py @@ -0,0 +1,53 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +__all__ = ["signal_batch_processed", "signal_batch_started", "signal_batch_lag"] + +from queue import ( + Full, + Queue, +) +from time import monotonic +from typing import Optional + +from neptune.common.warnings import ( + NeptuneWarning, + warn_once, +) +from neptune.internal.signals_processing.signals import ( + BatchLagSignal, + BatchProcessedSignal, + BatchStartedSignal, + Signal, +) + + +def signal(*, queue: "Queue[Signal]", obj: "Signal") -> None: + try: + queue.put_nowait(item=obj) + except Full: + warn_once("Signal queue is full. Some signals will be lost.", exception=NeptuneWarning) + + +def signal_batch_started(*, queue: "Queue[Signal]", occured_at: Optional[float] = None) -> None: + signal(queue=queue, obj=BatchStartedSignal(occured_at=occured_at or monotonic())) + + +def signal_batch_processed(*, queue: "Queue[Signal]", occured_at: Optional[float] = None) -> None: + signal(queue=queue, obj=BatchProcessedSignal(occured_at=occured_at or monotonic())) + + +def signal_batch_lag(*, queue: "Queue[Signal]", lag: float, occured_at: Optional[float] = None) -> None: + signal(queue=queue, obj=BatchLagSignal(occured_at=occured_at or monotonic(), lag=lag)) diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 988716fd7..b332b1ec3 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -23,11 +23,10 @@ import time import traceback from contextlib import AbstractContextManager -from functools import ( - partial, - wraps, -) +from functools import wraps +from queue import Queue from typing import ( + TYPE_CHECKING, Any, Dict, Iterable, @@ -62,6 +61,7 @@ from neptune.internal.backends.nql import NQLQuery from neptune.internal.backends.project_name_lookup import project_name_lookup from neptune.internal.backgroud_job_list import BackgroundJobList +from neptune.internal.background_job import BackgroundJob from neptune.internal.container_structure import ContainerStructure from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import ( @@ -78,6 +78,7 @@ from neptune.internal.operation import DeleteAttribute from neptune.internal.operation_processors.factory import get_operation_processor from neptune.internal.operation_processors.operation_processor import OperationProcessor +from neptune.internal.signals_processing.background_job import CallbacksMonitor from neptune.internal.state import ContainerState from neptune.internal.utils import ( verify_optional_callable, @@ -96,6 +97,9 @@ from neptune.types.type_casting import cast_value from neptune.utils import stop_synchronization_callback +if TYPE_CHECKING: + from neptune.internal.signals_processing.signals import Signal + def ensure_not_stopped(fun): @wraps(fun) @@ -140,6 +144,7 @@ def __init__( self._forking_cond: threading.Condition = threading.Condition() self._forking_state: bool = False self._state: ContainerState = ContainerState.CREATED + self._signals_queue: "Queue[Signal]" = Queue() self._backend: NeptuneBackend = get_backend(mode=mode, api_token=api_token, proxies=proxies) @@ -173,12 +178,7 @@ def __init__( backend=self._backend, lock=self._lock, flush_period=flush_period, - async_lag_callback=partial(self._async_lag_callback, self) if self._async_lag_callback else None, - async_lag_threshold=self._async_lag_threshold, - async_no_progress_callback=partial(self._async_no_progress_callback, self) - if self._async_no_progress_callback - else None, - async_no_progress_threshold=self._async_no_progress_threshold, + queue=self._signals_queue, ) self._bg_job: BackgroundJobList = self._prepare_background_jobs_if_non_read_only() self._structure: ContainerStructure[Attribute, NamespaceAttr] = ContainerStructure(NamespaceBuilder(self)) @@ -231,6 +231,7 @@ def _handle_fork_in_child(self): reset_internal_ssl_state() if self._state == ContainerState.STARTED: self._op_processor.close() + self._signals_queue = Queue() self._op_processor = get_operation_processor( mode=self._mode, container_id=self._id, @@ -238,16 +239,22 @@ def _handle_fork_in_child(self): backend=self._backend, lock=self._lock, flush_period=self._flush_period, - async_lag_callback=partial(self._async_lag_callback, self) if self._async_lag_callback else None, - async_lag_threshold=self._async_lag_threshold, - async_no_progress_callback=partial(self._async_no_progress_callback, self) - if self._async_no_progress_callback - else None, - async_no_progress_threshold=self._async_no_progress_threshold, + queue=self._signals_queue, ) # TODO: Every implementation of background job should handle fork by itself. - self._bg_job = BackgroundJobList([]) + jobs = [] + if self._mode == Mode.ASYNC: + jobs.append( + CallbacksMonitor( + queue=self._signals_queue, + async_lag_threshold=self._async_lag_threshold, + async_no_progress_threshold=self._async_no_progress_threshold, + async_lag_callback=self._async_lag_callback, + async_no_progress_callback=self._async_no_progress_callback, + ) + ) + self._bg_job = BackgroundJobList(jobs) self._op_processor.start() @@ -265,16 +272,30 @@ def _before_fork(self): self._op_processor.pause() def _prepare_background_jobs_if_non_read_only(self) -> BackgroundJobList: + jobs = [] + if self._mode != Mode.READ_ONLY: - return self._prepare_background_jobs() - return BackgroundJobList([]) + jobs.extend(self._get_background_jobs()) + + if self._mode == Mode.ASYNC: + jobs.append( + CallbacksMonitor( + queue=self._signals_queue, + async_lag_threshold=self._async_lag_threshold, + async_no_progress_threshold=self._async_no_progress_threshold, + async_lag_callback=self._async_lag_callback, + async_no_progress_callback=self._async_no_progress_callback, + ) + ) + + return BackgroundJobList(jobs) @abc.abstractmethod def _get_or_create_api_object(self) -> ApiExperiment: raise NotImplementedError - def _prepare_background_jobs(self) -> BackgroundJobList: - return BackgroundJobList([]) + def _get_background_jobs(self) -> List["BackgroundJob"]: + return [] def _write_initial_attributes(self): pass diff --git a/src/neptune/metadata_containers/model.py b/src/neptune/metadata_containers/model.py index eadde9ca7..44847b533 100644 --- a/src/neptune/metadata_containers/model.py +++ b/src/neptune/metadata_containers/model.py @@ -17,7 +17,9 @@ import os from typing import ( + TYPE_CHECKING, Iterable, + List, Optional, ) @@ -39,7 +41,6 @@ NQLQueryAggregate, NQLQueryAttribute, ) -from neptune.internal.backgroud_job_list import BackgroundJobList from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( @@ -57,6 +58,9 @@ from neptune.metadata_containers.metadata_containers_table import Table from neptune.types.mode import Mode +if TYPE_CHECKING: + from neptune.internal.background_job import BackgroundJob + class Model(MetadataContainer): """Class for registering a model to neptune.ai and retrieving information from it.""" @@ -232,8 +236,8 @@ def _get_or_create_api_object(self) -> ApiExperiment: called_function="init_model", ) - def _prepare_background_jobs(self) -> BackgroundJobList: - return BackgroundJobList([PingBackgroundJob()]) + def _get_background_jobs(self) -> List["BackgroundJob"]: + return [PingBackgroundJob()] def _write_initial_attributes(self): if self._name is not None: diff --git a/src/neptune/metadata_containers/model_version.py b/src/neptune/metadata_containers/model_version.py index d5ed34ba0..f7e82d515 100644 --- a/src/neptune/metadata_containers/model_version.py +++ b/src/neptune/metadata_containers/model_version.py @@ -16,7 +16,11 @@ __all__ = ["ModelVersion"] import os -from typing import Optional +from typing import ( + TYPE_CHECKING, + List, + Optional, +) from neptune.attributes.constants import ( SYSTEM_NAME_ATTRIBUTE_PATH, @@ -31,7 +35,6 @@ NeptuneOfflineModeChangeStageException, ) from neptune.internal.backends.api_model import ApiExperiment -from neptune.internal.backgroud_job_list import BackgroundJobList from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import QualifiedName from neptune.internal.init.parameters import ( @@ -50,6 +53,9 @@ from neptune.types.mode import Mode from neptune.types.model_version_stage import ModelVersionStage +if TYPE_CHECKING: + from neptune.internal.background_job import BackgroundJob + class ModelVersion(MetadataContainer): """Class for managing a version of a neptune.ai model and retrieving information from it.""" @@ -226,8 +232,8 @@ def _get_or_create_api_object(self) -> ApiExperiment: called_function="init_model_version", ) - def _prepare_background_jobs(self) -> BackgroundJobList: - return BackgroundJobList([PingBackgroundJob()]) + def _get_background_jobs(self) -> List["BackgroundJob"]: + return [PingBackgroundJob()] def _write_initial_attributes(self): if self._name is not None: diff --git a/src/neptune/metadata_containers/run.py b/src/neptune/metadata_containers/run.py index 0ae83b96f..a4097aa56 100644 --- a/src/neptune/metadata_containers/run.py +++ b/src/neptune/metadata_containers/run.py @@ -19,6 +19,7 @@ import threading from platform import node as get_hostname from typing import ( + TYPE_CHECKING, List, Optional, Tuple, @@ -51,7 +52,6 @@ ) from neptune.internal.backends.api_model import ApiExperiment from neptune.internal.backends.neptune_backend import NeptuneBackend -from neptune.internal.backgroud_job_list import BackgroundJobList from neptune.internal.container_type import ContainerType from neptune.internal.hardware.hardware_metric_reporting_job import HardwareMetricReportingJob from neptune.internal.id_formats import QualifiedName @@ -99,6 +99,9 @@ from neptune.types.atoms.git_ref import GitRefDisabled from neptune.types.mode import Mode +if TYPE_CHECKING: + from neptune.internal.background_job import BackgroundJob + class Run(MetadataContainer): """Starts a tracked run that logs ML model-building metadata to neptune.ai.""" @@ -445,7 +448,7 @@ def _get_or_create_api_object(self) -> ApiExperiment: checkpoint_id=checkpoint_id, ) - def _prepare_background_jobs(self) -> BackgroundJobList: + def _get_background_jobs(self) -> List["BackgroundJob"]: background_jobs = [PingBackgroundJob()] websockets_factory = self._backend.websockets_factory(self._project_api_object.id, self._id) @@ -466,7 +469,7 @@ def _prepare_background_jobs(self) -> BackgroundJobList: TracebackJob(path=f"{self._monitoring_namespace}/traceback", fail_on_exception=self._fail_on_exception) ) - return BackgroundJobList(background_jobs) + return background_jobs def _write_initial_attributes(self): if self._name is not None: diff --git a/tests/unit/neptune/new/internal/operation_processors/test_async_operation_processor.py b/tests/unit/neptune/new/internal/operation_processors/test_async_operation_processor.py index eb07d4dd3..53465309a 100644 --- a/tests/unit/neptune/new/internal/operation_processors/test_async_operation_processor.py +++ b/tests/unit/neptune/new/internal/operation_processors/test_async_operation_processor.py @@ -39,7 +39,11 @@ def test_close(metadata_file_mock, _, disk_queue_mock): # and processor = AsyncOperationProcessor( - container_id=container_id, container_type=container_type, backend=MagicMock(), lock=MagicMock() + container_id=container_id, + container_type=container_type, + backend=MagicMock(), + lock=MagicMock(), + queue=MagicMock(), ) # and @@ -70,7 +74,11 @@ def test_cleanup_if_empty(metadata_file_mock, operation_storage_mock, disk_queue # and processor = AsyncOperationProcessor( - container_id=container_id, container_type=container_type, backend=MagicMock(), lock=MagicMock() + container_id=container_id, + container_type=container_type, + backend=MagicMock(), + lock=MagicMock(), + queue=MagicMock(), ) # and @@ -99,7 +107,11 @@ def test_metadata(metadata_file_mock, _, __): # when AsyncOperationProcessor( - container_id=container_id, container_type=container_type, backend=MagicMock(), lock=MagicMock() + container_id=container_id, + container_type=container_type, + backend=MagicMock(), + lock=MagicMock(), + queue=MagicMock(), ) # then diff --git a/tests/unit/neptune/new/internal/signals_processing/__init__.py b/tests/unit/neptune/new/internal/signals_processing/__init__.py new file mode 100644 index 000000000..8d06af532 --- /dev/null +++ b/tests/unit/neptune/new/internal/signals_processing/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/unit/neptune/new/internal/signals_processing/test_signals_processor.py b/tests/unit/neptune/new/internal/signals_processing/test_signals_processor.py new file mode 100644 index 000000000..dedfe899c --- /dev/null +++ b/tests/unit/neptune/new/internal/signals_processing/test_signals_processor.py @@ -0,0 +1,546 @@ +# +# Copyright (c) 2023, Neptune Labs Sp. z o.o. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from queue import Queue + +from mock import ( + MagicMock, + call, + patch, +) + +from neptune.internal.signals_processing.signals_processor import SignalsProcessor +from neptune.internal.signals_processing.utils import ( + signal_batch_lag, + signal_batch_processed, + signal_batch_started, +) + + +def test__no_progress__no_signal(): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + queue = Queue() + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=1.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_no_progress_callback.assert_not_called() + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__no_progress__proper_execution_of_batch(monotonic): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + monotonic.side_effect = [ + 5.0, + ] + + # and + queue = Queue() + # First proper batch + signal_batch_started(queue=queue, occured_at=0.0) + signal_batch_processed(queue=queue, occured_at=3.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=5.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_no_progress_callback.assert_not_called() + monotonic.assert_has_calls(calls=(call(),), any_order=True) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__no_progress__too_long_batch_execution(monotonic): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + monotonic.side_effect = [11.0, 11.01] + + # and + queue = Queue() + # First too long batch + signal_batch_started(queue=queue, occured_at=1.0) + signal_batch_processed(queue=queue, occured_at=9.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=5.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_no_progress_callback.assert_called_once_with(container) + monotonic.assert_has_calls(calls=(call(), call()), any_order=True) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__no_progress__proper_then_too_long(monotonic): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + monotonic.side_effect = [16.0, 16.01] + + # and + queue = Queue() + # First proper batch + signal_batch_started(queue=queue, occured_at=0.0) + signal_batch_processed(queue=queue, occured_at=4.0) + # Second too long batch + signal_batch_started(queue=queue, occured_at=5.0) + signal_batch_processed(queue=queue, occured_at=15.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=5.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_no_progress_callback.assert_called_once_with(container) + monotonic.assert_has_calls(calls=(call(), call()), any_order=True) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__no_progress__proper_then_non_ended(monotonic): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + monotonic.side_effect = [16.0, 16.01] + + # and + queue = Queue() + # First proper batch + signal_batch_started(queue=queue, occured_at=0.0) + signal_batch_processed(queue=queue, occured_at=4.0) + # Second non-ended batch + signal_batch_started(queue=queue, occured_at=5.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=5.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_no_progress_callback.assert_called_once_with(container) + monotonic.assert_has_calls(calls=(call(), call()), any_order=True) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__no_progress__too_short_time_between_callbacks(monotonic): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + monotonic.side_effect = [14.0, 14.01] + + # and + queue = Queue() + # First failing batch + signal_batch_started(queue=queue, occured_at=0.0) + signal_batch_processed(queue=queue, occured_at=6.0) + # Almost immediate second failing batch + signal_batch_started(queue=queue, occured_at=7.0) + signal_batch_processed(queue=queue, occured_at=13.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=5.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_no_progress_callback.assert_called_once_with(container) + monotonic.assert_has_calls(calls=(call(), call()), any_order=True) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__no_progress__ack_in_between(monotonic): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + monotonic.side_effect = [17.0, 17.01] + + # and + queue = Queue() + # First failing batch + signal_batch_started(queue=queue, occured_at=0.0) + signal_batch_processed(queue=queue, occured_at=6.0) + # Proper batch + signal_batch_started(queue=queue, occured_at=7.0) + signal_batch_processed(queue=queue, occured_at=9.0) + # Second failing batch + signal_batch_started(queue=queue, occured_at=10.0) + signal_batch_processed(queue=queue, occured_at=16.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=5.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + monotonic.assert_has_calls(calls=(call(), call()), any_order=True) + async_no_progress_callback.assert_called_once_with(container) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__no_progress__proper_then_too_long_different_cycles(monotonic): + # given + container = MagicMock() + async_no_progress_callback = MagicMock() + + # and + monotonic.side_effect = [5.0, 5.01, 16.0, 16.01] + + # and + queue = Queue() + # First proper batch + signal_batch_started(queue=queue, occured_at=0.0) + signal_batch_processed(queue=queue, occured_at=4.0) + # Second too long batch + signal_batch_started(queue=queue, occured_at=5.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=5.0, + async_no_progress_callback=async_no_progress_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_no_progress_callback.assert_not_called() + + # given + signal_batch_processed(queue=queue, occured_at=15.0) + + # when + processor.work() + + # then + async_no_progress_callback.assert_called_once_with(container) + + # and + monotonic.assert_has_calls(calls=(call(), call(), call())) + async_no_progress_callback.assert_has_calls(calls=(call(container),), any_order=True) + + +def test__lag__no_signal(): + # given + container = MagicMock() + async_lag_callback = MagicMock() + + # and + queue = Queue() + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=1.0, + async_lag_callback=async_lag_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_lag_callback.assert_not_called() + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__lag__proper_execution_of_batch(monotonic): + # given + container = MagicMock() + async_lag_callback = MagicMock() + + # and + monotonic.side_effect = [ + 5.0, + ] + + # and + queue = Queue() + signal_batch_lag(queue=queue, lag=0.1, occured_at=1.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=1.0, + async_lag_callback=async_lag_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_lag_callback.assert_not_called() + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__lag__too_big_lag(monotonic): + # given + container = MagicMock() + async_lag_callback = MagicMock() + + # and + monotonic.side_effect = [ + 7.0, + 7.01, + ] + + # and + queue = Queue() + signal_batch_lag(queue=queue, lag=5.0, occured_at=6.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=1.0, + async_lag_callback=async_lag_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_lag_callback.assert_called_once_with(container) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__lag__too_short_interval(monotonic): + # given + container = MagicMock() + async_lag_callback = MagicMock() + + # and + monotonic.side_effect = [7.0, 7.01, 11.0, 11.01] + + # and + queue = Queue() + signal_batch_lag(queue=queue, lag=5.0, occured_at=6.0) + signal_batch_lag(queue=queue, lag=3.0, occured_at=10.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=1.0, + async_lag_callback=async_lag_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_lag_callback.assert_called_once_with(container) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__lag__longer_interval(monotonic): + # given + container = MagicMock() + async_lag_callback = MagicMock() + + # and + monotonic.side_effect = [7.0, 20.0, 20.01] + + # and + queue = Queue() + signal_batch_lag(queue=queue, lag=5.0, occured_at=6.0) + signal_batch_lag(queue=queue, lag=3.0, occured_at=19.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=1.0, + async_lag_callback=async_lag_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_lag_callback.assert_has_calls( + calls=( + call(container), + call(container), + ), + any_order=True, + ) + + +@patch("neptune.internal.signals_processing.signals_processor.monotonic") +def test__lag__longer_interval_different_cycles(monotonic): + # given + container = MagicMock() + async_lag_callback = MagicMock() + + # and + monotonic.side_effect = [7.0, 7.01, 20.0, 20.01] + + # and + queue = Queue() + signal_batch_lag(queue=queue, lag=5.0, occured_at=6.0) + + # and + processor = SignalsProcessor( + period=10, + container=container, + queue=queue, + async_lag_threshold=1.0, + async_no_progress_threshold=1.0, + async_lag_callback=async_lag_callback, + callbacks_interval=5, + in_async=False, + ) + + # when + processor.work() + + # then + async_lag_callback.assert_called_with(container) + + # given + signal_batch_lag(queue=queue, lag=3.0, occured_at=19.0) + + # when + processor.work() + + # then + async_lag_callback.assert_has_calls( + calls=( + call(container), + call(container), + ), + any_order=True, + )