Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛🎨♻️Director-v0: improve registry caching #6799

Merged
merged 29 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8eaeeb0
add pytest-benchmark
sanderegg Nov 21, 2024
35de004
add external registry
sanderegg Nov 21, 2024
8599207
add prefetching and generators
sanderegg Nov 21, 2024
58de786
added tests with external registry
sanderegg Nov 21, 2024
2603f5c
let's have more generators
sanderegg Nov 21, 2024
8e36edc
skip test if no external registry set
sanderegg Nov 21, 2024
20f50ec
unflake test
sanderegg Nov 21, 2024
6c7234f
add httpx client session
sanderegg Nov 21, 2024
2e6fd90
migrated to httpx
sanderegg Nov 21, 2024
f99ff9a
migrated to httpx and added retrials
sanderegg Nov 21, 2024
181c80f
5 rounds
sanderegg Nov 21, 2024
b9dccf9
@pcrespov review: use fastapi.status
sanderegg Nov 21, 2024
886d683
cleanup
sanderegg Nov 21, 2024
b7e3acf
ruff
sanderegg Nov 21, 2024
5bcaea3
make sure the settings take care of the registry settings being correct
sanderegg Nov 21, 2024
93da4df
cleanup
sanderegg Nov 21, 2024
cb37f16
missing refactoring
sanderegg Nov 21, 2024
9ebba79
encoded true is needed here
sanderegg Nov 21, 2024
83148bd
replace aioresponses with httpx
sanderegg Nov 21, 2024
6aa1350
sonarcloud: use a specific name
sanderegg Nov 22, 2024
fdd0eba
remove confusion
sanderegg Nov 22, 2024
220b215
ensure registry proxy uses internal registry URL
sanderegg Nov 22, 2024
b6aa263
ensure registry proxy uses api_url
sanderegg Nov 22, 2024
0076aec
fix api_url to return protocol
sanderegg Nov 22, 2024
7c7c431
fixes path
sanderegg Nov 22, 2024
3e3d2f1
fix encoding
sanderegg Nov 22, 2024
a594613
fix required registry URL
sanderegg Nov 22, 2024
1843d94
fix prefix
sanderegg Nov 22, 2024
cb215ae
missing env
sanderegg Nov 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from functools import cached_property
from typing import Any
from typing import Any, Self

from pydantic import Field, SecretStr, field_validator
from pydantic import (
AnyHttpUrl,
Field,
SecretStr,
TypeAdapter,
field_validator,
model_validator,
)
from pydantic_settings import SettingsConfigDict

from .base import BaseCustomSettings
Expand All @@ -12,24 +19,41 @@ class RegistrySettings(BaseCustomSettings):
REGISTRY_PATH: str | None = Field(
default=None,
# This is useful in case of a local registry, where the registry url (path) is relative to the host docker engine"
description="development mode only, in case a local registry is used",
description="development mode only, in case a local registry is used - "
"this is the hostname to the docker registry as seen from inside the container",
)
# NOTE: name is missleading, http or https protocol are not included
REGISTRY_URL: str = Field(default="", description="address to the docker registry")
REGISTRY_URL: str = Field(
...,
description="hostname of docker registry (without protocol but with port if available) - "
"typically used by the host machine docker engine",
min_length=1,
)

REGISTRY_USER: str = Field(
..., description="username to access the docker registry"
)
REGISTRY_PW: SecretStr = Field(
..., description="password to access the docker registry"
)
REGISTRY_SSL: bool = Field(..., description="access to registry through ssl")
REGISTRY_SSL: bool = Field(
..., description="True if docker registry is using HTTPS protocol"
)

@field_validator("REGISTRY_PATH", mode="before")
@classmethod
def _escape_none_string(cls, v) -> Any | None:
return None if v == "None" else v

@model_validator(mode="after")
def check_registry_authentication(self: Self) -> Self:
if self.REGISTRY_AUTH and any(
not v for v in (self.REGISTRY_USER, self.REGISTRY_PW)
):
msg = "If REGISTRY_AUTH is True, both REGISTRY_USER and REGISTRY_PW must be provided"
raise ValueError(msg)
return self

@cached_property
def resolved_registry_url(self) -> str:
return self.REGISTRY_PATH or self.REGISTRY_URL
Expand All @@ -38,6 +62,14 @@ def resolved_registry_url(self) -> str:
def api_url(self) -> str:
return f"{self.REGISTRY_URL}/v2"

@cached_property
def registry_url(self) -> AnyHttpUrl:
"""returns the full URL to the Docker Registry for use by docker engine"""
protocol = "https" if self.REGISTRY_SSL else "http"
return TypeAdapter(AnyHttpUrl).validate_python(
f"{protocol}://{self.REGISTRY_URL}"
)

model_config = SettingsConfigDict(
json_schema_extra={
"examples": [
Expand Down
3 changes: 2 additions & 1 deletion services/director/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
aiocache
aiodocker
fastapi[all]
httpx
httpx[http2]
prometheus-client
pydantic
tenacity
10 changes: 9 additions & 1 deletion services/director/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ h11==0.14.0
# via
# httpcore
# uvicorn
h2==4.1.0
# via httpx
hpack==4.0.0
# via h2
httpcore==1.0.6
# via httpx
httptools==0.6.4
Expand All @@ -135,6 +139,8 @@ httpx==0.27.2
# -r requirements/../../../packages/service-library/requirements/_fastapi.in
# -r requirements/_base.in
# fastapi
hyperframe==6.0.1
# via h2
idna==3.10
# via
# anyio
Expand Down Expand Up @@ -422,7 +428,9 @@ starlette==0.41.3
# -c requirements/../../../requirements/constraints.txt
# fastapi
tenacity==9.0.0
# via -r requirements/../../../packages/service-library/requirements/_base.in
# via
# -r requirements/../../../packages/service-library/requirements/_base.in
# -r requirements/_base.in
toolz==1.0.0
# via -r requirements/../../../packages/service-library/requirements/_base.in
tqdm==4.67.0
Expand Down
1 change: 1 addition & 0 deletions services/director/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ faker
jsonref
pytest
pytest-asyncio
pytest-benchmark
pytest-cov
pytest-docker
pytest-instafail
Expand Down
5 changes: 5 additions & 0 deletions services/director/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,13 @@ propcache==0.2.0
# -c requirements/_base.txt
# aiohttp
# yarl
py-cpuinfo==9.0.0
# via pytest-benchmark
pytest==8.3.3
# via
# -r requirements/_test.in
# pytest-asyncio
# pytest-benchmark
# pytest-cov
# pytest-docker
# pytest-instafail
Expand All @@ -100,6 +103,8 @@ pytest-asyncio==0.23.8
# via
# -c requirements/../../../requirements/constraints.txt
# -r requirements/_test.in
pytest-benchmark==5.1.0
# via -r requirements/_test.in
pytest-cov==6.0.0
# via -r requirements/_test.in
pytest-docker==3.1.1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,22 @@
from aiohttp import ClientSession, ClientTimeout
from common_library.json_serialization import json_dumps
import httpx
from fastapi import FastAPI
from servicelib.utils import (
get_http_client_request_aiohttp_connect_timeout,
get_http_client_request_aiohttp_sock_connect_timeout,
get_http_client_request_total_timeout,
)


def setup_client_session(app: FastAPI) -> None:
async def on_startup() -> None:
# SEE https://github.com/ITISFoundation/osparc-simcore/issues/4628

# ANE: it is important to have fast connection handshakes
# also requests should be as fast as possible
# some services are not that fast to reply
timeout_settings = ClientTimeout(
total=get_http_client_request_total_timeout(),
connect=get_http_client_request_aiohttp_connect_timeout(),
sock_connect=get_http_client_request_aiohttp_sock_connect_timeout(),
)
session = ClientSession(
timeout=timeout_settings,
json_serialize=json_dumps,
)
session = httpx.AsyncClient(transport=httpx.AsyncHTTPTransport(http2=True))
app.state.aiohttp_client_session = session
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

registry accepts http/2?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well it does not reject it so why not use it?


async def on_shutdown() -> None:
session = app.state.aiohttp_client_session
assert isinstance(session, ClientSession) # nosec
await session.close()
assert isinstance(session, httpx.AsyncClient) # nosec
await session.aclose()

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)


def get_client_session(app: FastAPI) -> ClientSession:
def get_client_session(app: FastAPI) -> httpx.AsyncClient:
session = app.state.aiohttp_client_session
assert isinstance(session, ClientSession) # nosec
assert isinstance(session, httpx.AsyncClient) # nosec
return session
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from typing import Any

from common_library.errors_classes import OsparcErrorMixin


class DirectorRuntimeError(OsparcErrorMixin, RuntimeError):
def __init__(self, **ctx: Any) -> None:
super().__init__(**ctx)

msg_template: str = "Director-v0 unexpected error: {msg}"


Expand Down
101 changes: 45 additions & 56 deletions services/director/src/simcore_service_director/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,22 @@
import re
from datetime import timedelta
from enum import Enum
from http import HTTPStatus
from pprint import pformat
from typing import Any, Final, cast

import aiodocker
import aiodocker.networks
import aiohttp
import arrow
import httpx
import tenacity
from aiohttp import (
ClientConnectionError,
ClientError,
ClientResponse,
ClientResponseError,
ClientSession,
ClientTimeout,
)
from fastapi import FastAPI
from fastapi import FastAPI, status
from packaging.version import Version
from servicelib.async_utils import run_sequentially_in_context
from servicelib.docker_utils import to_datetime
from settings_library.docker_registry import RegistrySettings
from tenacity import retry
from tenacity import retry, wait_random_exponential
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_fixed

from . import docker_utils, registry_proxy
from .client_session import get_client_session
Expand Down Expand Up @@ -547,7 +537,7 @@ async def _pass_port_to_service(
service_name: str,
port: str,
service_boot_parameters_labels: list[Any],
session: ClientSession,
session: httpx.AsyncClient,
app_settings: ApplicationSettings,
) -> None:
for param in service_boot_parameters_labels:
Expand All @@ -566,8 +556,8 @@ async def _pass_port_to_service(
"port": str(port),
}
_logger.debug("creating request %s and query %s", service_url, query_string)
async with session.post(service_url, data=query_string) as response:
_logger.debug("query response: %s", await response.text())
response = await session.post(service_url, data=query_string)
_logger.debug("query response: %s", response.text)
return
_logger.debug("service %s does not need to know its external port", service_name)

Expand Down Expand Up @@ -1149,50 +1139,48 @@ async def get_service_details(app: FastAPI, node_uuid: str) -> dict:


@retry(
wait=wait_fixed(2),
wait=wait_random_exponential(min=1, max=5),
GitHK marked this conversation as resolved.
Show resolved Hide resolved
stop=stop_after_attempt(3),
reraise=True,
retry=retry_if_exception_type(ClientConnectionError),
retry=retry_if_exception_type(httpx.RequestError),
)
async def _save_service_state(
service_host_name: str, session: aiohttp.ClientSession
service_host_name: str, session: httpx.AsyncClient
) -> None:
response: ClientResponse
async with session.post(
url=f"http://{service_host_name}/state", # NOSONAR
timeout=ClientTimeout(
ServicesCommonSettings().director_dynamic_service_save_timeout
),
) as response:
try:
response.raise_for_status()
try:
response = await session.post(
url=f"http://{service_host_name}/state", # NOSONAR
timeout=ServicesCommonSettings().director_dynamic_service_save_timeout,
)
response.raise_for_status()

except ClientResponseError as err:
if err.status in (
HTTPStatus.METHOD_NOT_ALLOWED,
HTTPStatus.NOT_FOUND,
HTTPStatus.NOT_IMPLEMENTED,
):
# NOTE: Legacy Override. Some old services do not have a state entrypoint defined
# therefore we assume there is nothing to be saved and do not raise exception
# Responses found so far:
# METHOD NOT ALLOWED https://httpstatuses.com/405
# NOT FOUND https://httpstatuses.com/404
#
_logger.warning(
"Service '%s' does not seem to implement save state functionality: %s. Skipping save",
service_host_name,
err,
)
else:
# upss ... could service had troubles saving, reraise
raise
else:
_logger.info(
"Service '%s' successfully saved its state: %s",
except httpx.HTTPStatusError as err:

if err.response.status_code in (
status.HTTP_405_METHOD_NOT_ALLOWED,
status.HTTP_404_NOT_FOUND,
status.HTTP_501_NOT_IMPLEMENTED,
):
# NOTE: Legacy Override. Some old services do not have a state entrypoint defined
# therefore we assume there is nothing to be saved and do not raise exception
# Responses found so far:
# METHOD NOT ALLOWED https://httpstatuses.com/405
# NOT FOUND https://httpstatuses.com/404
#
_logger.warning(
"Service '%s' does not seem to implement save state functionality: %s. Skipping save",
service_host_name,
f"{response}",
err,
)
else:
# upss ... could service had troubles saving, reraise
raise
else:
_logger.info(
"Service '%s' successfully saved its state: %s",
service_host_name,
f"{response}",
)


@run_sequentially_in_context(target_args=["node_uuid"])
Expand Down Expand Up @@ -1246,20 +1234,21 @@ async def stop_service(app: FastAPI, *, node_uuid: str, save_state: bool) -> Non
await _save_service_state(
service_host_name, session=get_client_session(app)
)
except ClientResponseError as err:
except httpx.HTTPStatusError as err:

raise ServiceStateSaveError(
service_uuid=node_uuid,
reason=f"service {service_host_name} rejected to save state, "
f"responded {err.message} (status {err.status})."
f"responded {err.response.text} (status {err.response.status_code})."
"Aborting stop service to prevent data loss.",
) from err

except ClientError as err:
except httpx.RequestError as err:
_logger.warning(
"Could not save state because %s is unreachable [%s]."
"Resuming stop_service.",
service_host_name,
err,
err.request,
)

# remove the services
Expand Down
Loading
Loading