Skip to content

Commit

Permalink
Merge pull request #36 from openweathermap/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
matveyvarg authored Jul 3, 2024
2 parents c265cdd + 4fb9c38 commit 1f46c33
Show file tree
Hide file tree
Showing 28 changed files with 738 additions and 570 deletions.
246 changes: 117 additions & 129 deletions deker_server_adapters/base.py

Large diffs are not rendered by default.

155 changes: 155 additions & 0 deletions deker_server_adapters/cluster_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from dataclasses import dataclass, field
from functools import cached_property
from json import JSONDecodeError
from typing import List, Optional

from deker.ctx import CTX
from deker.uri import Uri

from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.errors import DekerClusterError, DekerServerError
from deker_server_adapters.hash_ring import HashRing
from deker_server_adapters.utils.requests import make_request
from deker_server_adapters.utils.version import get_api_version

CLUSTER_MODE = "cluster"


@dataclass
class Node:
"""Node of cluster."""

host: str
port: str
protocol: str = "http"
id: Optional[str] = None

@cached_property
def url(self) -> Uri:
"""Make an Uri instance."""
return Uri.create(f"{self.protocol}://{self.host}:{self.port}")

def __hash__(self) -> int:
"""Use string form of the node as a Hash."""
return hash(str(self))

def __str__(self) -> str:
"""String representation."""
return self.id or ""


@dataclass
class ClusterConfig:
"""Normal mode of cluster config."""

mode: str
leader: Node
current: List[Node]
target: Optional[List[Node]] = None # Only appears when cluster in rebalancing mode

__hash_ring: HashRing = field(init=False)
__hash_ring_target: HashRing = field(init=False)

@classmethod
def from_dict(cls, cluster_config_dict: dict) -> "ClusterConfig":
"""Create cluster config from dict.
:param cluster_config_dict: Cluster configuration that comes from server.
"""
leader_id = cluster_config_dict["leader_id"]

def process_nodes(nodes: List[dict]) -> List[Node]:
node_list = [Node(**node_dict) for node_dict in nodes]
node_list.sort(key=lambda x: str(x))
return node_list

# cluster always returns all current RAFT nodes, thus we don't need to check target config to know the leader
# we won't need RAFT cluster config after getting the leader
raft_nodes = process_nodes(cluster_config_dict["raft"])
leader = next((node for node in raft_nodes if node.id == leader_id), None)

if not leader:
raise DekerClusterError(None, "No leader has been found")

current = process_nodes(cluster_config_dict["current"])
target = process_nodes(cluster_config_dict["target"]) if "target" in cluster_config_dict else None

return cls(mode=cluster_config_dict["mode"], leader=leader, current=current, target=target)


def request_config(ctx: CTX) -> dict: # type: ignore[return-value]
"""Request config from server and apply it on context.
:param ctx: App context
"""
httpx_client = ctx.extra["httpx_client"]
url = f"{get_api_version()}/ping"

# If we do healthcheck in cluster
nodes = [*ctx.uri.servers] if ctx.uri.servers else [ctx.uri.raw_url]
response = make_request(url=url, nodes=nodes, client=httpx_client)

if not response or response.status_code != STATUS_OK:
httpx_client.close()
raise DekerServerError(
response,
"Healthcheck failed. Deker client will be closed.",
)

try:
config = response.json() # type: ignore[union-attr]
return config
except JSONDecodeError:
if ctx.uri.servers:
raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json")


def is_config_in_cluster_mode(config: Optional[dict], ctx: CTX) -> bool:
"""Check if mode from config is set to cluster.
:param config: Config from response
:param ctx: Context of app
"""
if not ctx.uri.servers:
return config is not None and config.get("mode") == CLUSTER_MODE

if config is None or config.get("mode") != CLUSTER_MODE:
raise DekerClusterError(
config,
"Server responded with wrong config."
" Key 'mode' either doesn't exist or its value differs from 'cluster'",
)

return True


def apply_config(config_dict: dict, ctx: CTX) -> None:
"""Apply config from server.
:param config_dict: Config from server
:param ctx: Application context
"""
config = ClusterConfig.from_dict(config_dict)
# Config
ctx.extra["cluster_config"] = config

# Httpx Client
ctx.extra["httpx_client"].base_url = config.leader.url.raw_url
ctx.extra["httpx_client"].cluster_mode = True

# Hash Ring
ctx.extra["hash_ring"] = HashRing(config.current)

ctx.extra["hash_ring_target"] = None # To avoid check within the dict
if config.target:
ctx.extra["hash_ring_target"] = HashRing(config.target)


def request_and_apply_config(ctx: CTX) -> None:
"""Request cluster config from server and apply it on current context.
:param ctx: Application context
"""
config_dict = request_config(ctx)
if is_config_in_cluster_mode(config_dict, ctx):
apply_config(config_dict, ctx)
8 changes: 4 additions & 4 deletions deker_server_adapters/collection_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
from deker_server_adapters.consts import BAD_REQUEST, COLLECTION_NAME_PARAM, NOT_FOUND, STATUS_CREATED, STATUS_OK
from deker_server_adapters.errors import DekerServerError
from deker_server_adapters.httpx_client import HttpxClient
from deker_server_adapters.utils import get_api_version, make_request
from deker_server_adapters.utils.requests import make_request
from deker_server_adapters.utils.version import get_api_version


class ServerCollectionAdapter(BaseServerAdapterMixin, BaseCollectionAdapter):
Expand Down Expand Up @@ -52,7 +53,6 @@ def create(self, collection: "Collection") -> None:
"""
data = collection.as_dict
response = self.client.post(f"/{self.collections_url_prefix}", json=data)

if response.status_code == STATUS_CREATED:
return

Expand All @@ -68,7 +68,7 @@ def read(self, name: str) -> dict:
"""
url = f"{self.collection_url_prefix}/{name}"
if self.client.cluster_mode:
response = make_request(url=url, nodes=self.nodes, client=self.client)
response = make_request(url=url, nodes=self.nodes_urls, client=self.client)
else:
response = self.client.get(url)

Expand Down Expand Up @@ -117,7 +117,7 @@ def is_deleted(self, collection: Collection) -> bool:
return False

def __iter__(self) -> Generator[dict, None, None]:
nodes = self.nodes if self.client.cluster_mode else [str(self.client.base_url)]
nodes = [node.url.raw_url for node in self.nodes] if self.client.cluster_mode else [str(self.client.base_url)]
all_collections_response = make_request(url=self.collections_url_prefix, nodes=nodes, client=self.client)
if all_collections_response is None or all_collections_response.status_code != STATUS_OK:
raise DekerServerError(
Expand Down
24 changes: 16 additions & 8 deletions deker_server_adapters/errors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from json import JSONDecodeError
from typing import Optional
from typing import Optional, Union

from deker.errors import DekerBaseApplicationError
from httpx import Response
Expand All @@ -12,7 +12,7 @@ class DekerServerError(DekerBaseApplicationError):

def _make_response_from_response(
self,
response: Response,
response: Union[Response, dict],
message: Optional[str] = None,
) -> str:
"""Make an Exception message based on response and provided text.
Expand All @@ -21,17 +21,25 @@ def _make_response_from_response(
:param message: Provided message
"""
try:
server_message = response.json()
if isinstance(response, dict):
status = None
server_message = response
else:
status = response.status_code
server_message = response.json()

except JSONDecodeError:
suffix = "..." if len(response.content) > self.MAX_ERROR_TEXT_SIZE else ""
server_message = f"{response.content[:self.MAX_ERROR_TEXT_SIZE]!r}{suffix}"
status = response.status_code # type: ignore[union-attr]
suffix = "..." if len(response.content) > self.MAX_ERROR_TEXT_SIZE else "" # type: ignore[union-attr]
content = response.content[: self.MAX_ERROR_TEXT_SIZE] # type: ignore[union-attr,assignment]
server_message = f"{content!r}" f"{suffix}" # type: ignore[union-attr,assignment]

message = message or ""
return f"{message} \nResponse: status={response.status_code}, message={server_message}"
return f"{message} \nResponse: {status=}, message={server_message}"

def __init__(
self,
response: Optional[Response] = None,
response: Optional[Union[Response, dict]] = None,
message: Optional[str] = None,
) -> None:
if response:
Expand Down Expand Up @@ -70,7 +78,7 @@ class DekerDataPointsLimitError(DekerBaseRateLimitError):
"""If number of data points is too large."""


class DekerClusterError(DekerBaseApplicationError):
class DekerClusterError(DekerServerError):
"""If there is problem with cluster."""


Expand Down
96 changes: 3 additions & 93 deletions deker_server_adapters/factory.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
from json import JSONDecodeError
from typing import TYPE_CHECKING, Any, Dict, Optional, Type
from typing import TYPE_CHECKING, Any, Type

from deker.ABC.base_factory import BaseAdaptersFactory
from deker.ctx import CTX
from deker.uri import Uri
from httpx import Response

from deker_server_adapters.array_adapter import ServerArrayAdapter
from deker_server_adapters.cluster_config import request_and_apply_config
from deker_server_adapters.collection_adapter import ServerCollectionAdapter
from deker_server_adapters.consts import STATUS_OK
from deker_server_adapters.errors import DekerClusterError, DekerServerError
from deker_server_adapters.hash_ring import HashRing
from deker_server_adapters.httpx_client import HttpxClient
from deker_server_adapters.utils import get_api_version, get_leader_and_nodes_mapping, make_request
from deker_server_adapters.varray_adapter import ServerVarrayAdapter


if TYPE_CHECKING:
from deker.ABC.base_adapters import BaseArrayAdapter, BaseCollectionAdapter, BaseStorageAdapter, BaseVArrayAdapter

CLUSTER_MODE = "cluster"


class AdaptersFactory(BaseAdaptersFactory):
"""Factory that produces server adapters."""
Expand Down Expand Up @@ -59,10 +52,7 @@ def __init__(self, ctx: "CTX", uri: "Uri") -> None:
copied_ctx.extra["httpx_client"] = self.httpx_client

# Cluster config
self.get_config_and_configure_context(copied_ctx)

# Single server

request_and_apply_config(copied_ctx)
super().__init__(copied_ctx, uri)

def close(self) -> None:
Expand Down Expand Up @@ -123,83 +113,3 @@ def get_collection_adapter(
:param kwargs: Won't be passed further
"""
return ServerCollectionAdapter(self.ctx)

def do_healthcheck(self, ctx: CTX) -> Optional[Dict]:
"""Check if server is alive.
Fetches config as well.
:param ctx: App context
"""

def check_response(response: Optional[Response], client: HttpxClient) -> None:
if response is None or response.status_code != STATUS_OK:
client.close()
raise DekerServerError(
response,
"Healthcheck failed. Deker client will be closed.",
)

url = f"{get_api_version()}/ping"

# If we do healthcheck in cluster
nodes = [*ctx.uri.servers] if ctx.uri.servers else [ctx.uri.raw_url]
response = make_request(url=url, nodes=nodes, client=self.httpx_client)
check_response(response=response, client=self.httpx_client)

try:
config = response.json() # type: ignore[union-attr]
return config
except JSONDecodeError:
if ctx.uri.servers:
raise DekerClusterError(response, "Server responded with wrong config. Couldn't parse json")

def __set_cluster_config(self, cluster_config: Dict, ctx: CTX) -> None:
"""Set cluster config in the CTX.
:param cluster_config: Custer config json from server
:param ctx: App context (Deker CTX)
"""
leader_node, ids, id_to_host_mapping, nodes = get_leader_and_nodes_mapping(cluster_config)

if leader_node is None:
raise DekerServerError(None, f"Leader node cannot be setted {cluster_config=}")

# Set variables in context
ctx.extra["leader_node"] = Uri.create(leader_node)
ctx.extra["nodes_mapping"] = id_to_host_mapping
ctx.extra["hash_ring"] = HashRing(ids)
ctx.extra["nodes"] = nodes

def __is_mode_cluster(self, config: Optional[dict], ctx: CTX) -> bool:
"""Check if mode from config is set to cluster.
:param config: Config from response
:param ctx: Context of app
"""
if not ctx.uri.servers:
return config is not None and config.get("mode") == CLUSTER_MODE

if config is None or config.get("mode") != CLUSTER_MODE:
raise DekerClusterError(
config,
"Server responded with wrong config."
" Key 'mode' either doesn't exist or its value differs from 'cluster'",
)

return True

def get_config_and_configure_context(self, ctx: CTX) -> None:
"""Get info from node and set config.
:param ctx: CTX where client and config will be injected
"""
# Get config
config = self.do_healthcheck(ctx)

if self.__is_mode_cluster(config, ctx):
# Set cluster config
self.__set_cluster_config(config, ctx) # type: ignore[arg-type]

# Set Httpx client based on cluster config
self.httpx_client.base_url = ctx.extra["leader_node"].raw_url
self.httpx_client.cluster_mode = True
Loading

0 comments on commit 1f46c33

Please sign in to comment.