diff --git a/deker_server_adapters/base.py b/deker_server_adapters/base.py index 9a0bfcd..0eda736 100644 --- a/deker_server_adapters/base.py +++ b/deker_server_adapters/base.py @@ -63,8 +63,7 @@ def hash_ring(self) -> HashRing: """Return HashRing instance.""" hash_ring = self.ctx.extra.get("hash_ring") if not hash_ring: - msg = "Attempt to use cluster logic in single server mode" - raise AttributeError(msg) + raise AttributeError("Attempt to use cluster logic in single server mode") return hash_ring # type: ignore[attr-defined] @property @@ -174,7 +173,7 @@ def read_meta(self, array: "BaseArray") -> ArrayMeta: # Only Varray can be located on different nodes yet. if self.type == ArrayType.varray and self.client.cluster_mode: - response = make_request(url=url, nodes=self.nodes_urls, client=self.client, retry_on_hash_failure=True) + response = make_request(url=url, nodes=self.nodes_urls, client=self.client) elif self.client.cluster_mode: response = request_in_cluster(url, array, self.ctx, True) else: @@ -283,7 +282,7 @@ def update(self, array: "BaseArray", bounds: Slice, data: Numeric) -> None: if hasattr(data, "tobytes"): request_kwargs["data"] = data.tobytes() else: - request_kwargs["data"] = bytes(data) # type: ignore + request_kwargs["data"] = bytes(data) # We write (v)array through the node it belongs in cluster try: if self.client.cluster_mode: @@ -376,12 +375,12 @@ def get_by_primary_attributes( return None return self.__create_array_from_response( response, - { - "type": self.type, - "collection": collection, - "array_adapter": array_adapter, - "varray_adapter": varray_adapter, - }, + dict( + type=self.type, + collection=collection, + array_adapter=array_adapter, + varray_adapter=varray_adapter, + ), ) def get_by_id( @@ -428,12 +427,12 @@ def get_by_id( return self.__create_array_from_response( response, - { - "type": self.type, - "collection": collection, - "array_adapter": array_adapter, - "varray_adapter": varray_adapter, - }, + dict( + type=self.type, + collection=collection, + array_adapter=array_adapter, + varray_adapter=varray_adapter, + ), ) def __iter__(self) -> Generator["ArrayMeta", None, None]: diff --git a/deker_server_adapters/cluster_config.py b/deker_server_adapters/cluster_config.py index ccbdf60..378571b 100644 --- a/deker_server_adapters/cluster_config.py +++ b/deker_server_adapters/cluster_config.py @@ -6,13 +6,12 @@ from deker.ctx import CTX from deker.uri import Uri -from deker_server_adapters.consts import LAST_MODIFIED_HEADER, STATUS_OK +from deker_server_adapters.consts import STATUS_OK from deker_server_adapters.errors import DekerClusterError, DekerServerError from deker_server_adapters.hash_ring import HashRing from deker_server_adapters.utils.requests import make_request from deker_server_adapters.utils.version import get_api_version - CLUSTER_MODE = "cluster" @@ -48,7 +47,7 @@ class ClusterConfig: leader: Node current: List[Node] target: Optional[List[Node]] = None # Only appears when cluster in rebalancing mode - cluster_status: Optional[str] = "" + __hash_ring: HashRing = field(init=False) __hash_ring_target: HashRing = field(init=False) @@ -76,13 +75,7 @@ def process_nodes(nodes: List[dict]) -> List[Node]: current = process_nodes(cluster_config_dict["current"]) target = process_nodes(cluster_config_dict["target"]) if "target" in cluster_config_dict else None - return cls( - mode=cluster_config_dict["mode"], - leader=leader, - current=current, - target=target, - cluster_status=cluster_config_dict.get("cluster_status"), - ) + return cls(mode=cluster_config_dict["mode"], leader=leader, current=current, target=target) def request_config(ctx: CTX) -> dict: # type: ignore[return-value] @@ -106,11 +99,7 @@ def request_config(ctx: CTX) -> dict: # type: ignore[return-value] try: config = response.json() # type: ignore[union-attr] - # Set hash of config - httpx_client.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]}) return config - except KeyError: - raise DekerClusterError(response, f"No {LAST_MODIFIED_HEADER} header found in response.") except JSONDecodeError: if ctx.uri.servers: raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json") diff --git a/deker_server_adapters/consts.py b/deker_server_adapters/consts.py index 73ed1c4..62a4029 100644 --- a/deker_server_adapters/consts.py +++ b/deker_server_adapters/consts.py @@ -11,7 +11,6 @@ CONTENT_TOO_LARGE = 413 TOO_MANY_REQUESTS = 429 NON_LEADER_WRITE = 421 -CONFLICT_HASH = 409 # Name of the parameter in the server response COLLECTION_NAME_PARAM = "collection_name" @@ -20,9 +19,6 @@ TOO_LARGE_ERROR_MESSAGE = "Requested object is too large, use smaller subset" EXCEPTION_CLASS_PARAM_NAME = "class" -LAST_MODIFIED_HEADER = "last-modified" -REBALANCING_STATUS = "rebalancing" -NORMAL_STATUS = "normal" class ArrayType(Enum): diff --git a/deker_server_adapters/errors.py b/deker_server_adapters/errors.py index 9269ac8..2b182b0 100644 --- a/deker_server_adapters/errors.py +++ b/deker_server_adapters/errors.py @@ -97,7 +97,3 @@ class FilteringByIdInClusterIsForbidden(DekerBaseApplicationError): class HashRingError(DekerBaseApplicationError): """If there is a problem with HashRing.""" - - -class InvalidConfigHash(DekerBaseApplicationError): - """If config was updated.""" diff --git a/deker_server_adapters/hash_ring.py b/deker_server_adapters/hash_ring.py index d0b7796..e460d36 100644 --- a/deker_server_adapters/hash_ring.py +++ b/deker_server_adapters/hash_ring.py @@ -73,8 +73,7 @@ def get_node(self, string_key: str) -> T: # type: ignore[type-var] """ pos = self.get_node_pos(string_key) if pos is None: - msg = f"Couldn't find a position in {self.ring}" - raise HashRingError(msg) + raise HashRingError(f"Couldn't find a position in {self.ring}") return self.ring[self._sorted_keys[pos]] def get_node_pos(self, string_key: str) -> Optional[int]: diff --git a/deker_server_adapters/httpx_client.py b/deker_server_adapters/httpx_client.py index bc9b483..a608165 100644 --- a/deker_server_adapters/httpx_client.py +++ b/deker_server_adapters/httpx_client.py @@ -6,21 +6,14 @@ from deker_server_adapters.cluster_config import apply_config from deker_server_adapters.consts import ( - CONFLICT_HASH, CONTENT_TOO_LARGE, EXCEPTION_CLASS_PARAM_NAME, - LAST_MODIFIED_HEADER, NON_LEADER_WRITE, RATE_ERROR_MESSAGE, TOO_LARGE_ERROR_MESSAGE, TOO_MANY_REQUESTS, ) -from deker_server_adapters.errors import ( - DekerBaseRateLimitError, - DekerDataPointsLimitError, - DekerRateLimitError, - InvalidConfigHash, -) +from deker_server_adapters.errors import DekerBaseRateLimitError, DekerDataPointsLimitError, DekerRateLimitError def rate_limit_err(response: Response, message: str, class_: Type[DekerBaseRateLimitError]) -> None: @@ -50,23 +43,13 @@ class HttpxClient(Client): ctx: CTX cluster_mode: bool = False - def request(self, *args: Any, retry_on_hash_failure: bool = False, **kwargs: Any) -> Response: + def request(self, *args: Any, **kwargs: Any) -> Response: """Override httpx method to handle rate errors. :param args: arguments to request - :param retry_on_hash_failure: If we should retry on invalid hash :param kwargs: keyword arguments to request """ response = super().request(*args, **kwargs) - if response.status_code == CONFLICT_HASH: - apply_config(response.json(), self.ctx) - if LAST_MODIFIED_HEADER in response.headers: - self.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]}) - if retry_on_hash_failure: - response = super().request(*args, **kwargs) - else: - raise InvalidConfigHash - if response.status_code == TOO_MANY_REQUESTS: rate_limit_err(response=response, message=RATE_ERROR_MESSAGE, class_=DekerRateLimitError) elif ( @@ -77,6 +60,7 @@ def request(self, *args: Any, retry_on_hash_failure: bool = False, **kwargs: Any if response.status_code == NON_LEADER_WRITE: apply_config(response.json(), self.ctx) + return super().request(*args, **kwargs) return response diff --git a/deker_server_adapters/utils/requests.py b/deker_server_adapters/utils/requests.py index 2c44086..bf8c6f1 100644 --- a/deker_server_adapters/utils/requests.py +++ b/deker_server_adapters/utils/requests.py @@ -6,10 +6,10 @@ from deker.ABC import BaseArray from deker.ctx import CTX -from httpx import Response +from httpx import Client, Response -from deker_server_adapters.consts import REBALANCING_STATUS, STATUS_OK -from deker_server_adapters.errors import DekerServerError, InvalidConfigHash +from deker_server_adapters.consts import STATUS_OK +from deker_server_adapters.errors import DekerServerError from deker_server_adapters.models import Status from deker_server_adapters.utils.hashing import get_hash_key, get_id_and_primary_attributes from deker_server_adapters.utils.version import get_api_version @@ -22,17 +22,11 @@ def _request( - url: str, - node: str, - client: "HttpxClient", - method: str = "GET", - request_kwargs: Optional[Dict] = None, - retry_on_hash_failure: bool = False, + url: str, node: str, client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None ) -> Optional[Response]: """Internal request func - Make GET request on given node. :param url: What we request - :param retry_on_hash_failure: If we should retry on invalid hash :param node: Node for requesting :param client: Httpx Client :param method: Http method @@ -42,9 +36,7 @@ def _request( request_url = f"{node.rstrip('/')}/{url.lstrip('/')}" request_kwargs = request_kwargs or {} try: - response = client.request(method, request_url, **request_kwargs, retry_on_hash_failure=retry_on_hash_failure) - except InvalidConfigHash: - raise + response = client.request(method, request_url, **request_kwargs) except Exception as e: traceback.print_exc(-1) logger.exception(f"Coudn't get response from {node}", exc_info=e) # noqa @@ -53,16 +45,10 @@ def _request( def make_request( - url: str, - nodes: Union[List, Tuple, Set], - client: "HttpxClient", - method: str = "GET", - request_kwargs: Optional[Dict] = None, - retry_on_hash_failure: bool = False, + url: str, nodes: Union[List, Tuple, Set], client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None ) -> Optional[Response]: """Make GET request on random node, while response is not received. - :param retry_on_hash_failure: If we should retry on invalid hash :param method: HTTP Method :param url: What we request :param request_kwargs: Kwargs for request @@ -73,13 +59,13 @@ def make_request( nodes = list(nodes) if len(nodes) == 1: node = nodes.pop(0) - response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure) + response = _request(url, node, client, method, request_kwargs) else: while nodes and (response is None or response.status_code != STATUS_OK): index = randint(0, len(nodes) - 1) node = nodes.pop(index) - response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure) + response = _request(url, node, client, method, request_kwargs) return response @@ -120,24 +106,21 @@ def request_in_cluster( :param method: Http method :param request_kwargs: Extra data for request """ + from deker_server_adapters.cluster_config import request_and_apply_config + # Retrieve fresh config + request_and_apply_config(ctx) client = ctx.extra["httpx_client"] node = ctx.extra["hash_ring"].get_node(get_hash_key(array)) # Check status of file - def _check_status() -> None: - if should_check_status and ctx.extra["cluster_config"].cluster_status == REBALANCING_STATUS: - status = check_status(ctx, array) - if status == Status.MOVED: - ctx.extra["hash_ring_target"].get_node(get_hash_key(array)) + if should_check_status: + status = check_status(ctx, array) + if status == Status.MOVED: + node = ctx.extra["hash_ring_target"].get_node(get_hash_key(array)) - _check_status() # Acquire locks # TODO: Lock acquiring logic if needed # Make request - try: - return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs) - except InvalidConfigHash: - _check_status() - return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs) + return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs) diff --git a/tests/conftest.py b/tests/conftest.py index 6b9eec6..4821be7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,7 +18,6 @@ from deker_server_adapters.array_adapter import ServerArrayAdapter from deker_server_adapters.cluster_config import apply_config from deker_server_adapters.collection_adapter import ServerCollectionAdapter -from deker_server_adapters.consts import LAST_MODIFIED_HEADER, NORMAL_STATUS, REBALANCING_STATUS from deker_server_adapters.factory import AdaptersFactory from deker_server_adapters.hash_ring import HashRing from deker_server_adapters.httpx_client import HttpxClient @@ -38,11 +37,6 @@ def mode(request) -> str: return request.param -@pytest.fixture(params=[REBALANCING_STATUS, NORMAL_STATUS]) -def status(request) -> str: - return request.param - - @pytest.fixture() def base_uri(mode, base_cluster_uri): if mode == SINGLE_MODE: @@ -82,9 +76,7 @@ def ctx(mode, base_uri: Uri, nodes: List[dict], mocked_ping: dict) -> CTX: @pytest.fixture() def mock_ping(mode: str, httpx_mock: HTTPXMock, mocked_ping: Dict): if mode == CLUSTER_MODE: - httpx_mock.add_response( - method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping, headers={LAST_MODIFIED_HEADER: "foo"} - ) + httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping) else: httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), status_code=200) @@ -241,6 +233,6 @@ def array_url_path(array: Array) -> str: @pytest.fixture() -def mock_status(mode: str, request: pytest.FixtureRequest, status: str): - if mode == CLUSTER_MODE and status == REBALANCING_STATUS: +def mock_status(mode: str, request: pytest.FixtureRequest): + if mode == CLUSTER_MODE: request.getfixturevalue("mocked_filestatus_check_unmoved") diff --git a/tests/plugins/cluster.py b/tests/plugins/cluster.py index 5635078..7302a3d 100644 --- a/tests/plugins/cluster.py +++ b/tests/plugins/cluster.py @@ -7,7 +7,6 @@ from deker.uri import Uri from pytest_httpx import HTTPXMock -from deker_server_adapters.consts import REBALANCING_STATUS from deker_server_adapters.models import Status @@ -19,14 +18,14 @@ def nodes() -> List[Dict]: "host": "localhost", "port": 8000, "protocol": "http", - "storage": "file:///tmp/deker_server", + "storage": "file:///tmp/deker_server" }, { "id": "8381202B-8C95-487A-B9B5-0B5270568040", "host": "localhost", "port": 8012, "protocol": "http", - "storage": "file:///tmp/deker_server", + "storage": "file:///tmp/deker_server" }, ] @@ -49,14 +48,13 @@ def base_cluster_uri(nodes_urls): @pytest.fixture() -def mocked_ping(nodes: List[Dict], status: str) -> Dict: +def mocked_ping(nodes: List[Dict]) -> Dict: return { "mode": "cluster", - "cluster_status": status, "this_id": "8381202B-8C95-487A-B9B5-0B527056804E", "leader_id": "8381202B-8C95-487A-B9B5-0B527056804E", "current": nodes, - "raft": nodes, + "raft": nodes } @@ -66,6 +64,5 @@ def mock_healthcheck(httpx_mock: HTTPXMock, mocked_ping): @pytest.fixture() -def mocked_filestatus_check_unmoved(httpx_mock: HTTPXMock, status: str): - if status == REBALANCING_STATUS: - httpx_mock.add_response(method="GET", url=re.compile(r".*\/status.*"), text=Status.UNMOVED.value) +def mocked_filestatus_check_unmoved(httpx_mock: HTTPXMock): + httpx_mock.add_response(method="GET", url=re.compile(r".*\/status.*"), text=Status.UNMOVED.value) diff --git a/tests/test_cases/test_cluster/conftest.py b/tests/test_cases/test_cluster/conftest.py index d8accaa..3a54a44 100644 --- a/tests/test_cases/test_cluster/conftest.py +++ b/tests/test_cases/test_cluster/conftest.py @@ -1,12 +1,7 @@ -import re - -import httpx import pytest from tests.conftest import CLUSTER_MODE -from deker_server_adapters.consts import LAST_MODIFIED_HEADER - @pytest.fixture() def mode() -> str: diff --git a/tests/test_cases/test_cluster/test_array_adapter.py b/tests/test_cases/test_cluster/test_array_adapter.py index 513be07..abce3c6 100644 --- a/tests/test_cases/test_cluster/test_array_adapter.py +++ b/tests/test_cases/test_cluster/test_array_adapter.py @@ -2,8 +2,8 @@ import re from typing import TYPE_CHECKING, List +from unittest.mock import patch -import httpx import numpy as np import pytest @@ -12,9 +12,7 @@ from pytest_httpx import HTTPXMock from deker_server_adapters.array_adapter import ServerArrayAdapter -from deker_server_adapters.consts import LAST_MODIFIED_HEADER from deker_server_adapters.errors import FilteringByIdInClusterIsForbidden -from deker_server_adapters.models import Status from deker_server_adapters.utils.hashing import get_hash_key @@ -76,33 +74,3 @@ def test_iter_success( def test_filter_by_id_is_not_allowed(collection_with_primary_attributes): with pytest.raises(FilteringByIdInClusterIsForbidden): collection_with_primary_attributes.filter({"id": "foo"}).last() - - -def test_hash_updated(httpx_mock: HTTPXMock, server_array_adapter: ServerArrayAdapter, mocked_ping, array, status: str): - class RequestCounter: - def __init__(self): - self.count = 0 - - request_counter = RequestCounter() - - def limited_mock_response(request, extensions, request_counter, mocked_ping, data) -> httpx.Response: - url_to_mock = re.compile(r".*/v1/collection/.*/array/by-id/.*") - - if url_to_mock.search(str(request.url)): - request_counter.count += 1 - - if request_counter.count < 2: - return httpx.Response(409, json=mocked_ping, headers={LAST_MODIFIED_HEADER: "new-hash"}) - else: - return httpx.Response(200, json=data) - - elif "status" in str(request.url): - return httpx.Response(text=Status.UNMOVED.value, status_code=200) - - return httpx.Response(404, json={"error": "Not found"}) - - httpx_mock.add_callback( - lambda request: limited_mock_response(request, None, request_counter, mocked_ping, array.as_dict) - ) - server_array_adapter.read_meta(array) - assert server_array_adapter.client.headers[LAST_MODIFIED_HEADER] == "new-hash" diff --git a/tests/test_cases/test_cluster/test_factory.py b/tests/test_cases/test_cluster/test_factory.py index 2429f7a..53e2180 100644 --- a/tests/test_cases/test_cluster/test_factory.py +++ b/tests/test_cases/test_cluster/test_factory.py @@ -10,7 +10,6 @@ from deker_local_adapters.storage_adapters.hdf5 import HDF5StorageAdapter from pytest_httpx import HTTPXMock -from deker_server_adapters.consts import LAST_MODIFIED_HEADER from deker_server_adapters.errors import DekerClusterError from deker_server_adapters.factory import AdaptersFactory @@ -92,9 +91,3 @@ def test_factory_get_config_from_single_server(ctx, mock_ping, mocked_ping): assert sorted([asdict(node) for node in adapter.hash_ring.nodes], key=lambda x: x["id"]) == sorted( [node for node in mocked_ping["current"]], key=lambda x: x["id"] ) - - -def test_hash_added_to_headers(ctx: CTX, mock_ping): - uri = Uri.create("http://test:test@localhost/") - factory = AdaptersFactory(ctx, uri) - assert factory.httpx_client.headers[LAST_MODIFIED_HEADER] diff --git a/tests/test_cases/test_cluster/test_httpx_client.py b/tests/test_cases/test_cluster/test_httpx_client.py index 21e42b3..e0e0414 100644 --- a/tests/test_cases/test_cluster/test_httpx_client.py +++ b/tests/test_cases/test_cluster/test_httpx_client.py @@ -20,7 +20,7 @@ def test_new_cluster_config_is_applied_after_non_leader_error( "host": "newhost.owm.io", "port": 80, "protocol": "http", - "storage": "file:///tmp/deker_server", + "storage": "file:///tmp/deker_server" } ], "raft": [ @@ -29,7 +29,7 @@ def test_new_cluster_config_is_applied_after_non_leader_error( "host": "newhost.owm.io", "port": 80, "protocol": "http", - "storage": "file:///tmp/deker_server", + "storage": "file:///tmp/deker_server" } ], } diff --git a/tests/test_cases/test_cluster/test_varray_adapters.py b/tests/test_cases/test_cluster/test_varray_adapters.py index b406f68..abd77ad 100644 --- a/tests/test_cases/test_cluster/test_varray_adapters.py +++ b/tests/test_cases/test_cluster/test_varray_adapters.py @@ -4,7 +4,6 @@ from typing import TYPE_CHECKING, List from unittest.mock import patch -import httpx import pytest from deker.arrays import VArray @@ -12,9 +11,7 @@ from pytest_httpx import HTTPXMock from deker_server_adapters.array_adapter import ServerArrayAdapter -from deker_server_adapters.consts import LAST_MODIFIED_HEADER from deker_server_adapters.errors import FilteringByIdInClusterIsForbidden -from deker_server_adapters.models import Status from deker_server_adapters.utils.hashing import get_hash_key from deker_server_adapters.varray_adapter import ServerVarrayAdapter @@ -83,33 +80,3 @@ def test_read_meta_success(varray: VArray, httpx_mock: HTTPXMock, server_varray_ def test_filter_by_id_is_not_allowed(varray_collection_with_primary_attributes): with pytest.raises(FilteringByIdInClusterIsForbidden): varray_collection_with_primary_attributes.filter({"id": "foo"}).last() - - -def test_hash_updated(httpx_mock: HTTPXMock, server_varray_adapter: ServerArrayAdapter, mocked_ping, varray): - class RequestCounter: - def __init__(self): - self.count = 0 - - request_counter = RequestCounter() - - def limited_mock_response(request, extensions, request_counter, mocked_ping, data) -> httpx.Response: - url_to_mock = re.compile(r".*/v1/collection/.*/varray/by-id/.*") - - if url_to_mock.search(str(request.url)): - request_counter.count += 1 - - if request_counter.count < 2: - return httpx.Response(409, json=mocked_ping, headers={LAST_MODIFIED_HEADER: "new-hash"}) - else: - return httpx.Response(200, json=data) - - elif "status" in str(request.url): - return httpx.Response(text=Status.UNMOVED.value) - - return httpx.Response(404, json={"error": "Not found"}) - - httpx_mock.add_callback( - lambda request: limited_mock_response(request, None, request_counter, mocked_ping, varray.as_dict) - ) - server_varray_adapter.read_meta(varray) - assert server_varray_adapter.client.headers[LAST_MODIFIED_HEADER] == "new-hash"