Skip to content

Commit

Permalink
Partitioned Operations Processor (#1524)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Nieżurawski <[email protected]>
Co-authored-by: Artsiom Tserashkovich <[email protected]>
  • Loading branch information
3 people authored Oct 20, 2023
1 parent 11b8213 commit bbee392
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Run async callback in new daemon thread ([#1521](https://github.com/neptune-ai/neptune-client/pull/1521))
- Better handle bool values of `git_ref` param in `init_run` ([#1525](https://github.com/neptune-ai/neptune-client/pull/1525))
- Updated management docstrings ([#1500](https://github.com/neptune-ai/neptune-client/pull/1500))
- Fix error message in case of NeptuneAuthTokenExpired ([#1531](https://github.com/neptune-ai/neptune-client/pull/1531))
- Updated NeptuneModelKeyAlreadyExistsError exception message ([#1536](https://github.com/neptune-ai/neptune-client/pull/1536))
- Sample logging for series errors ([#1539](https://github.com/neptune-ai/neptune-client/pull/1539))

Expand All @@ -19,6 +20,7 @@
- Added metadata file that stores information about internal directory structure and platform ([#1526](https://github.com/neptune-ai/neptune-client/pull/1526))
- Minor tweaks to `neptune.cli` and cleaning leftovers after async Experiments ([#1529](https://github.com/neptune-ai/neptune-client/pull/1529))
- Pin `simplejson` required version to below `3.19` ([#1535](https://github.com/neptune-ai/neptune-client/pull/1535))
- Added `experimental` mode that supports partitioned operations queue ([#1524](https://github.com/neptune-ai/neptune-client/pull/1524))


## neptune 1.8.2
Expand Down
2 changes: 1 addition & 1 deletion src/neptune/attributes/atoms/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def assign(self, value: FileVal, *, wait: bool = False) -> None:
operation = UploadFile.of_file(
value=value,
attribute_path=self._path,
operation_storage=self._container._op_processor._operation_storage,
operation_storage=self._container._op_processor._get_operation_processor(self._path)._operation_storage,
)

with self._container.lock():
Expand Down
20 changes: 20 additions & 0 deletions src/neptune/cli/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,26 @@ def sync_execution(
execution_path: Path,
container_id: UniqueId,
container_type: ContainerType,
) -> None:
if list(execution_path.glob("partition-*")):
for partition_path in execution_path.iterdir():
self.sync_single_execution(
execution_path=partition_path,
container_id=container_id,
container_type=container_type,
)
else:
self.sync_single_execution(
execution_path=execution_path,
container_id=container_id,
container_type=container_type,
)

def sync_single_execution(
self,
execution_path: Path,
container_id: UniqueId,
container_type: ContainerType,
) -> None:
operation_storage = OperationStorage(execution_path)
serializer: Callable[[Operation], Dict[str, Any]] = lambda op: op.to_dict()
Expand Down
12 changes: 12 additions & 0 deletions src/neptune/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ def is_container_synced_and_remove_junk(experiment_path: Path) -> bool:


def _is_execution_synced_and_remove_junk(execution_path: Path) -> bool:
    """Return True when every operation queue under *execution_path* is synced.

    A partitioned execution directory contains one ``partition-*`` subdirectory
    per queue partition; each partition is checked (and its junk metadata
    removed) individually. A non-partitioned directory is checked directly.
    """
    if not list(execution_path.glob("partition-*")):
        return _is_single_execution_synced_and_remove_junk(execution_path)

    # Materialize the results (list, not a bare generator inside `all`):
    # a short-circuiting `all` would stop at the first unsynced partition and
    # skip the junk-removal side effect for every partition after it.
    # NOTE(review): iterdir() visits *all* entries, not only `partition-*`
    # ones — assumes the directory holds nothing else; confirm against writers.
    results = [
        _is_single_execution_synced_and_remove_junk(partition_path) for partition_path in execution_path.iterdir()
    ]
    return all(results)


def _is_single_execution_synced_and_remove_junk(execution_path: Path) -> bool:
"""
The DiskQueue.close() method removes junk metadata from the disk when the queue is empty.
"""
Expand Down
3 changes: 2 additions & 1 deletion src/neptune/common/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def wrapper(*args, **kwargs):
time.sleep(wait_time)
last_exception = e
continue
except NeptuneAuthTokenExpired:
except NeptuneAuthTokenExpired as e:
last_exception = e
continue
except HTTPUnauthorized:
raise Unauthorized()
Expand Down
3 changes: 3 additions & 0 deletions src/neptune/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"NEPTUNE_RETRIES_TIMEOUT_ENV",
"NEPTUNE_SYNC_BATCH_TIMEOUT_ENV",
"NEPTUNE_ASYNC_BATCH_SIZE",
"NEPTUNE_ASYNC_PARTITIONS_NUMBER",
"NEPTUNE_SUBPROCESS_KILL_TIMEOUT",
"NEPTUNE_FETCH_TABLE_STEP_SIZE",
"NEPTUNE_SYNC_AFTER_STOP_TIMEOUT",
Expand Down Expand Up @@ -69,6 +70,8 @@

NEPTUNE_ASYNC_BATCH_SIZE = "NEPTUNE_ASYNC_BATCH_SIZE"

NEPTUNE_ASYNC_PARTITIONS_NUMBER = "NEPTUNE_ASYNC_PARTITIONS_NUMBER"

NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT"

NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK = "NEPTUNE_ENABLE_DEFAULT_ASYNC_LAG_CALLBACK"
Expand Down
16 changes: 15 additions & 1 deletion src/neptune/internal/backends/hosted_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,17 @@
import requests
from bravado.http_client import HttpClient
from bravado.requests_client import RequestsClient
from requests.adapters import (
DEFAULT_POOLSIZE,
HTTPAdapter,
)

from neptune.common.backends.utils import with_api_exceptions_handler
from neptune.common.oauth import NeptuneAuthenticator
from neptune.envs import NEPTUNE_REQUEST_TIMEOUT
from neptune.envs import (
NEPTUNE_ASYNC_PARTITIONS_NUMBER,
NEPTUNE_REQUEST_TIMEOUT,
)
from neptune.exceptions import NeptuneClientUpgradeRequiredError
from neptune.internal.backends.api_model import ClientConfig
from neptune.internal.backends.swagger_client_wrapper import SwaggerClientWrapper
Expand All @@ -56,6 +63,7 @@

CONNECT_TIMEOUT = 30 # helps detecting internet connection lost
REQUEST_TIMEOUT = int(os.getenv(NEPTUNE_REQUEST_TIMEOUT, "600"))
MAX_POOL_SIZE = 128

DEFAULT_REQUEST_KWARGS = {
"_request_options": {
Expand All @@ -77,6 +85,12 @@ def create_http_client(ssl_verify: bool, proxies: Dict[str, str]) -> RequestsCli
http_client = RequestsClient(ssl_verify=ssl_verify, response_adapter_class=NeptuneResponseAdapter)
http_client.session.verify = ssl_verify

if os.getenv(NEPTUNE_ASYNC_PARTITIONS_NUMBER):
pool_size = int(os.getenv(NEPTUNE_ASYNC_PARTITIONS_NUMBER, DEFAULT_POOLSIZE))
adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=4 * pool_size)
http_client.session.mount("https://", adapter)
http_client.session.mount("http://", adapter)

_close_connections_on_fork(http_client.session)

update_session_proxies(http_client.session, proxies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from neptune.internal.threading.daemon import Daemon
from neptune.internal.utils.disk_full import ensure_disk_not_full
from neptune.internal.utils.logger import logger
from neptune.internal.utils.monotonic_inc_batch_size import MonotonicIncBatchSize

if TYPE_CHECKING:
from neptune.internal.backends.neptune_backend import NeptuneBackend
Expand All @@ -76,13 +77,16 @@ def __init__(
async_lag_threshold: float = ASYNC_LAG_THRESHOLD,
async_no_progress_callback: Optional[Callable[[], None]] = None,
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
data_path: Optional[Path] = None,
should_print_logs: bool = True,
):
data_path = self._init_data_path(container_id, container_type)
self._should_print_logs: bool = should_print_logs
self._data_path = data_path if data_path else self._init_data_path(container_id, container_type)
self._metadata_file = MetadataFile(
data_path=data_path,
data_path=self._data_path,
metadata=common_metadata(mode="async", container_id=container_id, container_type=container_type),
)
self._operation_storage = OperationStorage(data_path=data_path)
self._operation_storage = OperationStorage(data_path=self._data_path)

serializer: Callable[[Operation], Dict[str, Any]] = lambda op: op.to_dict()
self._queue = DiskQueue(
Expand All @@ -95,14 +99,14 @@ def __init__(
self._container_id: "UniqueId" = container_id
self._container_type: "ContainerType" = container_type
self._backend: "NeptuneBackend" = backend
self._batch_size: int = batch_size
self._batch_size: MonotonicIncBatchSize = MonotonicIncBatchSize(size_limit=batch_size)
self._async_lag_callback: Callable[[], None] = async_lag_callback or (lambda: None)
self._async_lag_threshold: float = async_lag_threshold
self._async_no_progress_callback: Callable[[], None] = async_no_progress_callback or (lambda: None)
self._async_no_progress_threshold: float = async_no_progress_threshold
self._last_version: int = 0
self._consumed_version: int = 0
self._consumer: Daemon = self.ConsumerThread(self, sleep_time, batch_size)
self._consumer: Daemon = self.ConsumerThread(self, sleep_time, self._batch_size)
self._lock: threading.RLock = lock
self._last_ack: Optional[float] = None
self._lag_exceeded: bool = False
Expand All @@ -114,8 +118,8 @@ def __init__(
@staticmethod
def _init_data_path(container_id: "UniqueId", container_type: "ContainerType") -> Path:
    """Build a unique on-disk directory for this execution's operation queue.

    The suffix combines a high-resolution timestamp with the PID so that
    concurrent processes logging to the same container never share a queue
    directory.
    """
    # The scraped diff contained both the pre-change (`process_path`) and
    # post-change (`path_suffix`) variants, leaving an unreachable second
    # return; only the post-change version is kept here.
    now = datetime.now()
    path_suffix = f"exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
    return get_container_dir(ASYNC_DIRECTORY, container_id, container_type, path_suffix)

@ensure_disk_not_full
def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
Expand All @@ -124,7 +128,7 @@ def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
self._check_lag()
self._check_no_progress()

if self._queue.size() > self._batch_size / 2:
if self._queue.size() > self._batch_size.get() / 2:
self._consumer.wake_up()
if wait:
self.wait()
Expand Down Expand Up @@ -213,7 +217,8 @@ def _wait_for_queue_empty(self, initial_queue_size: int, seconds: Optional[float
already_synced = initial_queue_size - size_remaining
already_synced_proc = (already_synced / initial_queue_size) * 100 if initial_queue_size else 100
if size_remaining == 0:
logger.info("All %s operations synced, thanks for waiting!", initial_queue_size)
if self._should_print_logs:
logger.info("All %s operations synced, thanks for waiting!", initial_queue_size)
return

time_elapsed = monotonic() - waiting_start
Expand Down Expand Up @@ -277,12 +282,12 @@ def __init__(
self,
processor: "AsyncOperationProcessor",
sleep_time: float,
batch_size: int,
batch_size: MonotonicIncBatchSize,
):
super().__init__(sleep_time=sleep_time, name="NeptuneAsyncOpProcessor")
self._processor: "AsyncOperationProcessor" = processor
self._errors_processor: OperationsErrorsProcessor = OperationsErrorsProcessor()
self._batch_size: int = batch_size
self._batch_size: MonotonicIncBatchSize = batch_size
self._last_flush: float = 0.0
self._no_progress_exceeded: bool = False

Expand All @@ -301,10 +306,11 @@ def work(self) -> None:
self._processor._queue.flush()

while True:
batch = self._processor._queue.get_batch(self._batch_size)
batch = self._processor._queue.get_batch(self._batch_size.get())
if not batch:
return
self.process_batch([element.obj for element in batch], batch[-1].ver)
self._batch_size.increase()

def _check_no_progress(self) -> None:
if not self._no_progress_exceeded:
Expand Down
27 changes: 25 additions & 2 deletions src/neptune/internal/operation_processors/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
Optional,
)

from neptune.envs import NEPTUNE_ASYNC_BATCH_SIZE
from neptune.envs import (
NEPTUNE_ASYNC_BATCH_SIZE,
NEPTUNE_ASYNC_PARTITIONS_NUMBER,
)
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.id_formats import UniqueId
Expand All @@ -36,6 +39,7 @@
from .async_operation_processor import AsyncOperationProcessor
from .offline_operation_processor import OfflineOperationProcessor
from .operation_processor import OperationProcessor
from .partitioned_operation_processor import PartitionedOperationProcessor
from .read_only_operation_processor import ReadOnlyOperationProcessor
from .sync_operation_processor import SyncOperationProcessor

Expand All @@ -53,13 +57,32 @@ def get_operation_processor(
async_no_progress_threshold: float = ASYNC_NO_PROGRESS_THRESHOLD,
) -> OperationProcessor:
if mode == Mode.ASYNC:
batch_size = int(os.environ.get(NEPTUNE_ASYNC_BATCH_SIZE) or "1000")

if os.getenv(NEPTUNE_ASYNC_PARTITIONS_NUMBER):
partitions = int(os.environ.get(NEPTUNE_ASYNC_PARTITIONS_NUMBER) or "5")
if partitions > 1:
return PartitionedOperationProcessor(
container_id=container_id,
container_type=container_type,
backend=backend,
lock=lock,
sleep_time=flush_period,
batch_size=batch_size,
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
async_no_progress_threshold=async_no_progress_threshold,
partitions=partitions,
)

return AsyncOperationProcessor(
container_id=container_id,
container_type=container_type,
backend=backend,
lock=lock,
sleep_time=flush_period,
batch_size=int(os.environ.get(NEPTUNE_ASYNC_BATCH_SIZE) or "1000"),
batch_size=batch_size,
async_lag_callback=async_lag_callback,
async_lag_threshold=async_lag_threshold,
async_no_progress_callback=async_no_progress_callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import abc
from typing import (
TYPE_CHECKING,
List,
Optional,
)

Expand Down Expand Up @@ -50,3 +51,7 @@ def stop(self, seconds: Optional[float] = None) -> None:

def close(self) -> None:
pass

# TODO: remove this method
def _get_operation_processor(self, path: List[str]) -> "OperationProcessor":
return self
Loading

0 comments on commit bbee392

Please sign in to comment.