diff --git a/deker_server_adapters/base.py b/deker_server_adapters/base.py index edb8990..7fc03e8 100644 --- a/deker_server_adapters/base.py +++ b/deker_server_adapters/base.py @@ -63,7 +63,8 @@ def hash_ring(self) -> HashRing: """Return HashRing instance.""" hash_ring = self.ctx.extra.get("hash_ring") if not hash_ring: - raise AttributeError("Attempt to use cluster logic in single server mode") + msg = "Attempt to use cluster logic in single server mode" + raise AttributeError(msg) return hash_ring # type: ignore[attr-defined] @property @@ -173,7 +174,7 @@ def read_meta(self, array: "BaseArray") -> ArrayMeta: # Only Varray can be located on different nodes yet. if self.type == ArrayType.varray and self.client.cluster_mode: - response = make_request(url=url, nodes=self.nodes_urls, client=self.client) + response = make_request(url=url, nodes=self.nodes_urls, client=self.client, retry_on_hash_failure=True) elif self.client.cluster_mode: response = request_in_cluster(url, array, self.ctx, True) else: @@ -282,7 +283,7 @@ def update(self, array: "BaseArray", bounds: Slice, data: Numeric) -> None: if hasattr(data, "tobytes"): request_kwargs["data"] = data.tobytes() else: - request_kwargs["data"] = bytes(data) + request_kwargs["data"] = bytes(data) # type: ignore # We write (v)array through the node it belongs in cluster try: if self.client.cluster_mode: @@ -375,12 +376,12 @@ def get_by_primary_attributes( return None return self.__create_array_from_response( response, - dict( - type=self.type, - collection=collection, - array_adapter=array_adapter, - varray_adapter=varray_adapter, - ), + { + "type": self.type, + "collection": collection, + "array_adapter": array_adapter, + "varray_adapter": varray_adapter, + }, ) def get_by_id( @@ -427,12 +428,12 @@ def get_by_id( return self.__create_array_from_response( response, - dict( - type=self.type, - collection=collection, - array_adapter=array_adapter, - varray_adapter=varray_adapter, - ), + { + "type": self.type, + "collection": collection, + "array_adapter": array_adapter, + "varray_adapter": varray_adapter, + }, ) def __iter__(self) -> Generator["ArrayMeta", None, None]: diff --git a/deker_server_adapters/cluster_config.py b/deker_server_adapters/cluster_config.py index 378571b..ccbdf60 100644 --- a/deker_server_adapters/cluster_config.py +++ b/deker_server_adapters/cluster_config.py @@ -6,12 +6,13 @@ from deker.ctx import CTX from deker.uri import Uri -from deker_server_adapters.consts import STATUS_OK +from deker_server_adapters.consts import LAST_MODIFIED_HEADER, STATUS_OK from deker_server_adapters.errors import DekerClusterError, DekerServerError from deker_server_adapters.hash_ring import HashRing from deker_server_adapters.utils.requests import make_request from deker_server_adapters.utils.version import get_api_version + CLUSTER_MODE = "cluster" @@ -47,7 +48,7 @@ class ClusterConfig: leader: Node current: List[Node] target: Optional[List[Node]] = None # Only appears when cluster in rebalancing mode - + cluster_status: Optional[str] = "" __hash_ring: HashRing = field(init=False) __hash_ring_target: HashRing = field(init=False) @@ -75,7 +76,13 @@ def process_nodes(nodes: List[dict]) -> List[Node]: current = process_nodes(cluster_config_dict["current"]) target = process_nodes(cluster_config_dict["target"]) if "target" in cluster_config_dict else None - return cls(mode=cluster_config_dict["mode"], leader=leader, current=current, target=target) + return cls( + mode=cluster_config_dict["mode"], + leader=leader, + current=current, + target=target, + cluster_status=cluster_config_dict.get("cluster_status"), + ) def request_config(ctx: CTX) -> dict: # type: ignore[return-value] @@ -99,7 +106,11 @@ def request_config(ctx: CTX) -> dict: # type: ignore[return-value] try: config = response.json() # type: ignore[union-attr] + # Set hash of config + httpx_client.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]}) return config + except KeyError: + raise DekerClusterError(response, f"No {LAST_MODIFIED_HEADER} header found in response.") except JSONDecodeError: if ctx.uri.servers: raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json") diff --git a/deker_server_adapters/consts.py b/deker_server_adapters/consts.py index 62a4029..73ed1c4 100644 --- a/deker_server_adapters/consts.py +++ b/deker_server_adapters/consts.py @@ -11,6 +11,7 @@ CONTENT_TOO_LARGE = 413 TOO_MANY_REQUESTS = 429 NON_LEADER_WRITE = 421 +CONFLICT_HASH = 409 # Name of the parameter in the server response COLLECTION_NAME_PARAM = "collection_name" @@ -19,6 +20,9 @@ TOO_LARGE_ERROR_MESSAGE = "Requested object is too large, use smaller subset" EXCEPTION_CLASS_PARAM_NAME = "class" +LAST_MODIFIED_HEADER = "last-modified" +REBALANCING_STATUS = "rebalancing" +NORMAL_STATUS = "normal" class ArrayType(Enum): diff --git a/deker_server_adapters/errors.py b/deker_server_adapters/errors.py index 2b182b0..9269ac8 100644 --- a/deker_server_adapters/errors.py +++ b/deker_server_adapters/errors.py @@ -97,3 +97,7 @@ class FilteringByIdInClusterIsForbidden(DekerBaseApplicationError): class HashRingError(DekerBaseApplicationError): """If there is a problem with HashRing.""" + + +class InvalidConfigHash(DekerBaseApplicationError): + """If config was updated.""" diff --git a/deker_server_adapters/hash_ring.py b/deker_server_adapters/hash_ring.py index e460d36..d0b7796 100644 --- a/deker_server_adapters/hash_ring.py +++ b/deker_server_adapters/hash_ring.py @@ -73,7 +73,8 @@ def get_node(self, string_key: str) -> T: # type: ignore[type-var] """ pos = self.get_node_pos(string_key) if pos is None: - raise HashRingError(f"Couldn't find a position in {self.ring}") + msg = f"Couldn't find a position in {self.ring}" + raise HashRingError(msg) return self.ring[self._sorted_keys[pos]] def get_node_pos(self, string_key: str) -> Optional[int]: diff --git a/deker_server_adapters/httpx_client.py b/deker_server_adapters/httpx_client.py index a608165..bc9b483 100644 --- a/deker_server_adapters/httpx_client.py +++ b/deker_server_adapters/httpx_client.py @@ -6,14 +6,21 @@ from deker_server_adapters.cluster_config import apply_config from deker_server_adapters.consts import ( + CONFLICT_HASH, CONTENT_TOO_LARGE, EXCEPTION_CLASS_PARAM_NAME, + LAST_MODIFIED_HEADER, NON_LEADER_WRITE, RATE_ERROR_MESSAGE, TOO_LARGE_ERROR_MESSAGE, TOO_MANY_REQUESTS, ) -from deker_server_adapters.errors import DekerBaseRateLimitError, DekerDataPointsLimitError, DekerRateLimitError +from deker_server_adapters.errors import ( + DekerBaseRateLimitError, + DekerDataPointsLimitError, + DekerRateLimitError, + InvalidConfigHash, +) def rate_limit_err(response: Response, message: str, class_: Type[DekerBaseRateLimitError]) -> None: @@ -43,13 +50,23 @@ class HttpxClient(Client): ctx: CTX cluster_mode: bool = False - def request(self, *args: Any, **kwargs: Any) -> Response: + def request(self, *args: Any, retry_on_hash_failure: bool = False, **kwargs: Any) -> Response: """Override httpx method to handle rate errors. :param args: arguments to request + :param retry_on_hash_failure: If we should retry on invalid hash :param kwargs: keyword arguments to request """ response = super().request(*args, **kwargs) + if response.status_code == CONFLICT_HASH: + apply_config(response.json(), self.ctx) + if LAST_MODIFIED_HEADER in response.headers: + self.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]}) + if retry_on_hash_failure: + response = super().request(*args, **kwargs) + else: + raise InvalidConfigHash + if response.status_code == TOO_MANY_REQUESTS: rate_limit_err(response=response, message=RATE_ERROR_MESSAGE, class_=DekerRateLimitError) elif ( @@ -60,7 +77,6 @@ def request(self, *args: Any, **kwargs: Any) -> Response: if response.status_code == NON_LEADER_WRITE: apply_config(response.json(), self.ctx) - return super().request(*args, **kwargs) return response diff --git a/deker_server_adapters/utils/requests.py b/deker_server_adapters/utils/requests.py index bf8c6f1..2c44086 100644 --- a/deker_server_adapters/utils/requests.py +++ b/deker_server_adapters/utils/requests.py @@ -6,10 +6,10 @@ from deker.ABC import BaseArray from deker.ctx import CTX -from httpx import Client, Response +from httpx import Response -from deker_server_adapters.consts import STATUS_OK -from deker_server_adapters.errors import DekerServerError +from deker_server_adapters.consts import REBALANCING_STATUS, STATUS_OK +from deker_server_adapters.errors import DekerServerError, InvalidConfigHash from deker_server_adapters.models import Status from deker_server_adapters.utils.hashing import get_hash_key, get_id_and_primary_attributes from deker_server_adapters.utils.version import get_api_version @@ -22,11 +22,17 @@ def _request( - url: str, node: str, client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None + url: str, + node: str, + client: "HttpxClient", + method: str = "GET", + request_kwargs: Optional[Dict] = None, + retry_on_hash_failure: bool = False, ) -> Optional[Response]: """Internal request func - Make GET request on given node. :param url: What we request + :param retry_on_hash_failure: If we should retry on invalid hash :param node: Node for requesting :param client: Httpx Client :param method: Http method @@ -36,7 +42,9 @@ def _request( request_url = f"{node.rstrip('/')}/{url.lstrip('/')}" request_kwargs = request_kwargs or {} try: - response = client.request(method, request_url, **request_kwargs) + response = client.request(method, request_url, **request_kwargs, retry_on_hash_failure=retry_on_hash_failure) + except InvalidConfigHash: + raise except Exception as e: traceback.print_exc(-1) logger.exception(f"Coudn't get response from {node}", exc_info=e) # noqa @@ -45,10 +53,16 @@ def _request( def make_request( - url: str, nodes: Union[List, Tuple, Set], client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None + url: str, + nodes: Union[List, Tuple, Set], + client: "HttpxClient", + method: str = "GET", + request_kwargs: Optional[Dict] = None, + retry_on_hash_failure: bool = False, ) -> Optional[Response]: """Make GET request on random node, while response is not received. + :param retry_on_hash_failure: If we should retry on invalid hash :param method: HTTP Method :param url: What we request :param request_kwargs: Kwargs for request @@ -59,13 +73,13 @@ def make_request( nodes = list(nodes) if len(nodes) == 1: node = nodes.pop(0) - response = _request(url, node, client, method, request_kwargs) + response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure) else: while nodes and (response is None or response.status_code != STATUS_OK): index = randint(0, len(nodes) - 1) node = nodes.pop(index) - response = _request(url, node, client, method, request_kwargs) + response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure) return response @@ -106,21 +120,24 @@ def request_in_cluster( :param method: Http method :param request_kwargs: Extra data for request """ - from deker_server_adapters.cluster_config import request_and_apply_config - # Retrieve fresh config - request_and_apply_config(ctx) client = ctx.extra["httpx_client"] node = ctx.extra["hash_ring"].get_node(get_hash_key(array)) # Check status of file - if should_check_status: - status = check_status(ctx, array) - if status == Status.MOVED: - node = ctx.extra["hash_ring_target"].get_node(get_hash_key(array)) + def _check_status() -> None: + if should_check_status and ctx.extra["cluster_config"].cluster_status == REBALANCING_STATUS: + status = check_status(ctx, array) + if status == Status.MOVED: + ctx.extra["hash_ring_target"].get_node(get_hash_key(array)) + _check_status() # Acquire locks # TODO: Lock acquiring logic if needed # Make request - return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs) + try: + return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs) + except InvalidConfigHash: + _check_status() + return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs) diff --git a/tests/conftest.py b/tests/conftest.py index 4821be7..6b9eec6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -18,6 +18,7 @@ from deker_server_adapters.array_adapter import ServerArrayAdapter from deker_server_adapters.cluster_config import apply_config from deker_server_adapters.collection_adapter import ServerCollectionAdapter +from deker_server_adapters.consts import LAST_MODIFIED_HEADER, NORMAL_STATUS, REBALANCING_STATUS from deker_server_adapters.factory import AdaptersFactory from deker_server_adapters.hash_ring import HashRing from deker_server_adapters.httpx_client import HttpxClient @@ -37,6 +38,11 @@ def mode(request) -> str: return request.param +@pytest.fixture(params=[REBALANCING_STATUS, NORMAL_STATUS]) +def status(request) -> str: + return request.param + + @pytest.fixture() def base_uri(mode, base_cluster_uri): if mode == SINGLE_MODE: @@ -76,7 +82,9 @@ def ctx(mode, base_uri: Uri, nodes: List[dict], mocked_ping: dict) -> CTX: @pytest.fixture() def mock_ping(mode: str, httpx_mock: HTTPXMock, mocked_ping: Dict): if mode == CLUSTER_MODE: - httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping) + httpx_mock.add_response( + method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping, headers={LAST_MODIFIED_HEADER: "foo"} + ) else: httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), status_code=200) @@ -233,6 +241,6 @@ def array_url_path(array: Array) -> str: @pytest.fixture() -def mock_status(mode: str, request: pytest.FixtureRequest): - if mode == CLUSTER_MODE: +def mock_status(mode: str, request: pytest.FixtureRequest, status: str): + if mode == CLUSTER_MODE and status == REBALANCING_STATUS: request.getfixturevalue("mocked_filestatus_check_unmoved") diff --git a/tests/plugins/cluster.py b/tests/plugins/cluster.py index 7302a3d..5635078 100644 --- a/tests/plugins/cluster.py +++ b/tests/plugins/cluster.py @@ -7,6 +7,7 @@ from deker.uri import Uri from pytest_httpx import HTTPXMock +from deker_server_adapters.consts import REBALANCING_STATUS from deker_server_adapters.models import Status @@ -18,14 +19,14 @@ def nodes() -> List[Dict]: "host": "localhost", "port": 8000, "protocol": "http", - "storage": "file:///tmp/deker_server" + "storage": "file:///tmp/deker_server", }, { "id": "8381202B-8C95-487A-B9B5-0B5270568040", "host": "localhost", "port": 8012, "protocol": "http", - "storage": "file:///tmp/deker_server" + "storage": "file:///tmp/deker_server", }, ] @@ -48,13 +49,14 @@ def base_cluster_uri(nodes_urls): @pytest.fixture() -def mocked_ping(nodes: List[Dict]) -> Dict: +def mocked_ping(nodes: List[Dict], status: str) -> Dict: return { "mode": "cluster", + "cluster_status": status, "this_id": "8381202B-8C95-487A-B9B5-0B527056804E", "leader_id": "8381202B-8C95-487A-B9B5-0B527056804E", "current": nodes, - "raft": nodes + "raft": nodes, } @@ -64,5 +66,6 @@ def mock_healthcheck(httpx_mock: HTTPXMock, mocked_ping): @pytest.fixture() -def mocked_filestatus_check_unmoved(httpx_mock: HTTPXMock): - httpx_mock.add_response(method="GET", url=re.compile(r".*\/status.*"), text=Status.UNMOVED.value) +def mocked_filestatus_check_unmoved(httpx_mock: HTTPXMock, status: str): + if status == REBALANCING_STATUS: + httpx_mock.add_response(method="GET", url=re.compile(r".*\/status.*"), text=Status.UNMOVED.value) diff --git a/tests/test_cases/test_cluster/conftest.py b/tests/test_cases/test_cluster/conftest.py index 3a54a44..d8accaa 100644 --- a/tests/test_cases/test_cluster/conftest.py +++ b/tests/test_cases/test_cluster/conftest.py @@ -1,7 +1,12 @@ +import re + +import httpx import pytest from tests.conftest import CLUSTER_MODE +from deker_server_adapters.consts import LAST_MODIFIED_HEADER + @pytest.fixture() def mode() -> str: diff --git a/tests/test_cases/test_cluster/test_array_adapter.py b/tests/test_cases/test_cluster/test_array_adapter.py index abce3c6..513be07 100644 --- a/tests/test_cases/test_cluster/test_array_adapter.py +++ b/tests/test_cases/test_cluster/test_array_adapter.py @@ -2,8 +2,8 @@ import re from typing import TYPE_CHECKING, List -from unittest.mock import patch +import httpx import numpy as np import pytest @@ -12,7 +12,9 @@ from pytest_httpx import HTTPXMock from deker_server_adapters.array_adapter import ServerArrayAdapter +from deker_server_adapters.consts import LAST_MODIFIED_HEADER from deker_server_adapters.errors import FilteringByIdInClusterIsForbidden +from deker_server_adapters.models import Status from deker_server_adapters.utils.hashing import get_hash_key @@ -74,3 +76,33 @@ def test_iter_success( def test_filter_by_id_is_not_allowed(collection_with_primary_attributes): with pytest.raises(FilteringByIdInClusterIsForbidden): collection_with_primary_attributes.filter({"id": "foo"}).last() + + +def test_hash_updated(httpx_mock: HTTPXMock, server_array_adapter: ServerArrayAdapter, mocked_ping, array, status: str): + class RequestCounter: + def __init__(self): + self.count = 0 + + request_counter = RequestCounter() + + def limited_mock_response(request, extensions, request_counter, mocked_ping, data) -> httpx.Response: + url_to_mock = re.compile(r".*/v1/collection/.*/array/by-id/.*") + + if url_to_mock.search(str(request.url)): + request_counter.count += 1 + + if request_counter.count < 2: + return httpx.Response(409, json=mocked_ping, headers={LAST_MODIFIED_HEADER: "new-hash"}) + else: + return httpx.Response(200, json=data) + + elif "status" in str(request.url): + return httpx.Response(text=Status.UNMOVED.value, status_code=200) + + return httpx.Response(404, json={"error": "Not found"}) + + httpx_mock.add_callback( + lambda request: limited_mock_response(request, None, request_counter, mocked_ping, array.as_dict) + ) + server_array_adapter.read_meta(array) + assert server_array_adapter.client.headers[LAST_MODIFIED_HEADER] == "new-hash" diff --git a/tests/test_cases/test_cluster/test_factory.py b/tests/test_cases/test_cluster/test_factory.py index 53e2180..2429f7a 100644 --- a/tests/test_cases/test_cluster/test_factory.py +++ b/tests/test_cases/test_cluster/test_factory.py @@ -10,6 +10,7 @@ from deker_local_adapters.storage_adapters.hdf5 import HDF5StorageAdapter from pytest_httpx import HTTPXMock +from deker_server_adapters.consts import LAST_MODIFIED_HEADER from deker_server_adapters.errors import DekerClusterError from deker_server_adapters.factory import AdaptersFactory @@ -91,3 +92,9 @@ def test_factory_get_config_from_single_server(ctx, mock_ping, mocked_ping): assert sorted([asdict(node) for node in adapter.hash_ring.nodes], key=lambda x: x["id"]) == sorted( [node for node in mocked_ping["current"]], key=lambda x: x["id"] ) + + +def test_hash_added_to_headers(ctx: CTX, mock_ping): + uri = Uri.create("http://test:test@localhost/") + factory = AdaptersFactory(ctx, uri) + assert factory.httpx_client.headers[LAST_MODIFIED_HEADER] diff --git a/tests/test_cases/test_cluster/test_httpx_client.py b/tests/test_cases/test_cluster/test_httpx_client.py index e0e0414..21e42b3 100644 --- a/tests/test_cases/test_cluster/test_httpx_client.py +++ b/tests/test_cases/test_cluster/test_httpx_client.py @@ -20,7 +20,7 @@ def test_new_cluster_config_is_applied_after_non_leader_error( "host": "newhost.owm.io", "port": 80, "protocol": "http", - "storage": "file:///tmp/deker_server" + "storage": "file:///tmp/deker_server", } ], "raft": [ @@ -29,7 +29,7 @@ def test_new_cluster_config_is_applied_after_non_leader_error( "host": "newhost.owm.io", "port": 80, "protocol": "http", - "storage": "file:///tmp/deker_server" + "storage": "file:///tmp/deker_server", } ], } diff --git a/tests/test_cases/test_cluster/test_varray_adapters.py b/tests/test_cases/test_cluster/test_varray_adapters.py index abd77ad..b406f68 100644 --- a/tests/test_cases/test_cluster/test_varray_adapters.py +++ b/tests/test_cases/test_cluster/test_varray_adapters.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING, List from unittest.mock import patch +import httpx import pytest from deker.arrays import VArray @@ -11,7 +12,9 @@ from pytest_httpx import HTTPXMock from deker_server_adapters.array_adapter import ServerArrayAdapter +from deker_server_adapters.consts import LAST_MODIFIED_HEADER from deker_server_adapters.errors import FilteringByIdInClusterIsForbidden +from deker_server_adapters.models import Status from deker_server_adapters.utils.hashing import get_hash_key from deker_server_adapters.varray_adapter import ServerVarrayAdapter @@ -80,3 +83,33 @@ def test_read_meta_success(varray: VArray, httpx_mock: HTTPXMock, server_varray_ def test_filter_by_id_is_not_allowed(varray_collection_with_primary_attributes): with pytest.raises(FilteringByIdInClusterIsForbidden): varray_collection_with_primary_attributes.filter({"id": "foo"}).last() + + +def test_hash_updated(httpx_mock: HTTPXMock, server_varray_adapter: ServerArrayAdapter, mocked_ping, varray): + class RequestCounter: + def __init__(self): + self.count = 0 + + request_counter = RequestCounter() + + def limited_mock_response(request, extensions, request_counter, mocked_ping, data) -> httpx.Response: + url_to_mock = re.compile(r".*/v1/collection/.*/varray/by-id/.*") + + if url_to_mock.search(str(request.url)): + request_counter.count += 1 + + if request_counter.count < 2: + return httpx.Response(409, json=mocked_ping, headers={LAST_MODIFIED_HEADER: "new-hash"}) + else: + return httpx.Response(200, json=data) + + elif "status" in str(request.url): + return httpx.Response(text=Status.UNMOVED.value) + + return httpx.Response(404, json={"error": "Not found"}) + + httpx_mock.add_callback( + lambda request: limited_mock_response(request, None, request_counter, mocked_ping, varray.as_dict) + ) + server_varray_adapter.read_meta(varray) + assert server_varray_adapter.client.headers[LAST_MODIFIED_HEADER] == "new-hash"