diff --git a/neptune/new/envs.py b/neptune/new/envs.py index 070c9716a..5846426de 100644 --- a/neptune/new/envs.py +++ b/neptune/new/envs.py @@ -26,3 +26,7 @@ NEPTUNE_NOTEBOOK_ID = 'NEPTUNE_NOTEBOOK_ID' NEPTUNE_NOTEBOOK_PATH = 'NEPTUNE_NOTEBOOK_PATH' + +NEPTUNE_RETRIES_TIMEOUT_ENV = 'NEPTUNE_RETRIES_TIMEOUT' + +NEPTUNE_SYNC_BATCH_TIMEOUT_ENV = 'NEPTUNE_SYNC_BATCH_TIMEOUT' diff --git a/neptune/new/internal/backends/utils.py b/neptune/new/internal/backends/utils.py index 0f96ff51d..7975e79a5 100644 --- a/neptune/new/internal/backends/utils.py +++ b/neptune/new/internal/backends/utils.py @@ -13,8 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import itertools import logging +import os import socket import sys import time @@ -34,6 +35,7 @@ from packaging.version import Version from requests import Session +from neptune.new.envs import NEPTUNE_RETRIES_TIMEOUT_ENV from neptune.new.exceptions import SSLError, NeptuneConnectionLostException, \ Unauthorized, Forbidden, CannotResolveHostname, UnsupportedClientVersion, ClientHttpError from neptune.new.internal.backends.api_model import ClientConfig @@ -42,11 +44,19 @@ _logger = logging.getLogger(__name__) +MAX_RETRY_TIME = 30 +retries_timeout = int(os.getenv(NEPTUNE_RETRIES_TIMEOUT_ENV, "60")) + + def with_api_exceptions_handler(func): def wrapper(*args, **kwargs): last_exception = None - for retry in range(0, 3): + start_time = time.monotonic() + for retry in itertools.count(0): + if time.monotonic() - start_time > retries_timeout: + break + try: return func(*args, **kwargs) except requests.exceptions.SSLError as e: @@ -55,7 +65,7 @@ def wrapper(*args, **kwargs): requests.exceptions.ConnectionError, requests.exceptions.Timeout, HTTPRequestTimeout, HTTPServiceUnavailable, HTTPGatewayTimeout, HTTPBadGateway, HTTPTooManyRequests, HTTPServerError) as e: - time.sleep(2 ** retry) + time.sleep(min(2 ** min(10, retry), MAX_RETRY_TIME)) last_exception = e continue except HTTPUnauthorized: @@ -75,7 +85,7 @@ def wrapper(*args, **kwargs): HTTPGatewayTimeout.status_code, HTTPTooManyRequests.status_code, HTTPServerError.status_code): - time.sleep(2 ** retry) + time.sleep(min(2 ** min(10, retry), MAX_RETRY_TIME)) last_exception = e continue elif status_code == HTTPUnauthorized.status_code: diff --git a/neptune/new/sync.py b/neptune/new/sync.py index cb0be42b4..47911ece8 100644 --- a/neptune/new/sync.py +++ b/neptune/new/sync.py @@ -18,6 +18,7 @@ import os import sys import textwrap +import time import uuid from pathlib import Path from typing import Sequence, Iterable, List, Optional, Tuple, Any @@ -30,10 +31,10 @@ OFFLINE_DIRECTORY, OFFLINE_NAME_PREFIX, ) -from neptune.new.envs import PROJECT_ENV_NAME +from neptune.new.envs import NEPTUNE_SYNC_BATCH_TIMEOUT_ENV, PROJECT_ENV_NAME from neptune.new.exceptions import ( CannotSynchronizeOfflineRunsWithoutProject, - NeptuneException, + NeptuneConnectionLostException, NeptuneException, ProjectNotFound, RunNotFound, ) @@ -52,6 +53,8 @@ # Set in CLI entry points block, patched in tests backend: NeptuneBackend = None +retries_timeout = int(os.getenv(NEPTUNE_SYNC_BATCH_TIMEOUT_ENV, "3600")) + def get_run(run_id: str) -> Optional[ApiRun]: try: @@ -216,7 +219,19 @@ def sync_execution(execution_path: Path, run_uuid: uuid.UUID) -> None: batch, version = disk_queue.get_batch(1000) if not batch: break - backend.execute_operations(run_uuid, batch) + + start_time = time.monotonic() + while True: + try: + backend.execute_operations(run_uuid, batch) + break + except NeptuneConnectionLostException as ex: + if time.monotonic() - start_time > retries_timeout: + raise ex + click.echo(f"Experiencing connection interruptions. " + f"Will try to reestablish communication with Neptune.", + sys.stderr) + disk_queue.ack(version)