Skip to content

Commit

Permalink
Merge pull request #1497 from neptune-ai/pg/revert
Browse files Browse the repository at this point in the history
Pg/revert
  • Loading branch information
PatrykGala authored Oct 10, 2023
2 parents 51f6286 + b737e26 commit 1ef5bf5
Show file tree
Hide file tree
Showing 38 changed files with 1,017 additions and 2,509 deletions.
8 changes: 0 additions & 8 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
## [UNRELEASED] neptune 1.8.2

### Changes
- Upgraded performance of sending series data to Neptune ([#1483](https://github.com/neptune-ai/neptune-client/pull/1483))
- Compress (gzip) request to server, when server support it ([#1476](https://github.com/neptune-ai/neptune-client/pull/1476))
- Support for batch processing parameters from env variables ([#1495](https://github.com/neptune-ai/neptune-client/pull/1495))


## neptune 1.8.1

### Fixes
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,15 @@ module = [
"neptune.internal.backends.offline_neptune_backend",
"neptune.internal.backends.operation_api_name_visitor",
"neptune.internal.backends.operation_api_object_converter",
"neptune.internal.backends.operations_preprocessor",
"neptune.internal.backends.project_name_lookup",
"neptune.internal.backends.swagger_client_wrapper",
"neptune.internal.backends.utils",
"neptune.internal.backgroud_job_list",
"neptune.internal.background_job",
"neptune.internal.container_structure",
"neptune.internal.credentials",
"neptune.internal.disk_queue",
"neptune.internal.hardware.gpu.gpu_monitor",
"neptune.internal.hardware.hardware_metric_reporting_job",
"neptune.internal.id_formats",
Expand All @@ -350,6 +352,7 @@ module = [
"neptune.internal.utils.git",
"neptune.internal.utils.hashing",
"neptune.internal.utils.images",
"neptune.internal.utils.json_file_splitter",
"neptune.internal.utils.limits",
"neptune.internal.utils.logger",
"neptune.internal.utils.ping_background_job",
Expand All @@ -358,6 +361,7 @@ module = [
"neptune.internal.utils.runningmode",
"neptune.internal.utils.s3",
"neptune.internal.utils.source_code",
"neptune.internal.utils.sync_offset_file",
"neptune.internal.utils.traceback_job",
"neptune.internal.utils.uncaught_exception_handler",
"neptune.internal.websockets.websocket_signals_background_job",
Expand Down
2 changes: 1 addition & 1 deletion src/neptune/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
from neptune.internal.backends.hosted_neptune_backend import HostedNeptuneBackend
from neptune.internal.backends.neptune_backend import NeptuneBackend # noqa: F401
from neptune.internal.credentials import Credentials
from neptune.internal.disk_queue import DiskQueue # noqa: F401
from neptune.internal.operation import Operation # noqa: F401
from neptune.internal.queue.disk_queue import DiskQueue # noqa: F401


@click.command()
Expand Down
2 changes: 1 addition & 1 deletion src/neptune/cli/path_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
Project,
)
from neptune.internal.backends.neptune_backend import NeptuneBackend # noqa: F401
from neptune.internal.disk_queue import DiskQueue # noqa: F401
from neptune.internal.operation import Operation # noqa: F401
from neptune.internal.queue.disk_queue import DiskQueue # noqa: F401


def get_neptune_path(ctx, param, path: str) -> Path:
Expand Down
2 changes: 1 addition & 1 deletion src/neptune/cli/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@
Project,
)
from neptune.internal.container_type import ContainerType
from neptune.internal.disk_queue import DiskQueue
from neptune.internal.id_formats import (
QualifiedName,
UniqueId,
)
from neptune.internal.operation import Operation
from neptune.internal.operation_processors.operation_storage import OperationStorage
from neptune.internal.queue.disk_queue import DiskQueue
from neptune.internal.utils.logger import logger

retries_timeout = int(os.getenv(NEPTUNE_SYNC_BATCH_TIMEOUT_ENV, "3600"))
Expand Down
2 changes: 1 addition & 1 deletion src/neptune/cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@
)
from neptune.internal.backends.neptune_backend import NeptuneBackend
from neptune.internal.container_type import ContainerType
from neptune.internal.disk_queue import DiskQueue
from neptune.internal.id_formats import (
QualifiedName,
UniqueId,
)
from neptune.internal.operation import Operation
from neptune.internal.queue.disk_queue import DiskQueue
from neptune.internal.utils.logger import logger


Expand Down
10 changes: 2 additions & 8 deletions src/neptune/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
"NEPTUNE_NOTEBOOK_PATH",
"NEPTUNE_RETRIES_TIMEOUT_ENV",
"NEPTUNE_SYNC_BATCH_TIMEOUT_ENV",
"NEPTUNE_ASYNC_MAX_POINTS_PER_BATCH",
"NEPTUNE_ASYNC_MAX_ATTRIBUTES_IN_BATCH",
"NEPTUNE_ASYNC_MAX_POINTS_PER_ATTRIBUTE",
"NEPTUNE_ASYNC_BATCH_SIZE",
"NEPTUNE_SUBPROCESS_KILL_TIMEOUT",
"NEPTUNE_FETCH_TABLE_STEP_SIZE",
"NEPTUNE_SYNC_AFTER_STOP_TIMEOUT",
Expand Down Expand Up @@ -62,11 +60,7 @@

NEPTUNE_SYNC_AFTER_STOP_TIMEOUT = "NEPTUNE_SYNC_AFTER_STOP_TIMEOUT"

NEPTUNE_ASYNC_MAX_POINTS_PER_BATCH = "NEPTUNE_ASYNC_MAX_POINTS_PER_BATCH"

NEPTUNE_ASYNC_MAX_ATTRIBUTES_IN_BATCH = "NEPTUNE_ASYNC_MAX_ATTRIBUTES_IN_BATCH"

NEPTUNE_ASYNC_MAX_POINTS_PER_ATTRIBUTE = "NEPTUNE_ASYNC_MAX_POINTS_PER_ATTRIBUTE"
NEPTUNE_ASYNC_BATCH_SIZE = "NEPTUNE_ASYNC_BATCH_SIZE"

NEPTUNE_REQUEST_TIMEOUT = "NEPTUNE_REQUEST_TIMEOUT"

Expand Down
3 changes: 0 additions & 3 deletions src/neptune/internal/backends/api_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class ClientConfig:
_missing_features: FrozenSet[str]
version_info: VersionInfo
multipart_config: MultipartConfig
gzip_upload: bool

def has_feature(self, feature_name: str) -> bool:
return feature_name not in self._missing_features
Expand Down Expand Up @@ -163,7 +162,6 @@ def from_api_response(config) -> "ClientConfig":
multipart_upload_config = MultipartConfig(
min_chunk_size, max_chunk_size, max_chunk_count, max_single_part_size
)
gzip_upload = getattr(config, "gzipUpload", False)

artifacts_config_obj = getattr(config, "artifacts", None)
has_artifacts = getattr(artifacts_config_obj, "enabled", False)
Expand All @@ -181,7 +179,6 @@ def from_api_response(config) -> "ClientConfig":
_missing_features=frozenset(missing_features),
version_info=VersionInfo.build(min_recommended, min_compatible, max_compatible),
multipart_config=multipart_upload_config,
gzip_upload=gzip_upload,
)


Expand Down
29 changes: 1 addition & 28 deletions src/neptune/internal/backends/hosted_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import os
import platform
import zlib
from typing import (
Dict,
Tuple,
Expand All @@ -32,11 +31,6 @@
import requests
from bravado.http_client import HttpClient
from bravado.requests_client import RequestsClient
from requests import (
PreparedRequest,
Response,
)
from requests.adapters import HTTPAdapter

from neptune.common.backends.utils import with_api_exceptions_handler
from neptune.common.oauth import NeptuneAuthenticator
Expand All @@ -54,7 +48,6 @@
verify_host_resolution,
)
from neptune.internal.credentials import Credentials
from neptune.internal.utils.logger import logger
from neptune.version import version as neptune_client_version

BACKEND_SWAGGER_PATH = "/api/backend/swagger.json"
Expand All @@ -73,22 +66,6 @@
}


class GzipAdapter(HTTPAdapter):
def send(self, request: PreparedRequest, stream: bool = False, **kw) -> Response:
if request.body is not None and not stream and request.headers.get("Content-Type", None) == "application/json":
try:
request_body = request.body if isinstance(request.body, bytes) else bytes(request.body, "utf-8")
gzip_compress = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS | 16)
compressed = gzip_compress.compress(request_body) + gzip_compress.flush()
request.prepare_body(compressed, None)
request.headers["Content-Encoding"] = "gzip"
except zlib.error:
logger.warning("Error on compressing request")
pass

return super(GzipAdapter, self).send(request, stream, **kw)


def _close_connections_on_fork(session: requests.Session):
try:
os.register_at_fork(before=session.close, after_in_child=session.close, after_in_parent=session.close)
Expand Down Expand Up @@ -197,11 +174,7 @@ def create_backend_client(client_config: ClientConfig, http_client: HttpClient)


@cache
def create_leaderboard_client(client_config: ClientConfig, http_client: RequestsClient) -> SwaggerClientWrapper:
if client_config.gzip_upload:
http_client.session.mount("http://", GzipAdapter())
http_client.session.mount("https://", GzipAdapter())

def create_leaderboard_client(client_config: ClientConfig, http_client: HttpClient) -> SwaggerClientWrapper:
return SwaggerClientWrapper(
create_swagger_client(
build_operation_url(client_config.api_url, LEADERBOARD_SWAGGER_PATH),
Expand Down
56 changes: 4 additions & 52 deletions src/neptune/internal/backends/hosted_neptune_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
from neptune.internal.backends.nql import NQLQuery
from neptune.internal.backends.operation_api_name_visitor import OperationApiNameVisitor
from neptune.internal.backends.operation_api_object_converter import OperationApiObjectConverter
from neptune.internal.backends.operations_preprocessor import OperationsPreprocessor
from neptune.internal.backends.utils import (
ExecuteOperationsBatchingManager,
MissingApiClient,
Expand All @@ -132,7 +133,6 @@
UploadFileSet,
)
from neptune.internal.operation_processors.operation_storage import OperationStorage
from neptune.internal.preprocessor.operations_preprocessor import OperationsPreprocessor
from neptune.internal.utils import base64_decode
from neptune.internal.utils.generic_attribute_mapper import map_attribute_result_to_value
from neptune.internal.utils.git import GitInfo
Expand All @@ -145,7 +145,6 @@
from bravado.requests_client import RequestsClient

from neptune.internal.backends.api_model import ClientConfig
from neptune.internal.preprocessor.accumulated_operations import AccumulatedOperations


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -458,9 +457,9 @@ def execute_operations(
dropped_count = operations_batch.dropped_operations_count

operations_preprocessor = OperationsPreprocessor()
operations_preprocessor.process_batch(operations_batch.operations)
operations_preprocessor.process(operations_batch.operations)

preprocessed_operations = operations_preprocessor.accumulate_operations()
preprocessed_operations = operations_preprocessor.get_operations()
errors.extend(preprocessed_operations.errors)

if preprocessed_operations.artifact_operations:
Expand Down Expand Up @@ -504,61 +503,14 @@ def execute_operations(
errors,
)

def execute_operations_from_accumulator(
self,
container_id: UniqueId,
container_type: ContainerType,
accumulated_operations: "AccumulatedOperations",
operation_storage: OperationStorage,
) -> Tuple[int, List[NeptuneException]]:
errors: List[NeptuneException] = accumulated_operations.errors

if accumulated_operations.artifact_operations:
self.verify_feature_available(OptionalFeatures.ARTIFACTS)

# Upload operations should be done first since they are idempotent
if accumulated_operations.upload_operations:
errors.extend(
self._execute_upload_operations_with_400_retry(
container_id=container_id,
container_type=container_type,
upload_operations=accumulated_operations.upload_operations,
operation_storage=operation_storage,
)
)

if accumulated_operations.artifact_operations:
artifact_operations_errors, assign_artifact_operations = self._execute_artifact_operations(
container_id=container_id,
container_type=container_type,
artifact_operations=accumulated_operations.artifact_operations,
)

errors.extend(artifact_operations_errors)
accumulated_operations.other_operations.extend(assign_artifact_operations)

if accumulated_operations.other_operations:
errors.extend(
self._execute_operations(
container_id,
container_type,
operations=accumulated_operations.other_operations,
)
)

for op in itertools.chain(accumulated_operations.upload_operations, accumulated_operations.other_operations):
op.clean(operation_storage=operation_storage)

return accumulated_operations.source_operations_count, errors

def _execute_upload_operations(
self,
container_id: str,
container_type: ContainerType,
upload_operations: List[Operation],
operation_storage: OperationStorage,
) -> List[NeptuneException]:
errors: List[NeptuneException] = list()
errors = list()

if self._client_config.has_feature(OptionalFeatures.MULTIPART_UPLOAD):
multipart_config = self._client_config.multipart_config
Expand Down
14 changes: 0 additions & 14 deletions src/neptune/internal/backends/neptune_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import abc
from typing import (
TYPE_CHECKING,
Any,
List,
Optional,
Expand Down Expand Up @@ -60,9 +59,6 @@
from neptune.internal.utils.git import GitInfo
from neptune.internal.websockets.websockets_factory import WebsocketsFactory

if TYPE_CHECKING:
from neptune.internal.preprocessor.accumulated_operations import AccumulatedOperations


class NeptuneBackend:
def close(self) -> None:
Expand Down Expand Up @@ -147,16 +143,6 @@ def execute_operations(
) -> Tuple[int, List[NeptuneException]]:
pass

@abc.abstractmethod
def execute_operations_from_accumulator(
self,
container_id: UniqueId,
container_type: ContainerType,
accumulated_operations: "AccumulatedOperations",
operation_storage: OperationStorage,
) -> Tuple[int, List[NeptuneException]]:
pass

@abc.abstractmethod
def get_attributes(self, container_id: str, container_type: ContainerType) -> List[Attribute]:
pass
Expand Down
19 changes: 0 additions & 19 deletions src/neptune/internal/backends/neptune_backend_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from datetime import datetime
from shutil import copyfile
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
Expand Down Expand Up @@ -133,10 +132,6 @@
from neptune.types.value import Value
from neptune.types.value_visitor import ValueVisitor

if TYPE_CHECKING:
from neptune.internal.preprocessor.accumulated_operations import AccumulatedOperations


Val = TypeVar("Val", bound=Value)


Expand Down Expand Up @@ -297,20 +292,6 @@ def execute_operations(
result.append(e)
return len(operations), result

def execute_operations_from_accumulator(
self,
container_id: UniqueId,
container_type: ContainerType,
accumulated_operations: "AccumulatedOperations",
operation_storage: OperationStorage,
) -> Tuple[int, List[NeptuneException]]:
return self.execute_operations(
container_id=container_id,
container_type=container_type,
operations=accumulated_operations.all_operations(),
operation_storage=operation_storage,
)

def _execute_operation(
self, container_id: UniqueId, container_type: ContainerType, op: Operation, operation_storage: OperationStorage
) -> None:
Expand Down
Loading

0 comments on commit 1ef5bf5

Please sign in to comment.