Skip to content

Commit

Permalink
Revert "merge last-modified header"
Browse files Browse the repository at this point in the history
This reverts commit 8e0e173.
  • Loading branch information
imatiushin committed Sep 4, 2024
1 parent 32ea53e commit c3c0934
Show file tree
Hide file tree
Showing 14 changed files with 50 additions and 192 deletions.
31 changes: 15 additions & 16 deletions deker_server_adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ def hash_ring(self) -> HashRing:
"""Return HashRing instance."""
hash_ring = self.ctx.extra.get("hash_ring")
if not hash_ring:
msg = "Attempt to use cluster logic in single server mode"
raise AttributeError(msg)
raise AttributeError("Attempt to use cluster logic in single server mode")
return hash_ring # type: ignore[attr-defined]

@property
Expand Down Expand Up @@ -174,7 +173,7 @@ def read_meta(self, array: "BaseArray") -> ArrayMeta:

# Only Varray can be located on different nodes yet.
if self.type == ArrayType.varray and self.client.cluster_mode:
response = make_request(url=url, nodes=self.nodes_urls, client=self.client, retry_on_hash_failure=True)
response = make_request(url=url, nodes=self.nodes_urls, client=self.client)
elif self.client.cluster_mode:
response = request_in_cluster(url, array, self.ctx, True)
else:
Expand Down Expand Up @@ -283,7 +282,7 @@ def update(self, array: "BaseArray", bounds: Slice, data: Numeric) -> None:
if hasattr(data, "tobytes"):
request_kwargs["data"] = data.tobytes()
else:
request_kwargs["data"] = bytes(data) # type: ignore
request_kwargs["data"] = bytes(data)
# We write (v)array through the node it belongs in cluster
try:
if self.client.cluster_mode:
Expand Down Expand Up @@ -376,12 +375,12 @@ def get_by_primary_attributes(
return None
return self.__create_array_from_response(
response,
{
"type": self.type,
"collection": collection,
"array_adapter": array_adapter,
"varray_adapter": varray_adapter,
},
dict(
type=self.type,
collection=collection,
array_adapter=array_adapter,
varray_adapter=varray_adapter,
),
)

def get_by_id(
Expand Down Expand Up @@ -428,12 +427,12 @@ def get_by_id(

return self.__create_array_from_response(
response,
{
"type": self.type,
"collection": collection,
"array_adapter": array_adapter,
"varray_adapter": varray_adapter,
},
dict(
type=self.type,
collection=collection,
array_adapter=array_adapter,
varray_adapter=varray_adapter,
),
)

def __iter__(self) -> Generator["ArrayMeta", None, None]:
Expand Down
17 changes: 3 additions & 14 deletions deker_server_adapters/cluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
from deker.ctx import CTX
from deker.uri import Uri

from deker_server_adapters.consts import LAST_MODIFIED_HEADER, STATUS_OK
from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.errors import DekerClusterError, DekerServerError
from deker_server_adapters.hash_ring import HashRing
from deker_server_adapters.utils.requests import make_request
from deker_server_adapters.utils.version import get_api_version


CLUSTER_MODE = "cluster"


Expand Down Expand Up @@ -48,7 +47,7 @@ class ClusterConfig:
leader: Node
current: List[Node]
target: Optional[List[Node]] = None # Only appears when cluster in rebalancing mode
cluster_status: Optional[str] = ""

__hash_ring: HashRing = field(init=False)
__hash_ring_target: HashRing = field(init=False)

Expand Down Expand Up @@ -76,13 +75,7 @@ def process_nodes(nodes: List[dict]) -> List[Node]:
current = process_nodes(cluster_config_dict["current"])
target = process_nodes(cluster_config_dict["target"]) if "target" in cluster_config_dict else None

return cls(
mode=cluster_config_dict["mode"],
leader=leader,
current=current,
target=target,
cluster_status=cluster_config_dict.get("cluster_status"),
)
return cls(mode=cluster_config_dict["mode"], leader=leader, current=current, target=target)


def request_config(ctx: CTX) -> dict: # type: ignore[return-value]
Expand All @@ -106,11 +99,7 @@ def request_config(ctx: CTX) -> dict: # type: ignore[return-value]

try:
config = response.json() # type: ignore[union-attr]
# Set hash of config
httpx_client.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]})
return config
except KeyError:
raise DekerClusterError(response, f"No {LAST_MODIFIED_HEADER} header found in response.")
except JSONDecodeError:
if ctx.uri.servers:
raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json")
Expand Down
4 changes: 0 additions & 4 deletions deker_server_adapters/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
CONTENT_TOO_LARGE = 413
TOO_MANY_REQUESTS = 429
NON_LEADER_WRITE = 421
CONFLICT_HASH = 409

# Name of the parameter in the server response
COLLECTION_NAME_PARAM = "collection_name"
Expand All @@ -20,9 +19,6 @@
TOO_LARGE_ERROR_MESSAGE = "Requested object is too large, use smaller subset"

EXCEPTION_CLASS_PARAM_NAME = "class"
LAST_MODIFIED_HEADER = "last-modified"
REBALANCING_STATUS = "rebalancing"
NORMAL_STATUS = "normal"


class ArrayType(Enum):
Expand Down
4 changes: 0 additions & 4 deletions deker_server_adapters/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,3 @@ class FilteringByIdInClusterIsForbidden(DekerBaseApplicationError):

class HashRingError(DekerBaseApplicationError):
"""If there is a problem with HashRing."""


class InvalidConfigHash(DekerBaseApplicationError):
"""If config was updated."""
3 changes: 1 addition & 2 deletions deker_server_adapters/hash_ring.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ def get_node(self, string_key: str) -> T: # type: ignore[type-var]
"""
pos = self.get_node_pos(string_key)
if pos is None:
msg = f"Couldn't find a position in {self.ring}"
raise HashRingError(msg)
raise HashRingError(f"Couldn't find a position in {self.ring}")
return self.ring[self._sorted_keys[pos]]

def get_node_pos(self, string_key: str) -> Optional[int]:
Expand Down
22 changes: 3 additions & 19 deletions deker_server_adapters/httpx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,14 @@

from deker_server_adapters.cluster_config import apply_config
from deker_server_adapters.consts import (
CONFLICT_HASH,
CONTENT_TOO_LARGE,
EXCEPTION_CLASS_PARAM_NAME,
LAST_MODIFIED_HEADER,
NON_LEADER_WRITE,
RATE_ERROR_MESSAGE,
TOO_LARGE_ERROR_MESSAGE,
TOO_MANY_REQUESTS,
)
from deker_server_adapters.errors import (
DekerBaseRateLimitError,
DekerDataPointsLimitError,
DekerRateLimitError,
InvalidConfigHash,
)
from deker_server_adapters.errors import DekerBaseRateLimitError, DekerDataPointsLimitError, DekerRateLimitError


def rate_limit_err(response: Response, message: str, class_: Type[DekerBaseRateLimitError]) -> None:
Expand Down Expand Up @@ -50,23 +43,13 @@ class HttpxClient(Client):
ctx: CTX
cluster_mode: bool = False

def request(self, *args: Any, retry_on_hash_failure: bool = False, **kwargs: Any) -> Response:
def request(self, *args: Any, **kwargs: Any) -> Response:
"""Override httpx method to handle rate errors.
:param args: arguments to request
:param retry_on_hash_failure: If we should retry on invalid hash
:param kwargs: keyword arguments to request
"""
response = super().request(*args, **kwargs)
if response.status_code == CONFLICT_HASH:
apply_config(response.json(), self.ctx)
if LAST_MODIFIED_HEADER in response.headers:
self.headers.update({LAST_MODIFIED_HEADER: response.headers[LAST_MODIFIED_HEADER]})
if retry_on_hash_failure:
response = super().request(*args, **kwargs)
else:
raise InvalidConfigHash

if response.status_code == TOO_MANY_REQUESTS:
rate_limit_err(response=response, message=RATE_ERROR_MESSAGE, class_=DekerRateLimitError)
elif (
Expand All @@ -77,6 +60,7 @@ def request(self, *args: Any, retry_on_hash_failure: bool = False, **kwargs: Any

if response.status_code == NON_LEADER_WRITE:
apply_config(response.json(), self.ctx)

return super().request(*args, **kwargs)

return response
49 changes: 16 additions & 33 deletions deker_server_adapters/utils/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

from deker.ABC import BaseArray
from deker.ctx import CTX
from httpx import Response
from httpx import Client, Response

from deker_server_adapters.consts import REBALANCING_STATUS, STATUS_OK
from deker_server_adapters.errors import DekerServerError, InvalidConfigHash
from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.errors import DekerServerError
from deker_server_adapters.models import Status
from deker_server_adapters.utils.hashing import get_hash_key, get_id_and_primary_attributes
from deker_server_adapters.utils.version import get_api_version
Expand All @@ -22,17 +22,11 @@


def _request(
url: str,
node: str,
client: "HttpxClient",
method: str = "GET",
request_kwargs: Optional[Dict] = None,
retry_on_hash_failure: bool = False,
url: str, node: str, client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None
) -> Optional[Response]:
"""Internal request func - Make GET request on given node.
:param url: What we request
:param retry_on_hash_failure: If we should retry on invalid hash
:param node: Node for requesting
:param client: Httpx Client
:param method: Http method
Expand All @@ -42,9 +36,7 @@ def _request(
request_url = f"{node.rstrip('/')}/{url.lstrip('/')}"
request_kwargs = request_kwargs or {}
try:
response = client.request(method, request_url, **request_kwargs, retry_on_hash_failure=retry_on_hash_failure)
except InvalidConfigHash:
raise
response = client.request(method, request_url, **request_kwargs)
except Exception as e:
traceback.print_exc(-1)
logger.exception(f"Coudn't get response from {node}", exc_info=e) # noqa
Expand All @@ -53,16 +45,10 @@ def _request(


def make_request(
url: str,
nodes: Union[List, Tuple, Set],
client: "HttpxClient",
method: str = "GET",
request_kwargs: Optional[Dict] = None,
retry_on_hash_failure: bool = False,
url: str, nodes: Union[List, Tuple, Set], client: Client, method: str = "GET", request_kwargs: Optional[Dict] = None
) -> Optional[Response]:
"""Make GET request on random node, while response is not received.
:param retry_on_hash_failure: If we should retry on invalid hash
:param method: HTTP Method
:param url: What we request
:param request_kwargs: Kwargs for request
Expand All @@ -73,13 +59,13 @@ def make_request(
nodes = list(nodes)
if len(nodes) == 1:
node = nodes.pop(0)
response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure)
response = _request(url, node, client, method, request_kwargs)
else:
while nodes and (response is None or response.status_code != STATUS_OK):
index = randint(0, len(nodes) - 1)
node = nodes.pop(index)

response = _request(url, node, client, method, request_kwargs, retry_on_hash_failure=retry_on_hash_failure)
response = _request(url, node, client, method, request_kwargs)

return response

Expand Down Expand Up @@ -120,24 +106,21 @@ def request_in_cluster(
:param method: Http method
:param request_kwargs: Extra data for request
"""
from deker_server_adapters.cluster_config import request_and_apply_config

# Retrieve fresh config
request_and_apply_config(ctx)
client = ctx.extra["httpx_client"]

node = ctx.extra["hash_ring"].get_node(get_hash_key(array))

# Check status of file
def _check_status() -> None:
if should_check_status and ctx.extra["cluster_config"].cluster_status == REBALANCING_STATUS:
status = check_status(ctx, array)
if status == Status.MOVED:
ctx.extra["hash_ring_target"].get_node(get_hash_key(array))
if should_check_status:
status = check_status(ctx, array)
if status == Status.MOVED:
node = ctx.extra["hash_ring_target"].get_node(get_hash_key(array))

_check_status()
# Acquire locks
# TODO: Lock acquiring logic if needed
# Make request
try:
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
except InvalidConfigHash:
_check_status()
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
return make_request(url, [node.url.raw_url], client, method=method, request_kwargs=request_kwargs)
14 changes: 3 additions & 11 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from deker_server_adapters.array_adapter import ServerArrayAdapter
from deker_server_adapters.cluster_config import apply_config
from deker_server_adapters.collection_adapter import ServerCollectionAdapter
from deker_server_adapters.consts import LAST_MODIFIED_HEADER, NORMAL_STATUS, REBALANCING_STATUS
from deker_server_adapters.factory import AdaptersFactory
from deker_server_adapters.hash_ring import HashRing
from deker_server_adapters.httpx_client import HttpxClient
Expand All @@ -38,11 +37,6 @@ def mode(request) -> str:
return request.param


@pytest.fixture(params=[REBALANCING_STATUS, NORMAL_STATUS])
def status(request) -> str:
return request.param


@pytest.fixture()
def base_uri(mode, base_cluster_uri):
if mode == SINGLE_MODE:
Expand Down Expand Up @@ -82,9 +76,7 @@ def ctx(mode, base_uri: Uri, nodes: List[dict], mocked_ping: dict) -> CTX:
@pytest.fixture()
def mock_ping(mode: str, httpx_mock: HTTPXMock, mocked_ping: Dict):
if mode == CLUSTER_MODE:
httpx_mock.add_response(
method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping, headers={LAST_MODIFIED_HEADER: "foo"}
)
httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), json=mocked_ping)
else:
httpx_mock.add_response(method="GET", url=re.compile(r".*\/v1\/ping.*"), status_code=200)

Expand Down Expand Up @@ -241,6 +233,6 @@ def array_url_path(array: Array) -> str:


@pytest.fixture()
def mock_status(mode: str, request: pytest.FixtureRequest, status: str):
if mode == CLUSTER_MODE and status == REBALANCING_STATUS:
def mock_status(mode: str, request: pytest.FixtureRequest):
if mode == CLUSTER_MODE:
request.getfixturevalue("mocked_filestatus_check_unmoved")
Loading

0 comments on commit c3c0934

Please sign in to comment.