Skip to content

Commit

Permalink
Merge branch 'master' into performance-test-improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
bisgaard-itis authored Nov 25, 2024
2 parents 8d45899 + 0aeae77 commit e9ec48c
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 33 deletions.
108 changes: 75 additions & 33 deletions services/director/src/simcore_service_director/registry_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import httpx
from aiocache import Cache, SimpleMemoryCache # type: ignore[import-untyped]
from fastapi import FastAPI, status
from servicelib.background_task import start_periodic_task, stop_periodic_task
from servicelib.logging_utils import log_catch, log_context
from servicelib.utils import limited_as_completed
from tenacity import retry
Expand Down Expand Up @@ -189,26 +190,26 @@ async def registry_request(
app: FastAPI,
*,
path: str,
method: str = "GET",
no_cache: bool = False,
method: str,
use_cache: bool,
**session_kwargs,
) -> tuple[dict, Mapping]:
cache: SimpleMemoryCache = app.state.registry_cache_memory
cache_key = f"{method}_{path}"
if not no_cache and (cached_response := await cache.get(cache_key)):
if use_cache and (cached_response := await cache.get(cache_key)):
assert isinstance(cached_response, tuple) # nosec
return cast(tuple[dict, Mapping], cached_response)

app_settings = get_application_settings(app)
try:
response, response_headers = await _retried_request(
app, path, method, **session_kwargs
app, path, method.upper(), **session_kwargs
)
except httpx.RequestError as exc:
msg = f"Unknown error while accessing registry: {exc!s} via {exc.request}"
raise DirectorRuntimeError(msg=msg) from exc

if not no_cache and app_settings.DIRECTOR_REGISTRY_CACHING and method == "GET":
if app_settings.DIRECTOR_REGISTRY_CACHING and method.upper() == "GET":
await cache.set(
cache_key,
(response, response_headers),
Expand All @@ -218,10 +219,6 @@ async def registry_request(
return response, response_headers


async def _is_registry_responsive(app: FastAPI) -> None:
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)


async def _setup_registry(app: FastAPI) -> None:
@retry(
wait=wait_fixed(1),
Expand All @@ -230,22 +227,36 @@ async def _setup_registry(app: FastAPI) -> None:
reraise=True,
)
async def _wait_until_registry_responsive(app: FastAPI) -> None:
await _is_registry_responsive(app)
await _basic_auth_registry_request(app, path="", method="HEAD", timeout=1.0)

with log_context(_logger, logging.INFO, msg="Connecting to docker registry"):
await _wait_until_registry_responsive(app)


async def _list_all_services_task(*, app: FastAPI) -> None:
with log_context(_logger, logging.INFO, msg="Updating cache with services"):
await list_services(app, ServiceType.ALL, update_cache=True)


def setup(app: FastAPI) -> None:
async def on_startup() -> None:
cache = Cache(Cache.MEMORY)
assert isinstance(cache, SimpleMemoryCache) # nosec
app.state.registry_cache_memory = cache
await _setup_registry(app)
app_settings = get_application_settings(app)
app.state.auto_cache_task = None
if app_settings.DIRECTOR_REGISTRY_CACHING:
app.state.auto_cache_task = start_periodic_task(
_list_all_services_task,
interval=app_settings.DIRECTOR_REGISTRY_CACHING_TTL / 2,
task_name="director-auto-cache-task",
app=app,
)

async def on_shutdown() -> None:
# nothing to do here
...
if app.state.auto_cache_task:
await stop_periodic_task(app.state.auto_cache_task)

app.add_event_handler("startup", on_startup)
app.add_event_handler("shutdown", on_shutdown)
Expand All @@ -266,19 +277,23 @@ def _get_prefix(service_type: ServiceType) -> str:


async def _list_repositories_gen(
app: FastAPI, service_type: ServiceType
app: FastAPI, service_type: ServiceType, *, update_cache: bool
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg="listing repositories"):
path = f"_catalog?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
result, headers = await registry_request(app, path=path) # initial call
result, headers = await registry_request(
app, path=path, method="GET", use_cache=not update_cache
) # initial call

while True:
if "Link" in headers:
next_path = (
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path)
registry_request(
app, path=next_path, method="GET", use_cache=not update_cache
)
)
else:
prefetch_task = None
Expand All @@ -296,27 +311,36 @@ async def _list_repositories_gen(


async def list_image_tags_gen(
app: FastAPI, image_key: str
app: FastAPI, image_key: str, *, update_cache=False
) -> AsyncGenerator[list[str], None]:
with log_context(_logger, logging.DEBUG, msg=f"listing image tags in {image_key}"):
path = f"{image_key}/tags/list?n={get_application_settings(app).DIRECTOR_REGISTRY_CLIENT_MAX_NUMBER_OF_RETRIEVED_OBJECTS}"
tags, headers = await registry_request(app, path=path) # initial call
tags, headers = await registry_request(
app, path=path, method="GET", use_cache=not update_cache
) # initial call
assert "tags" in tags # nosec
while True:
if "Link" in headers:
next_path = (
str(headers["Link"]).split(";")[0].strip("<>").removeprefix("/v2/")
)
prefetch_task = asyncio.create_task(
registry_request(app, path=next_path)
registry_request(
app, path=next_path, method="GET", use_cache=not update_cache
)
)
else:
prefetch_task = None

yield list(
filter(
VERSION_REG.match,
tags["tags"],
yield (
list(
filter(
VERSION_REG.match,
tags["tags"],
)
)
if tags["tags"] is not None
else []
)
if prefetch_task:
tags, headers = await prefetch_task
Expand All @@ -342,20 +366,22 @@ async def get_image_digest(app: FastAPI, image: str, tag: str) -> str | None:
SEE https://distribution.github.io/distribution/spec/api/#digest-header
"""
path = f"{image}/manifests/{tag}"
_, headers = await registry_request(app, path=path)
_, headers = await registry_request(app, path=path, method="GET", use_cache=True)

headers = headers or {}
return headers.get(_DOCKER_CONTENT_DIGEST_HEADER, None)


async def get_image_labels(
app: FastAPI, image: str, tag: str
app: FastAPI, image: str, tag: str, *, update_cache=False
) -> tuple[dict[str, str], str | None]:
"""Returns image labels and the image manifest digest"""

_logger.debug("getting image labels of %s:%s", image, tag)
path = f"{image}/manifests/{tag}"
request_result, headers = await registry_request(app, path=path)
request_result, headers = await registry_request(
app, path=path, method="GET", use_cache=not update_cache
)
v1_compatibility_key = json.loads(request_result["history"][0]["v1Compatibility"])
container_config: dict[str, Any] = v1_compatibility_key.get(
"container_config", v1_compatibility_key["config"]
Expand All @@ -371,10 +397,12 @@ async def get_image_labels(


async def get_image_details(
app: FastAPI, image_key: str, image_tag: str
app: FastAPI, image_key: str, image_tag: str, *, update_cache=False
) -> dict[str, Any]:
image_details: dict = {}
labels, image_manifest_digest = await get_image_labels(app, image_key, image_tag)
labels, image_manifest_digest = await get_image_labels(
app, image_key, image_tag, update_cache=update_cache
)

if image_manifest_digest:
# Adds manifest as extra key in the response similar to org.opencontainers.image.base.digest
Expand Down Expand Up @@ -402,11 +430,18 @@ async def get_image_details(
return image_details


async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]:
async def get_repo_details(
app: FastAPI, image_key: str, *, update_cache=False
) -> list[dict[str, Any]]:
repo_details = []
async for image_tags in list_image_tags_gen(app, image_key):
async for image_tags in list_image_tags_gen(
app, image_key, update_cache=update_cache
):
async for image_details_future in limited_as_completed(
(get_image_details(app, image_key, tag) for tag in image_tags),
(
get_image_details(app, image_key, tag, update_cache=update_cache)
for tag in image_tags
),
limit=get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS,
Expand All @@ -417,16 +452,23 @@ async def get_repo_details(app: FastAPI, image_key: str) -> list[dict[str, Any]]
return repo_details


async def list_services(app: FastAPI, service_type: ServiceType) -> list[dict]:
async def list_services(
app: FastAPI, service_type: ServiceType, *, update_cache=False
) -> list[dict]:
with log_context(_logger, logging.DEBUG, msg="listing services"):
services = []
concurrency_limit = get_application_settings(
app
).DIRECTOR_REGISTRY_CLIENT_MAX_CONCURRENT_CALLS
async for repos in _list_repositories_gen(app, service_type):
async for repos in _list_repositories_gen(
app, service_type, update_cache=update_cache
):
# only list as service if it actually contains the necessary labels
async for repo_details_future in limited_as_completed(
(get_repo_details(app, repo) for repo in repos),
(
get_repo_details(app, repo, update_cache=update_cache)
for repo in repos
),
limit=concurrency_limit,
):
with log_catch(_logger, reraise=False):
Expand Down
10 changes: 10 additions & 0 deletions services/director/tests/unit/test_registry_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import asyncio
import json
import time
from unittest import mock

import pytest
from fastapi import FastAPI
from pytest_benchmark.plugin import BenchmarkFixture
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
from pytest_simcore.helpers.typing_env import EnvVarsDict
from settings_library.docker_registry import RegistrySettings
Expand Down Expand Up @@ -228,9 +230,17 @@ def configure_registry_caching(
)


@pytest.fixture
def with_disabled_auto_caching(mocker: MockerFixture) -> mock.Mock:
return mocker.patch(
"simcore_service_director.registry_proxy._list_all_services_task", autospec=True
)


async def test_registry_caching(
configure_registry_access: EnvVarsDict,
configure_registry_caching: EnvVarsDict,
with_disabled_auto_caching: mock.Mock,
app_settings: ApplicationSettings,
app: FastAPI,
push_services,
Expand Down

0 comments on commit e9ec48c

Please sign in to comment.