Skip to content

Commit

Permalink
Merge pull request #392 from neptune-ai/jk/handle-quota
Browse files Browse the repository at this point in the history
Jk/handle quota
  • Loading branch information
shnela authored Feb 8, 2021
2 parents 80b7b24 + 75b8723 commit 8ecb6bc
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 109 deletions.
126 changes: 51 additions & 75 deletions neptune/internal/backends/hosted_neptune_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import sys
import uuid
from functools import partial
from http.client import NOT_FOUND, UNPROCESSABLE_ENTITY # pylint:disable=no-name-in-module
from http.client import NOT_FOUND # pylint:disable=no-name-in-module
from io import StringIO
from itertools import groupby

Expand All @@ -39,21 +39,35 @@
from bravado.requests_client import RequestsClient
from bravado_core.formatter import SwaggerFormat
from packaging import version
from requests.exceptions import HTTPError
from six.moves import urllib

from neptune.api_exceptions import ExperimentAlreadyFinished, ExperimentLimitReached, \
ExperimentNotFound, ExperimentValidationError, NamespaceNotFound, ProjectNotFound, StorageLimitReached, \
ChannelAlreadyExists, ChannelsValuesSendBatchError, NotebookNotFound, \
PathInProjectNotFound, ChannelNotFound
from neptune.api_exceptions import (
ChannelAlreadyExists,
ChannelNotFound,
ChannelsValuesSendBatchError,
ExperimentAlreadyFinished,
ExperimentLimitReached,
ExperimentNotFound,
ExperimentValidationError,
NamespaceNotFound,
NotebookNotFound,
PathInProjectNotFound,
ProjectNotFound,
)
from neptune.backend import Backend
from neptune.checkpoint import Checkpoint
from neptune.internal.backends.client_config import ClientConfig
from neptune.exceptions import FileNotFound, DeprecatedApiToken, CannotResolveHostname, UnsupportedClientVersion, \
AlphaProjectException, STYLES
from neptune.exceptions import (
AlphaProjectException,
CannotResolveHostname,
DeprecatedApiToken,
FileNotFound,
STYLES,
UnsupportedClientVersion,
)
from neptune.experiments import Experiment
from neptune.internal.backends.credentials import Credentials
from neptune.internal.utils.http import extract_response_field
from neptune.internal.utils.http_utils import extract_response_field, handle_quota_limits
from neptune.model import ChannelWithLastValue, LeaderboardEntry
from neptune.notebook import Notebook
from neptune.oauth import NeptuneAuthenticator
Expand Down Expand Up @@ -437,41 +451,22 @@ def update_tags(self, experiment, tags_to_add, tags_to_delete):
else:
raise

@handle_quota_limits
def upload_experiment_source(self, experiment, data, progress_indicator):
try:
# Api exception handling is done in _upload_loop
self._upload_loop(partial(self._upload_raw_data,
api_method=self.backend_swagger_client.api.uploadExperimentSource),
data=data,
progress_indicator=progress_indicator,
path_params={'experimentId': experiment.internal_id},
query_params={})
except HTTPError as e:
if e.response.status_code == NOT_FOUND:
# pylint: disable=protected-access
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project.full_id)
if e.response.status_code == UNPROCESSABLE_ENTITY and (
extract_response_field(e.response, 'type') == 'LIMIT_OF_STORAGE_IN_PROJECT_REACHED'):
raise StorageLimitReached()
raise

self._upload_loop(partial(self._upload_raw_data,
api_method=self.backend_swagger_client.api.uploadExperimentSource),
data=data,
progress_indicator=progress_indicator,
path_params={'experimentId': experiment.internal_id},
query_params={})

@handle_quota_limits
def extract_experiment_source(self, experiment, data):
try:
return self._upload_tar_data(
experiment=experiment,
api_method=self.backend_swagger_client.api.uploadExperimentSourceAsTarstream,
data=data
)
except HTTPError as e:
if e.response.status_code == NOT_FOUND:
# pylint: disable=protected-access
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project.full_id)
if e.response.status_code == UNPROCESSABLE_ENTITY and (
extract_response_field(e.response, 'type') == 'LIMIT_OF_STORAGE_IN_PROJECT_REACHED'):
raise StorageLimitReached()
raise
return self._upload_tar_data(
experiment=experiment,
api_method=self.backend_swagger_client.api.uploadExperimentSourceAsTarstream,
data=data
)

@with_api_exceptions_handler
def create_channel(self, experiment, name, channel_type):
Expand Down Expand Up @@ -682,41 +677,22 @@ def send_hardware_metric_reports(self, experiment, metrics, metric_reports):
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project.full_id)

@handle_quota_limits
def upload_experiment_output(self, experiment, data, progress_indicator):
try:
# Api exception handling is done in _upload_loop
self._upload_loop(partial(self._upload_raw_data,
api_method=self.backend_swagger_client.api.uploadExperimentOutput),
data=data,
progress_indicator=progress_indicator,
path_params={'experimentId': experiment.internal_id},
query_params={})
except HTTPError as e:
if e.response.status_code == NOT_FOUND:
# pylint: disable=protected-access
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project.full_id)
if e.response.status_code == UNPROCESSABLE_ENTITY and (
extract_response_field(e.response, 'type') == 'LIMIT_OF_STORAGE_IN_PROJECT_REACHED'):
raise StorageLimitReached()
raise

self._upload_loop(partial(self._upload_raw_data,
api_method=self.backend_swagger_client.api.uploadExperimentOutput),
data=data,
progress_indicator=progress_indicator,
path_params={'experimentId': experiment.internal_id},
query_params={})

@handle_quota_limits
def extract_experiment_output(self, experiment, data):
try:
return self._upload_tar_data(
experiment=experiment,
api_method=self.backend_swagger_client.api.uploadExperimentOutputAsTarstream,
data=data
)
except HTTPError as e:
if e.response.status_code == NOT_FOUND:
# pylint: disable=protected-access
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project.full_id)
if e.response.status_code == UNPROCESSABLE_ENTITY and (
extract_response_field(e.response, 'type') == 'LIMIT_OF_STORAGE_IN_PROJECT_REACHED'):
raise StorageLimitReached()
raise
return self._upload_tar_data(
experiment=experiment,
api_method=self.backend_swagger_client.api.uploadExperimentOutputAsTarstream,
data=data
)

@with_api_exceptions_handler
def rm_data(self, experiment, path):
Expand Down
34 changes: 0 additions & 34 deletions neptune/internal/utils/http.py

This file was deleted.

68 changes: 68 additions & 0 deletions neptune/internal/utils/http_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Copyright (c) 2019, Neptune Labs Sp. z o.o.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from functools import wraps
from http.client import NOT_FOUND, UNPROCESSABLE_ENTITY # pylint:disable=no-name-in-module

from requests.exceptions import HTTPError

from neptune.api_exceptions import ExperimentNotFound, StorageLimitReached
from neptune.exceptions import NeptuneException

_logger = logging.getLogger(__name__)


def extract_response_field(response, field_name):
if response is None:
return None

try:
response_json = response.json()
if isinstance(response_json, dict):
return response_json.get(field_name)
else:
_logger.debug('HTTP response is not a dict: %s', str(response_json))
return None
except ValueError as e:
_logger.debug('Failed to parse HTTP response: %s', e)
return None


def handle_quota_limits(f):
"""Wrapper for functions which may request for non existing experiment or cause quota limit breach
Limitations:
Decorated function must be called with experiment argument like this fun(..., experiment=<experiment>, ...)"""

@wraps(f)
def handler(*args, **kwargs):
experiment = kwargs.get('experiment')
if experiment is None:
raise NeptuneException('This function must be called with experiment passed by name,'
' like this fun(..., experiment=<experiment>, ...)')
try:
return f(*args, **kwargs)
except HTTPError as e:
if e.response.status_code == NOT_FOUND:
# pylint: disable=protected-access
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project.full_id)
if (e.response.status_code == UNPROCESSABLE_ENTITY and
extract_response_field(e.response, 'title').startswith('Storage limit reached in organization: ')):
raise StorageLimitReached()
raise

return handler

0 comments on commit 8ecb6bc

Please sign in to comment.