No synchronization callbacks reworked (#1567)
Raalsky authored Nov 21, 2023
1 parent 4dda93b commit c189f42
Showing 16 changed files with 1,031 additions and 121 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## [UNRELEASED] neptune 1.8.5

### Fixes
- Fixed no synchronization callbacks behaviour ([#1567](https://github.com/neptune-ai/neptune-client/pull/1567))

### Changes
- Enabled hooks for internal downloading functions used by the hosted backend ([#1571](https://github.com/neptune-ai/neptune-client/pull/1571))
- Added timestamp of operation put to disk queue ([#1569](https://github.com/neptune-ai/neptune-client/pull/1569))
2 changes: 2 additions & 0 deletions src/neptune/internal/init/parameters.py
@@ -20,6 +20,7 @@
"ASYNC_LAG_THRESHOLD",
"ASYNC_NO_PROGRESS_THRESHOLD",
"DEFAULT_STOP_TIMEOUT",
"IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL",
]

DEFAULT_FLUSH_PERIOD = 5
@@ -28,3 +29,4 @@
ASYNC_LAG_THRESHOLD = 1800.0
ASYNC_NO_PROGRESS_THRESHOLD = 300.0
DEFAULT_STOP_TIMEOUT = 60.0
IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL = 300.0
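
The new IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL constant (300 s) indicates that user callbacks are now rate-limited rather than fired on every signal. A minimal sketch of how such an interval check could be applied on the signals-processing side, assuming a throttling helper of roughly this shape (the function name and signature are illustrative, not part of this diff):

from time import monotonic
from typing import Callable, Optional

IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL = 300.0  # value introduced above in parameters.py


def call_at_most_every(
    callback: Optional[Callable[[], None]],
    last_called: Optional[float],
    interval: float = IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL,
) -> Optional[float]:
    # Invoke `callback` only if at least `interval` seconds have passed since
    # the previous invocation. Returns the timestamp of the call that was made,
    # or the previous timestamp if the call was skipped.
    if callback is None:
        return last_called
    now = monotonic()
    if last_called is not None and now - last_called < interval:
        return last_called
    callback()
    return now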
src/neptune/internal/operation_processors/async_operation_processor.py
@@ -19,6 +19,7 @@
import threading
from datetime import datetime
from pathlib import Path
from queue import Queue
from time import (
monotonic,
time,
@@ -33,15 +34,15 @@
)

from neptune.common.exceptions import NeptuneException
from neptune.common.warnings import (
NeptuneWarning,
warn_once,
)
from neptune.constants import ASYNC_DIRECTORY
from neptune.envs import NEPTUNE_SYNC_AFTER_STOP_TIMEOUT
from neptune.exceptions import NeptuneSynchronizationAlreadyStoppedException
from neptune.internal.disk_queue import DiskQueue
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
DEFAULT_STOP_TIMEOUT,
)
from neptune.internal.init.parameters import DEFAULT_STOP_TIMEOUT
from neptune.internal.metadata_file import MetadataFile
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_processor import OperationProcessor
@@ -50,6 +51,11 @@
get_container_dir,
)
from neptune.internal.operation_processors.utils import common_metadata
from neptune.internal.signals_processing.utils import (
signal_batch_lag,
signal_batch_processed,
signal_batch_started,
)
from neptune.internal.threading.daemon import Daemon
from neptune.internal.utils.disk_utilization import ensure_disk_not_overutilize
from neptune.internal.utils.logger import logger
@@ -58,6 +64,7 @@
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.signals_processing.signals import Signal


class AsyncOperationProcessor(OperationProcessor):
@@ -70,12 +77,9 @@ def __init__(
container_type: "ContainerType",
backend: "NeptuneBackend",
lock: threading.RLock,
queue: "Queue[Signal]",
sleep_time: float = 5,
batch_size: int = 1000,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
data_path: Optional[Path] = None,
should_print_logs: bool = True,
):
@@ -99,17 +103,12 @@ def __init__(
self._container_type: "ContainerType" = container_type
self._backend: "NeptuneBackend" = backend
self._batch_size: int = batch_size
self._async_lag_callback: Callable[[], None] = async_lag_callback or (lambda: None)
self._async_lag_threshold: float = async_lag_threshold
self._async_no_progress_callback: Callable[[], None] = async_no_progress_callback or (lambda: None)
self._async_no_progress_threshold: float = async_no_progress_threshold
self._last_version: int = 0
self._consumed_version: int = 0
self._consumer: Daemon = self.ConsumerThread(self, sleep_time, batch_size)
self._lock: threading.RLock = lock
self._last_ack: Optional[float] = None
self._lag_exceeded: bool = False
self._should_call_no_progress_callback: bool = False
self._signals_queue: "Queue[Signal]" = queue
self._accepts_operations: bool = True

# Caller is responsible for taking this lock
self._waiting_cond = threading.Condition(lock=lock)
@@ -122,10 +121,11 @@ def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") -

@ensure_disk_not_overutilize
def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._last_version = self._queue.put(op)
if not self._accepts_operations:
warn_once("Not accepting operations", exception=NeptuneWarning)
return

self._check_lag()
self._check_no_progress()
self._last_version = self._queue.put(op)

if self._check_queue_size():
self._consumer.wake_up()
@@ -158,24 +158,6 @@ def wait(self) -> None:
if not self._consumer.is_running():
raise NeptuneSynchronizationAlreadyStoppedException()

def _check_lag(self) -> None:
if self._lag_exceeded or not self._last_ack or monotonic() - self._last_ack <= self._async_lag_threshold:
return

with self._lock:
if not self._lag_exceeded:
threading.Thread(target=self._async_lag_callback, daemon=True).start()
self._lag_exceeded = True

def _check_no_progress(self) -> None:
if not self._should_call_no_progress_callback:
return

with self._lock:
if self._should_call_no_progress_callback:
threading.Thread(target=self._async_no_progress_callback, daemon=True).start()
self._should_call_no_progress_callback = False

def _check_queue_size(self) -> bool:
return self._queue.size() > self._batch_size / 2

@@ -276,6 +258,7 @@ def stop(self, seconds: Optional[float] = None) -> None:
self._queue.cleanup_if_empty()

def close(self) -> None:
self._accepts_operations = False
self._queue.close()
self._metadata_file.close()

@@ -290,7 +273,6 @@ def __init__(
self._processor: "AsyncOperationProcessor" = processor
self._batch_size: int = batch_size
self._last_flush: float = 0.0
self._no_progress_exceeded: bool = False

def run(self) -> None:
try:
@@ -310,17 +292,11 @@ def work(self) -> None:
batch = self._processor._queue.get_batch(self._batch_size)
if not batch:
return
self.process_batch([element.obj for element in batch], batch[-1].ver)

def _check_no_progress(self) -> None:
if not self._no_progress_exceeded:
if (
self._processor._last_ack
and monotonic() - self._processor._last_ack > self._processor._async_no_progress_threshold
):
self._no_progress_exceeded = True
self._processor._should_call_no_progress_callback = True
signal_batch_started(queue=self._processor._signals_queue)
self.process_batch([element.obj for element in batch], batch[-1].ver, batch[-1].at)

# WARNING: Be careful when changing this function. It is used in the experimental package
def _handle_errors(self, errors: List[NeptuneException]) -> None:
for error in errors:
logger.error(
@@ -334,32 +310,27 @@ def _handle_errors(self, errors: List[NeptuneException]) -> None:
" synced manually using `neptune sync` command."
)
)
def process_batch(self, batch: List[Operation], version: int) -> None:
def process_batch(self, batch: List[Operation], version: int, occurred_at: Optional[float] = None) -> None:
if occurred_at is not None:
signal_batch_lag(queue=self._processor._signals_queue, lag=time() - occurred_at)

expected_count = len(batch)
version_to_ack = version - expected_count
while True:
# TODO: Handle Metadata errors
try:
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)
except Exception as e:
self._check_no_progress()
# Let default retry logic handle this
raise e from e

self._no_progress_exceeded = False
processed_count, errors = self._processor._backend.execute_operations(
container_id=self._processor._container_id,
container_type=self._processor._container_type,
operations=batch,
operation_storage=self._processor._operation_storage,
)

signal_batch_processed(queue=self._processor._signals_queue)
version_to_ack += processed_count
batch = batch[processed_count:]

with self._processor._waiting_cond:
self._processor._queue.ack(version_to_ack)
self._processor._last_ack = monotonic()
self._processor._lag_exceeded = False

self._handle_errors(errors)

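
In the reworked consumer, lag and progress are no longer tracked inline with _check_lag / _check_no_progress; instead the ConsumerThread emits signals onto the shared queue (signal_batch_started, signal_batch_lag, signal_batch_processed) and a separate signals-processing component decides whether user callbacks fire. A simplified, self-contained sketch of that ordering; the dataclasses and the execute parameter below are stand-ins, not the actual neptune types:

from dataclasses import dataclass
from queue import Queue
from time import time
from typing import Callable, List, Optional


@dataclass
class BatchStarted:
    occurred_at: float


@dataclass
class BatchLag:
    occurred_at: float
    lag: float


@dataclass
class BatchProcessed:
    occurred_at: float


def process_batch_sketch(
    signals_queue: "Queue[object]",
    execute: Callable[[List[object]], None],
    batch: List[object],
    occurred_at: Optional[float] = None,
) -> None:
    # Announce that work on a batch has begun; this replaces the old inline
    # _check_no_progress() bookkeeping in the consumer thread.
    signals_queue.put_nowait(BatchStarted(occurred_at=time()))

    # Lag is measured against the timestamp stored with the operation in the
    # disk queue (batch[-1].at in the real code).
    if occurred_at is not None:
        signals_queue.put_nowait(BatchLag(occurred_at=time(), lag=time() - occurred_at))

    execute(batch)  # stand-in for backend.execute_operations(...)

    # Progress was made; whoever drains the queue can reset its "no progress"
    # state instead of the processor doing it inline.
    signals_queue.put_nowait(BatchProcessed(occurred_at=time()))

A signals-processing thread (not shown in this excerpt) would presumably drain the queue and apply the minimum-interval throttle from parameters.py before invoking async_lag_callback or async_no_progress_callback.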
33 changes: 9 additions & 24 deletions src/neptune/internal/operation_processors/factory.py
@@ -18,19 +18,13 @@

import os
import threading
from typing import (
Callable,
Optional,
)
from queue import Queue
from typing import TYPE_CHECKING

from neptune.envs import NEPTUNE_ASYNC_BATCH_SIZE
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
from neptune.internal.init.parameters import (
ASYNC_LAG_THRESHOLD,
ASYNC_NO_PROGRESS_THRESHOLD,
)
from neptune.types.mode import Mode

from .async_operation_processor import AsyncOperationProcessor
@@ -39,6 +33,9 @@
from .read_only_operation_processor import ReadOnlyOperationProcessor
from .sync_operation_processor import SyncOperationProcessor

if TYPE_CHECKING:
from neptune.internal.signals_processing.signals import Signal


# WARNING: Be careful when changing this function. It is used in the experimental package
def build_async_operation_processor(
@@ -47,10 +44,7 @@ def build_async_operation_processor(
backend: NeptuneBackend,
lock: threading.RLock,
sleep_time: float,
async_lag_callback: Optional[Callable[[], None]],
async_lag_threshold: float,
async_no_progress_callback: Optional[Callable[[], None]],
async_no_progress_threshold: float,
queue: "Queue[Signal]",
) -> OperationProcessor:
return AsyncOperationProcessor(
container_id=container_id,
@@ -59,10 +53,7 @@
lock=lock,
sleep_time=sleep_time,
batch_size=int(os.environ.get(NEPTUNE_ASYNC_BATCH_SIZE) or "1000"),
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
async_no_progress_threshold=async_no_progress_threshold,
queue=queue,
)


@@ -73,10 +64,7 @@ def get_operation_processor(
backend: NeptuneBackend,
lock: threading.RLock,
flush_period: float,
async_lag_callback: Optional[Callable[[], None]] = None,
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
queue: "Queue[Signal]",
) -> OperationProcessor:
if mode == Mode.ASYNC:
return build_async_operation_processor(
@@ -85,10 +73,7 @@
backend=backend,
lock=lock,
sleep_time=flush_period,
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
async_no_progress_threshold=async_no_progress_threshold,
queue=queue,
)
elif mode == Mode.SYNC:
return SyncOperationProcessor(container_id, container_type, backend)
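
With the async_* callback and threshold arguments removed, the caller of get_operation_processor now owns a Queue[Signal] and shares it between the operation processor (the producer) and whatever consumes the signals. A hedged, framework-free sketch of that wiring; consume_signals below is illustrative and not the actual neptune SignalsProcessor:

import threading
from queue import Empty, Queue


def consume_signals(signals_queue: "Queue[object]", stop_event: threading.Event) -> None:
    # Drain signals produced by the operation processor and react to them.
    # The real implementation would decide here whether to fire the
    # async_lag / async_no_progress callbacks, subject to
    # IN_BETWEEN_CALLBACKS_MINIMUM_INTERVAL.
    while not stop_event.is_set():
        try:
            signal = signals_queue.get(timeout=1.0)
        except Empty:
            continue
        print(f"received signal: {signal!r}")


signals_queue: "Queue[object]" = Queue()
stop_event = threading.Event()
threading.Thread(target=consume_signals, args=(signals_queue, stop_event), daemon=True).start()

# The same queue object is what the factory now expects, e.g. (illustrative call,
# argument values omitted):
#   get_operation_processor(mode=Mode.ASYNC, container_id=..., container_type=...,
#                           backend=..., lock=threading.RLock(),
#                           flush_period=5.0, queue=signals_queue)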
15 changes: 15 additions & 0 deletions src/neptune/internal/signals_processing/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright (c) 2023, Neptune Labs Sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
