Skip to content

Commit

Permalink
merge last-modified header
Browse files Browse the repository at this point in the history
  • Loading branch information
imatiushin committed Sep 4, 2024
1 parent 7019211 commit 8e0e173
Show file tree
Hide file tree
Showing 14 changed files with 192 additions and 50 deletions.
31 changes: 16 additions & 15 deletions deker_server_adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def hash_ring(self) -> HashRing:
"""Return HashRing instance."""
hash_ring = self.ctx.extra.get("hash_ring")
if not hash_ring:
raise AttributeError("Attempt to use cluster logic in single server mode")
msg = "Attempt to use cluster logic in single server mode"
raise AttributeError(msg)
return hash_ring # type: ignore[attr-defined]

@property
Expand Down Expand Up @@ -173,7 +174,7 @@ def read_meta(self, array: "BaseArray") -> ArrayMeta:

# Only Varray can be located on different nodes yet.
if self.type == ArrayType.varray and self.client.cluster_mode:
response = make_request(url=url, nodes=self.nodes_urls, client=self.client)
response = make_request(url=url, nodes=self.nodes_urls, client=self.client, retry_on_hash_failure=True)
elif self.client.cluster_mode:
response = request_in_cluster(url, array, self.ctx, True)
else:
Expand Down Expand Up @@ -282,7 +283,7 @@ def update(self, array: "BaseArray", bounds: Slice, data: Numeric) -> None:
if hasattr(data, "tobytes"):
request_kwargs["data"] = data.tobytes()
else:
request_kwargs["data"] = bytes(data)
request_kwargs["data"] = bytes(data) # type: ignore
# We write (v)array through the node it belongs in cluster
try:
if self.client.cluster_mode:
Expand Down Expand Up @@ -375,12 +376,12 @@ def get_by_primary_attributes(
return None
return self.__create_array_from_response(
response,
dict(
type=self.type,
collection=collection,
array_adapter=array_adapter,
varray_adapter=varray_adapter,
),
{
"type": self.type,
"collection": collection,
"array_adapter": array_adapter,
"varray_adapter": varray_adapter,
},
)

def get_by_id(
Expand Down Expand Up @@ -427,12 +428,12 @@ def get_by_id(

return self.__create_array_from_response(
response,
dict(
type=self.type,
collection=collection,
array_adapter=array_adapter,
varray_adapter=varray_adapter,
),
{
"type": self.type,
"collection": collection,
"array_adapter": array_adapter,
"varray_adapter": varray_adapter,
},
)

def __iter__(self) -> Generator["ArrayMeta", None, None]:
Expand Down
17 changes: 14 additions & 3 deletions deker_server_adapters/cluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from deker.ctx import CTX
from deker.uri import Uri

from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.consts import LAST_MODIFIED_HEADER, STATUS_OK
from deker_server_adapters.errors import DekerClusterError, DekerServerError
from deker_server_adapters.hash_ring import HashRing
from deker_server_adapters.utils.requests import make_request
from deker_server_adapters.utils.version import get_api_version


CLUSTER_MODE = "cluster"


Expand Down Expand Up @@ -47,7 +48,7 @@ class ClusterConfig:
leader: Node
current: List[Node]
target: Optional[List[Node]] = None # Only appears when cluster in rebalancing mode

cluster_status: Optional[str] = ""
__hash_ring: HashRing = field(init=False)
__hash_ring_target: HashRing = field(init=False)

Expand Down Expand Up @@ -75,7 +76,13 @@ def process_nodes(nodes: List[dict]) -> List[Node]:
current = process_nodes(cluster_config_dict["current"])
target = process_nodes(cluster_config_dict["target"]) if "target" in cluster_config_dict else None

return cls(mode=cluster_config_dict["mode"], leader=leader, current=current, target=target)
return cls(
mode=cluster_config_dict["mode"],
leader=leader,
current=current,
target=target,
cluster_status=cluster_config_dict.get("cluster_status"),
)


def request_config(ctx: CTX) -> dict: # type: ignore[return-value]
Expand All @@ -99,7 +106,11 @@ def request_config(ctx: CTX) -> dict: # type: ignore[return-value]

try:
config = response.json() # type: ignore[union-attr]
# Set hash of config
httpx_client.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]})
return config
except KeyError:
raise DekerClusterError(response, f"No {LAST_MODIFIED_HEADER} header found in response.")
except JSONDecodeError:
if ctx.uri.servers:
raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json")
Expand Down
4 changes: 4 additions & 0 deletions deker_server_adapters/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
CONTENT_TOO_LARGE = 413
TOO_MANY_REQUESTS = 429
NON_LEADER_WRITE = 421
CONFLICT_HASH = 409

# Name of the parameter in the server response
COLLECTION_NAME_PARAM = "collection_name"
Expand All @@ -19,6 +20,9 @@
TOO_LARGE_ERROR_MESSAGE = "Requested object is too large, use smaller subset"

EXCEPTION_CLASS_PARAM_NAME = "class"
LAST_MODIFIED_HEADER = "last-modified"
REBALANCING_STATUS = "rebalancing"
NORMAL_STATUS = "normal"


class ArrayType(Enum):
Expand Down
4 changes: 4 additions & 0 deletions deker_server_adapters/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,7 @@ class FilteringByIdInClusterIsForbidden(DekerBaseApplicationError):

class HashRingError(DekerBaseApplicationError):
"""If there is a problem with HashRing."""


class InvalidConfigHash(DekerBaseApplicationError):
"""If config was updated."""
3 changes: 2 additions & 1 deletion deker_server_adapters/hash_ring.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def get_node(self, string_key: str) -> T: # type: ignore[type-var]
"""
pos = self.get_node_pos(string_key)
if pos is None:
raise HashRingError(f"Couldn't find a position in {self.ring}")
msg = f"Couldn't find a position in {self.ring}"
raise HashRingError(msg)
return self.ring[self._sorted_keys[pos]]

def get_node_pos(self, string_key: str) -> Optional[int]:
Expand Down
22 changes: 19 additions & 3 deletions deker_server_adapters/httpx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@

from deker_server_adapters.cluster_config import apply_config
from deker_server_adapters.consts import (
CONFLICT_HASH,
CONTENT_TOO_LARGE,
EXCEPTION_CLASS_PARAM_NAME,
LAST_MODIFIED_HEADER,
NON_LEADER_WRITE,
RATE_ERROR_MESSAGE,
TOO_LARGE_ERROR_MESSAGE,
TOO_MANY_REQUESTS,
)
from deker_server_adapters.errors import DekerBaseRateLimitError, DekerDataPointsLimitError, DekerRateLimitError
from deker_server_adapters.errors import (
DekerBaseRateLimitError,
DekerDataPointsLimitError,
DekerRateLimitError,
InvalidConfigHash,
)


def rate_limit_err(response: Response, message: str, class_: Type[DekerBaseRateLimitError]) -> None:
Expand Down Expand Up @@ -43,13 +50,23 @@ class HttpxClient(Client):
ctx: CTX
cluster_mode: bool = False

def request(self, *args: Any, **kwargs: Any) -> Response:
def request(self, *args: Any, retry_on_hash_failure: bool = False, **kwargs: Any) -> Response:
"""Override httpx method to handle rate errors.
:param args: arguments to request
:param retry_on_hash_failure: If we should retry on invalid hash
:param kwargs: keyword arguments to request
"""
response = super().request(*args, **kwargs)
if response.status_code == CONFLICT_HASH:
apply_config(response.json(), self.ctx)
if LAST_MODIFIED_HEADER in response.headers:
self.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]})
if retry_on_hash_failure:
response = super().request(*args, **kwargs)
else:
raise InvalidConfigHash

if response.status_code == TOO_MANY_REQUESTS:
rate_limit_err(response=response, message=RATE_ERROR_MESSAGE, class_=DekerRateLimitError)
elif (
Expand All @@ -60,7 +77,6 @@ def request(self, *args: Any, **kwargs: Any) -> Response:

if response.status_code == NON_LEADER_WRITE:
apply_config(response.json(), self.ctx)

return super().request(*args, **kwargs)

return response
49 changes: 33 additions & 16 deletions deker_server_adapters/utils/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

from deker.ABC import BaseArray
from deker.ctx import CTX
from httpx import Client, Response
from httpx import Response

from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.errors import DekerServerError
from deker_server_adapters.consts import REBALANCING_STATUS, STATUS_OK
from deker_server_adapters.errors import DekerServerError, InvalidConfigHash
from deker_server_adapters.models import Status
from deker_server_adapters.utils.hashing import get_hash_key, get_id_and_primary_attributes
from deker_server_adapters.utils.version import get_api_version
Expand All @@ -22,11 +22,17 @@


def _request(
url: str, node: str, client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None
url: str,
node: str,
client: "HttpxClient",
method: str = "GET",
request_kwargs: Optional[Dict] = None,
retry_on_hash_failure: bool = False,
) -> Optional[Response]:
"""Internal request func - Make GET request on given node.
:param url: What we request
:param retry_on_hash_failure: If we should retry on invalid hash
:param node: Node for requesting
:param client: Httpx Client
:param method: Http method
Expand All @@ -36,7 +42,9 @@ def _request(
request_url = f"{node.rstrip('/')}/{url.lstrip('/')}"
request_kwargs = request_kwargs or {}
try:
response = client.request(method, request_url, **request_kwargs)
response = client.request(method, request_url, **request_kwargs, retry_on_hash_failure=retry_on_hash_failure)
except InvalidConfigHash:
raise
except Exception as e:
traceback.print_exc(-1)
logger.exception(f"Coudn't get response from {node}", exc_info=e) # noqa
Expand All @@ -45,10 +53,16 @@ def _request(


def make_request(
url: str, nodes: Union[List, Tuple, Set], client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None
url: str,
nodes: Union[List, Tuple, Set],
client: "HttpxClient",
method: str = "GET",
request_kwargs: Optional[Dict] = None,
retry_on_hash_failure: bool = False,
) -> Optional[Response]:
"""Make GET request on random node, while response is not received.
:param retry_on_hash_failure: If we should retry on invalid hash
:param method: HTTP Method
:param url: What we request
:param request_kwargs: Kwargs for request
Expand All @@ -59,13 +73,13 @@ def make_request(
nodes = list(nodes)
if len(nodes) == 1:
node = nodes.pop(0)
response = _request(url, node, client, method, request_kwargs)
response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure)
else:
while nodes and (response is None or response.status_code != STATUS_OK):
index = randint(0, len(nodes) - 1)
node = nodes.pop(index)

response = _request(url, node, client, method, request_kwargs)
response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure)

return response

Expand Down Expand Up @@ -106,21 +120,24 @@ def request_in_cluster(
:param method: Http method
:param request_kwargs: Extra data for request
"""
from deker_server_adapters.cluster_config import request_and_apply_config

# Retrieve fresh config
request_and_apply_config(ctx)
client = ctx.extra["httpx_client"]

node = ctx.extra["hash_ring"].get_node(get_hash_key(array))

# Check status of file
if should_check_status:
status = check_status(ctx, array)
if status == Status.MOVED:
node = ctx.extra["hash_ring_target"].get_node(get_hash_key(array))
def _check_status() -> None:
if should_check_status and ctx.extra["cluster_config"].cluster_status == REBALANCING_STATUS:
status = check_status(ctx, array)
if status == Status.MOVED:
ctx.extra["hash_ring_target"].get_node(get_hash_key(array))

_check_status()
# Acquire locks
# TODO: Lock acquiring logic if needed
# Make request
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
try:
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
except InvalidConfigHash:
_check_status()
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
14 changes: 11 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from deker_server_adapters.array_adapter import ServerArrayAdapter
from deker_server_adapters.cluster_config import apply_config
from deker_server_adapters.collection_adapter import ServerCollectionAdapter
from deker_server_adapters.consts import LAST_MODIFIED_HEADER, NORMAL_STATUS, REBALANCING_STATUS
from deker_server_adapters.factory import AdaptersFactory
from deker_server_adapters.hash_ring import HashRing
from deker_server_adapters.httpx_client import HttpxClient
Expand All @@ -37,6 +38,11 @@ def mode(request) -> str:
return request.param


@pytest.fixture(params=[REBALANCING_STATUS, NORMAL_STATUS])
def status(request) -> str:
return request.param


@pytest.fixture()
def base_uri(mode, base_cluster_uri):
if mode == SINGLE_MODE:
Expand Down Expand Up @@ -76,7 +82,9 @@ def ctx(mode, base_uri: Uri, nodes: List[dict], mocked_ping: dict) -> CTX:
@pytest.fixture()
def mock_ping(mode: str, httpx_mock: HTTPXMock, mocked_ping: Dict):
if mode == CLUSTER_MODE:
httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping)
httpx_mock.add_response(
method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping, headers={LAST_MODIFIED_HEADER: "foo"}
)
else:
httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), status_code=200)

Expand Down Expand Up @@ -233,6 +241,6 @@ def array_url_path(array: Array) -> str:


@pytest.fixture()
def mock_status(mode: str, request: pytest.FixtureRequest):
if mode == CLUSTER_MODE:
def mock_status(mode: str, request: pytest.FixtureRequest, status: str):
if mode == CLUSTER_MODE and status == REBALANCING_STATUS:
request.getfixturevalue("mocked_filestatus_check_unmoved")
Loading

0 comments on commit 8e0e173

Please sign in to comment.