From c5fdeeac2d1aa042bb1a7b1028ad1e235a262ae6 Mon Sep 17 00:00:00 2001
From: Russ Allbery
Date: Thu, 7 Dec 2023 16:46:29 -0800
Subject: [PATCH] Move background task management

Rather than each service having its own separate start and stop routines
and background task scheduler, collect all background task management
into its own BackgroundTaskManager. It handles the sequencing of running
some operations in the foreground on startup and then manages all of the
periodic and continuous tasks; the service objects provide simple
coroutine methods that do the actual work.

This will be used in later changes for cleaner invocation of work
normally done in background tasks, avoiding delays and allowing easier
testing of race conditions.
---
 controller/src/controller/background.py      | 201 +++++++++++++
 controller/src/controller/constants.py       |   8 +-
 controller/src/controller/factory.py         |  60 ++--
 .../src/controller/services/fileserver.py    | 215 ++++++--------
 controller/src/controller/services/image.py  |  62 +---
 controller/src/controller/services/lab.py    | 267 +++++++-----------
 .../src/controller/services/prepuller.py     |  66 +----
 .../storage/kubernetes/fileserver.py         |  19 --
 controller/tests/conftest.py                 |   1 +
 controller/tests/services/prepuller_test.py  |  14 +-
 docs/dev/api/controller.rst                  |   3 +
 11 files changed, 447 insertions(+), 469 deletions(-)
 create mode 100644 controller/src/controller/background.py

diff --git a/controller/src/controller/background.py b/controller/src/controller/background.py
new file mode 100644
index 000000000..a38759385
--- /dev/null
+++ b/controller/src/controller/background.py
@@ -0,0 +1,201 @@
+"""Nublado controller background processing."""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import Callable, Coroutine
+from datetime import timedelta
+
+from aiojobs import Scheduler
+from safir.datetime import current_datetime
+from safir.slack.webhook import SlackWebhookClient
+from structlog.stdlib import BoundLogger
+
+from .constants import (
+    FILE_SERVER_RECONCILE_INTERVAL,
+    IMAGE_REFRESH_INTERVAL,
+    LAB_RECONCILE_INTERVAL,
+)
+from .services.fileserver import FileserverManager
+from .services.image import ImageService
+from .services.lab import LabManager
+from .services.prepuller import Prepuller
+
+__all__ = ["BackgroundTaskManager"]
+
+
+class BackgroundTaskManager:
+    """Manage Nublado controller background tasks.
+
+    While the Nublado controller is running, it needs to perform several
+    periodic or continuous background tasks, namely:
+
+    #. Refresh the list of available remote images and local cached images.
+    #. Prepull images to all eligible nodes.
+    #. Reconcile Kubernetes lab state with internal data structures.
+    #. Reap tasks that were monitoring lab spawning or deletion.
+    #. Watch file servers for changes in pod status (startup or timeout).
+    #. Reconcile Kubernetes file server state with internal data structures.
+
+    This class manages all of these background tasks including, where
+    relevant, their schedules. It only does the task management; all of the
+    work of these tasks is done by methods on the underlying service objects.
+
+    This class is created during startup and tracked as part of the
+    `~controller.factory.ProcessContext`.
+
+    Parameters
+    ----------
+    image_service
+        Image service.
+    prepuller
+        Prepuller service.
+    lab_manager
+        Lab management service.
+    fileserver_manager
+        File server management service.
+    slack_client
+        Optional Slack webhook client for alerts.
+    logger
+        Logger to use.
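+
+    Examples
+    --------
+    A rough sketch of the intended lifecycle, assuming already-constructed
+    services (normally supplied by `~controller.factory.ProcessContext`;
+    the argument values here are stand-ins):
+
+    .. code-block:: python
+
+       background = BackgroundTaskManager(
+           image_service=image_service,
+           prepuller=prepuller,
+           lab_manager=lab_manager,
+           fileserver_manager=None,
+           slack_client=None,
+           logger=logger,
+       )
+       await background.start()
+       try:
+           ...  # serve requests
+       finally:
+           await background.stop()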
+    """
+
+    def __init__(
+        self,
+        *,
+        image_service: ImageService,
+        prepuller: Prepuller,
+        lab_manager: LabManager,
+        fileserver_manager: FileserverManager | None,
+        slack_client: SlackWebhookClient | None,
+        logger: BoundLogger,
+    ) -> None:
+        self._image_service = image_service
+        self._prepuller = prepuller
+        self._lab_manager = lab_manager
+        self._fileserver_manager = fileserver_manager
+        self._slack = slack_client
+        self._logger = logger
+
+        self._scheduler: Scheduler | None = None
+
+    async def start(self) -> None:
+        """Start all background tasks.
+
+        Intended to be called during Nublado controller startup. Several of
+        the background operations are run in the foreground first to ensure
+        internal state is valid before starting to serve requests.
+        """
+        if self._scheduler:
+            msg = "Background tasks already running, cannot start"
+            self._logger.warning(msg)
+            return
+        self._scheduler = Scheduler()
+
+        # Run some of the tasks in the foreground first to ensure internal
+        # data is consistent after startup. All of them can run in parallel.
+        async with asyncio.TaskGroup() as tg:
+            self._logger.info("Populating internal state")
+            tg.create_task(self._image_service.refresh())
+            tg.create_task(self._lab_manager.reconcile())
+            if self._fileserver_manager:
+                tg.create_task(self._fileserver_manager.reconcile())
+
+        # Now, start all of the tasks in the background.
+        coros = [
+            self._loop(
+                self._image_service.refresh,
+                IMAGE_REFRESH_INTERVAL,
+                "refreshing image data",
+            ),
+            self._prepull_loop(),
+            self._loop(
+                self._lab_manager.reconcile,
+                LAB_RECONCILE_INTERVAL,
+                "reconciling lab state",
+            ),
+            self._lab_manager.reap_spawners(),
+        ]
+        if self._fileserver_manager:
+            coros.append(
+                self._loop(
+                    self._fileserver_manager.reconcile,
+                    FILE_SERVER_RECONCILE_INTERVAL,
+                    "reconciling file server state",
+                )
+            )
+            coros.append(self._fileserver_manager.watch_servers())
+        self._logger.info("Starting background tasks")
+        for coro in coros:
+            await self._scheduler.spawn(coro)
+
+    async def stop(self) -> None:
+        """Stop the background tasks."""
+        if not self._scheduler:
+            msg = "Background tasks were already stopped"
+            self._logger.warning(msg)
+            return
+        self._logger.info("Stopping background tasks")
+        await self._scheduler.close()
+        self._scheduler = None
+        await self._lab_manager.stop_monitor_tasks()
+
+    async def _loop(
+        self,
+        call: Callable[[], Coroutine[None, None, None]],
+        interval: timedelta,
+        description: str,
+    ) -> None:
+        """Wrap an async function in a periodic scheduling loop.
+
+        The provided function is called on every interval. This method
+        always delays by the interval before making the first call, since
+        `start` already runs each operation once in the foreground.
+
+        Parameters
+        ----------
+        call
+            Async function to call repeatedly. A function is taken rather
+            than a coroutine, since a coroutine can only be awaited once.
+        interval
+            Scheduling interval to use.
+        description
+            Description of the background task for error reporting.
+        """
+        await asyncio.sleep(interval.total_seconds())
+        while True:
+            start = current_datetime(microseconds=True)
+            try:
+                await call()
+            except Exception as e:
+                # On failure, log the exception but otherwise continue as
+                # normal, including the delay. This will provide some time for
+                # whatever the problem was to be resolved.
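+                # Log how long the failed run took, which helps distinguish
+                # immediate errors from timeouts.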
+                elapsed = current_datetime(microseconds=True) - start
+                msg = f"Uncaught exception {description}"
+                self._logger.exception(msg, delay=elapsed.total_seconds())
+                if self._slack:
+                    await self._slack.post_uncaught_exception(e)
+            delay = interval - (current_datetime(microseconds=True) - start)
+            if delay.total_seconds() < 1:
+                msg = f"{description.capitalize()} is running continuously"
+                self._logger.warning(msg)
+            else:
+                await asyncio.sleep(delay.total_seconds())
+
+    async def _prepull_loop(self) -> None:
+        """Execute the prepuller in an infinite loop.
+
+        The prepuller loop uses an `asyncio.Event` set by the image service
+        to decide when to run instead of a simple interval. This ensures the
+        prepuller runs immediately after a possible image list update.
+        """
+        while True:
+            try:
+                await self._image_service.prepuller_wait()
+                await self._prepuller.prepull_images()
+            except Exception as e:
+                self._logger.exception("Uncaught exception prepulling images")
+                if self._slack:
+                    await self._slack.post_uncaught_exception(e)
+                pause = IMAGE_REFRESH_INTERVAL.total_seconds()
+                self._logger.warning(f"Pausing failed prepuller for {pause}s")
+                await asyncio.sleep(pause)
diff --git a/controller/src/controller/constants.py b/controller/src/controller/constants.py
index ff7b264d5..f7788ba33 100644
--- a/controller/src/controller/constants.py
+++ b/controller/src/controller/constants.py
@@ -9,12 +9,12 @@
     "DOCKER_CREDENTIALS_PATH",
     "DROPDOWN_SENTINEL_VALUE",
-    "FILE_SERVER_REFRESH_INTERVAL",
+    "FILE_SERVER_RECONCILE_INTERVAL",
     "GROUPNAME_REGEX",
     "IMAGE_REFRESH_INTERVAL",
     "KUBERNETES_NAME_PATTERN",
     "KUBERNETES_REQUEST_TIMEOUT",
     "LAB_COMMAND",
-    "LAB_STATE_REFRESH_INTERVAL",
+    "LAB_RECONCILE_INTERVAL",
     "LIMIT_TO_REQUEST_RATIO",
     "METADATA_PATH",
     "MOUNT_PATH_DOWNWARD_API",
@@ -47,7 +47,7 @@
 DROPDOWN_SENTINEL_VALUE = "use_image_from_dropdown"
 """Used in the lab form for ``image_list`` when ``image_dropdown`` is used."""
 
-FILE_SERVER_REFRESH_INTERVAL = timedelta(minutes=60)
-"""How frequently to refresh file server state from Kubernetes.
+FILE_SERVER_RECONCILE_INTERVAL = timedelta(minutes=60)
+"""How frequently to reconcile file server state with Kubernetes.
 
 This will detect when file servers disappear out from under us, such as being
@@ -75,7 +75,7 @@
 This should be configurable but isn't yet.
 """
 
-LAB_STATE_REFRESH_INTERVAL = timedelta(minutes=60)
-"""How frequently to refresh user lab state from Kubernetes.
+LAB_RECONCILE_INTERVAL = timedelta(minutes=60)
+"""How frequently to reconcile user lab state with Kubernetes.
 
 This will detect when user labs disappear out from under us without user
diff --git a/controller/src/controller/factory.py b/controller/src/controller/factory.py
index 3fb1a6e53..ae098f8e9 100644
--- a/controller/src/controller/factory.py
+++ b/controller/src/controller/factory.py
@@ -14,6 +14,7 @@
 from safir.slack.webhook import SlackWebhookClient
 from structlog.stdlib import BoundLogger
 
+from .background import BackgroundTaskManager
 from .config import Config
 from .exceptions import NotConfiguredError
 from .models.v1.prepuller_config import DockerSourceConfig, GARSourceConfig
@@ -75,6 +76,9 @@ class ProcessContext:
     _fileserver_manager: FileserverManager | None
     """State management for user file servers."""
 
+    background: BackgroundTaskManager
+    """Manager for background tasks."""
+
     @classmethod
     async def from_config(cls, config: Config) -> Self:
         """Create a new process context from the controller configuration.
@@ -148,32 +152,42 @@ async def from_config(cls, config: Config) -> Self: slack_client=slack_client, logger=logger, ) + prepuller = Prepuller( + image_service=image_service, + prepuller_builder=PrepullerBuilder( + metadata_storage=metadata_storage, + pull_secret=config.lab.pull_secret, + ), + metadata_storage=metadata_storage, + pod_storage=PodStorage(kubernetes_client, logger), + slack_client=slack_client, + logger=logger, + ) + lab_manager = LabManager( + config=config.lab, + image_service=image_service, + lab_builder=LabBuilder(config.lab, config.base_url, logger), + metadata_storage=metadata_storage, + lab_storage=LabStorage(kubernetes_client, logger), + slack_client=slack_client, + logger=logger, + ) return cls( config=config, http_client=http_client, image_service=image_service, kubernetes_client=kubernetes_client, - prepuller=Prepuller( - image_service=image_service, - prepuller_builder=PrepullerBuilder( - metadata_storage=metadata_storage, - pull_secret=config.lab.pull_secret, - ), - metadata_storage=metadata_storage, - pod_storage=PodStorage(kubernetes_client, logger), - slack_client=slack_client, - logger=logger, - ), - lab_manager=LabManager( - config=config.lab, + prepuller=prepuller, + lab_manager=lab_manager, + _fileserver_manager=fileserver_manager, + background=BackgroundTaskManager( image_service=image_service, - lab_builder=LabBuilder(config.lab, config.base_url, logger), - metadata_storage=metadata_storage, - lab_storage=LabStorage(kubernetes_client, logger), + prepuller=prepuller, + lab_manager=lab_manager, + fileserver_manager=fileserver_manager, slack_client=slack_client, logger=logger, ), - _fileserver_manager=fileserver_manager, ) @property @@ -189,11 +203,7 @@ async def aclose(self) -> None: async def start(self) -> None: """Start the background threads running.""" - await self.image_service.start() - await self.prepuller.start() - await self.lab_manager.start() - if self._fileserver_manager: - await self._fileserver_manager.start() + await self.background.start() async def stop(self) -> None: """Clean up a process context. @@ -201,11 +211,7 @@ async def stop(self) -> None: Called during shutdown, or before recreating the process context using a different configuration. """ - if self._fileserver_manager: - await self._fileserver_manager.stop() - await self.prepuller.stop() - await self.image_service.stop() - await self.lab_manager.stop() + await self.background.stop() class Factory: diff --git a/controller/src/controller/services/fileserver.py b/controller/src/controller/services/fileserver.py index b11883d8e..b20d06fd2 100644 --- a/controller/src/controller/services/fileserver.py +++ b/controller/src/controller/services/fileserver.py @@ -6,19 +6,15 @@ import contextlib from dataclasses import dataclass, field -from aiojobs import Scheduler -from safir.datetime import current_datetime from safir.slack.blockkit import SlackException from safir.slack.webhook import SlackWebhookClient from structlog.stdlib import BoundLogger from ..config import EnabledFileserverConfig from ..constants import ( - FILE_SERVER_REFRESH_INTERVAL, KUBERNETES_REQUEST_TIMEOUT, ) from ..exceptions import ( - MissingObjectError, UnknownUserError, ) from ..models.domain.gafaelfawr import GafaelfawrUserInfo @@ -61,6 +57,8 @@ class FileserverManager: Builder that constructs file server Kubernetes objects. fileserver_storage Kubernetes storage layer for file servers. + slack_client + Optional Slack webhook client for alerts. logger Logger to use. 
""" @@ -80,9 +78,6 @@ def __init__( self._slack = slack_client self._logger = logger - # Background task management. - self._scheduler: Scheduler | None = None - # Mapping of usernames to internal state. self._servers: dict[str, _State] = {} @@ -158,39 +153,92 @@ async def list(self) -> list[str]: """List users with running file servers.""" return [u for u, s in self._servers.items() if s.running] - async def start(self) -> None: - """Start the background file server tasks. + async def reconcile(self) -> None: + """Reconcile internal state with Kubernetes. - Reconstructs the user state map in the foreground before backgrounding - the reconciliation and monitor tasks. + Runs at Nublado controller startup to reconcile internal state with + the content of Kubernetes. This picks up changes made in Kubernetes + outside of the controller, and is also responsible for building the + internal state from the current state of Kubernetes during startup. + It is called during startup and from a background task. """ + self._logger.info("Reconciling file server state") namespace = self._config.namespace - timeout = Timeout("Reading namespace", KUBERNETES_REQUEST_TIMEOUT) - if not await self._storage.namespace_exists(namespace, timeout): - raise MissingObjectError( - "File server namespace missing", - kind="Namespace", - name=namespace, + timeout = Timeout( + "Reading file server state", KUBERNETES_REQUEST_TIMEOUT + ) + seen = await self._storage.read_fileserver_state(namespace, timeout) + known_users = {k for k, v in self._servers.items() if v.running} + + # Check each fileserver we found to see if it's properly running. If + # not, delete it and remove it from the seen map. + to_delete = set() + invalid = set() + for username, state in seen.items(): + valid = self._builder.is_valid(username, state) + if valid: + self._servers[username] = _State(running=valid) + elif username in self._servers: + to_delete.add(username) + else: + invalid.add(username) + seen = {k: v for k, v in seen.items() if k not in to_delete} + + # Also tidy up any supposedly-running users that we didn't find. They + # may have some objects remaining. This should only be possible if + # something outside of the controller deleted resources. + seen_users = set(seen.keys()) + for user in (known_users - seen_users) | to_delete: + msg = "Removing broken fileserver for user" + self._logger.warning(msg, user=user) + await self.delete(user) + for username in invalid: + if username in self._servers: + continue + msg = "File server present but not running, deleteing" + self._logger.info(msg, user=username) + + # There is an unavoidable race condition where if the user for + # this invalid file server attempts to create a valid file server + # just as we make this call, we may delete parts of their new file + # server. Solving this is complicated; live with it for now. + name = self._builder.build_name(username) + timeout = Timeout( + "Deleting file server", KUBERNETES_REQUEST_TIMEOUT, username + ) + await self._storage.delete( + name, self._config.namespace, username, timeout ) - await self._reconcile_file_servers() - self._scheduler = Scheduler() - self._logger.info("Starting file server watcher task") - await self._scheduler.spawn(self._watch_file_servers()) - self._logger.info("Starting file server periodic reconciliation task") - await self._scheduler.spawn(self._reconcile_loop()) + self._logger.debug("File server reconciliation complete") - async def stop(self) -> None: - """Stop background file server tasks. 
+ async def watch_servers(self) -> None: + """Watch the file server namespace for completed file servers. - Any started file servers will keep running. + Each file server has a timeout, after which it exits. When one exits, + we want to clean up its Kubernetes objects and update its state. This + method runs as a background task watching for changes and triggers the + delete when appropriate. """ - if not self._scheduler: - msg = "File server background tasks were already stopped" - self._logger.warning(msg) - return - self._logger.info("Stopping file server background tasks") - await self._scheduler.close() - self._scheduler = None + namespace = self._config.namespace + while True: + try: + async for change in self._storage.watch_pods(namespace): + if change.phase in (PodPhase.FAILED, PodPhase.SUCCEEDED): + pod = change.pod + username = self._builder.get_username_for_pod(pod) + if not username: + continue + self._logger.info( + "File server exited, cleaning up", + phase=change.phase.value, + user=username, + ) + with contextlib.suppress(UnknownUserError): + await self.delete(username) + except Exception as e: + self._logger.exception("Error watching file server pod phase") + await self._maybe_post_slack_exception(e) + await asyncio.sleep(1) async def _create_file_server( self, user: GafaelfawrUserInfo, timeout: Timeout @@ -267,104 +315,3 @@ async def _maybe_post_slack_exception( await self._slack.post_exception(exc) else: await self._slack.post_uncaught_exception(exc) - - async def _reconcile_file_servers(self) -> None: - """Reconcile internal state with Kubernetes. - - Runs at Nublado controller startup to reconcile the initially empty - state map with the contents of Kubernetes. - """ - self._logger.debug("Reconciling file server state") - namespace = self._config.namespace - timeout = Timeout( - "Reading file server state", KUBERNETES_REQUEST_TIMEOUT - ) - seen = await self._storage.read_fileserver_state(namespace, timeout) - known_users = {k for k, v in self._servers.items() if v.running} - - # Check each fileserver we found to see if it's properly running. If - # not, delete it and remove it from the seen map. - to_delete = set() - invalid = set() - for username, state in seen.items(): - valid = self._builder.is_valid(username, state) - if valid: - self._servers[username] = _State(running=valid) - elif username in self._servers: - to_delete.add(username) - else: - invalid.add(username) - seen = {k: v for k, v in seen.items() if k not in to_delete} - - # Also tidy up any supposedly-running users that we didn't find. They - # may have some objects remaining. This should only be possible if - # something outside of the controller deleted resources. - seen_users = set(seen.keys()) - for user in (known_users - seen_users) | to_delete: - msg = "Removing broken fileserver for user" - self._logger.warning(msg, user=user) - await self.delete(user) - for username in invalid: - if username in self._servers: - continue - msg = "File server present but not running, deleteing" - self._logger.info(msg, user=username) - - # There is an unavoidable race condition where if the user for - # this invalid file server attempts to create a valid file server - # just as we make this call, we may delete parts of their new file - # server. Solving this is complicated; live with it for now. 
- name = self._builder.build_name(username) - timeout = Timeout( - "Deleting file server", KUBERNETES_REQUEST_TIMEOUT, username - ) - await self._storage.delete( - name, self._config.namespace, username, timeout - ) - self._logger.debug("File server reconciliation complete") - - async def _reconcile_loop(self) -> None: - """Run in the background by `start`, stopped with `stop`.""" - while True: - start = current_datetime(microseconds=True) - try: - await self._reconcile_file_servers() - except Exception as e: - self._logger.exception("Unable to reconcile file servers") - await self._maybe_post_slack_exception(e) - now = current_datetime(microseconds=True) - delay = FILE_SERVER_REFRESH_INTERVAL - (now - start) - if delay.total_seconds() < 1: - msg = "File server reconciliation is running continuously" - self._logger.warning(msg) - else: - await asyncio.sleep(delay.total_seconds()) - - async def _watch_file_servers(self) -> None: - """Watch the file server namespace for completed file servers. - - Each file server has a timeout, after which it exits. When one exits, - we want to clean up its Kubernetes objects and update its state. This - method runs as a background task watching for changes and triggers the - delete when appropriate. - """ - namespace = self._config.namespace - while True: - try: - async for change in self._storage.watch_pods(namespace): - if change.phase in (PodPhase.FAILED, PodPhase.SUCCEEDED): - pod = change.pod - username = self._builder.get_username_for_pod(pod) - if not username: - continue - self._logger.info( - "File server exited, cleaning up", - phase=change.phase.value, - user=username, - ) - with contextlib.suppress(UnknownUserError): - await self.delete(username) - except Exception as e: - self._logger.exception("Error watching file server pod phase") - await self._maybe_post_slack_exception(e) - await asyncio.sleep(1) diff --git a/controller/src/controller/services/image.py b/controller/src/controller/services/image.py index 033eb547a..a94c39c8c 100644 --- a/controller/src/controller/services/image.py +++ b/controller/src/controller/services/image.py @@ -4,13 +4,11 @@ import asyncio -from aiojobs import Scheduler from kubernetes_asyncio.client import V1Node -from safir.datetime import current_datetime from safir.slack.webhook import SlackWebhookClient from structlog.stdlib import BoundLogger -from ..constants import IMAGE_REFRESH_INTERVAL, KUBERNETES_REQUEST_TIMEOUT +from ..constants import KUBERNETES_REQUEST_TIMEOUT from ..exceptions import UnknownDockerImageError from ..models.domain.docker import DockerReference from ..models.domain.image import MenuImage, MenuImages, NodeData @@ -93,8 +91,7 @@ def __init__( self._slack_client = slack_client self._logger = logger - # Background task management. - self._scheduler: Scheduler | None = None + # Prepuller synchronization. self._lock = asyncio.Lock() self._refreshed = asyncio.Event() @@ -310,9 +307,9 @@ def prepull_status(self) -> PrepullerStatus: async def refresh(self) -> None: """Refresh data from Docker and Kubernetes. - Normally run in the background by the task started with `start`, but - can be called directly to force an immediate refresh. Does not catch - exceptions; the caller must do that if desired. + Normally run in a background task, but can be called directly to force + an immediate refresh. Does not catch exceptions; the caller must do + that if desired. 
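+
+        On success, this sets the event that `prepuller_wait` waits on, so
+        the prepuller will run against the newly refreshed data.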
""" timeout = Timeout("List nodes", KUBERNETES_REQUEST_TIMEOUT) selector = self._node_selector @@ -323,32 +320,7 @@ async def refresh(self) -> None: self._nodes = self._build_nodes(to_prepull, node_list, cached) self._to_prepull = to_prepull self._logger.info("Refreshed image information") - - async def start(self) -> None: - """Start a periodic refresh as a background task. - - Does not return until the background refresh has completed its first - run. We don't want to start answering user requests until we have - populated our lists of available images; otherwise, we might return - bogus information for the spawner form. - """ - if self._scheduler: - msg = "Image service already running, cannot start" - self._logger.warning(msg) - return - self._logger.info("Starting image service") - self._scheduler = Scheduler() - await self._scheduler.spawn(self._refresh_loop()) - await self._refreshed.wait() - - async def stop(self) -> None: - """Stop the background refresh task.""" - if not self._scheduler: - self._logger.warning("Image service was already stopped") - return - self._logger.info("Stopping image service") - await self._scheduler.close() - self._scheduler = None + self._refreshed.set() async def prepuller_wait(self) -> None: """Wait for a data refresh. @@ -406,25 +378,3 @@ def _build_nodes( comment=tolerate.comment, ) return node_data - - async def _refresh_loop(self) -> None: - """Run in the background by `start`, stopped with `stop`.""" - while True: - start = current_datetime() - try: - await self.refresh() - self._refreshed.set() - except Exception as e: - # On failure, log the exception and do not indicate we hve - # updated our data but otherwise continue as normal, including - # the delay. This will provide some time for whatever the - # problem was to be resolved. - self._logger.exception("Unable to refresh image information") - if self._slack_client: - await self._slack_client.post_uncaught_exception(e) - delay = IMAGE_REFRESH_INTERVAL - (current_datetime() - start) - if delay.total_seconds() < 1: - msg = "Image refresh is running continuously" - self._logger.warning(msg) - else: - await asyncio.sleep(delay.total_seconds()) diff --git a/controller/src/controller/services/lab.py b/controller/src/controller/services/lab.py index 172c6d890..dedc3b10a 100644 --- a/controller/src/controller/services/lab.py +++ b/controller/src/controller/services/lab.py @@ -10,9 +10,7 @@ from enum import Enum from typing import Self -from aiojobs import Scheduler from safir.asyncio import AsyncMultiQueue -from safir.datetime import current_datetime from safir.slack.blockkit import ( SlackException, ) @@ -20,7 +18,7 @@ from structlog.stdlib import BoundLogger from ..config import LabConfig -from ..constants import KUBERNETES_REQUEST_TIMEOUT, LAB_STATE_REFRESH_INTERVAL +from ..constants import KUBERNETES_REQUEST_TIMEOUT from ..exceptions import ( ControllerTimeoutError, InsufficientQuotaError, @@ -172,9 +170,6 @@ def __init__( self._slack = slack_client self._logger = logger - # Background task management. - self._scheduler: Scheduler | None = None - # Mapping of usernames to internal lab state. self._labs: dict[str, _State] = {} @@ -497,34 +492,117 @@ async def list_lab_users(self, *, only_running: bool = False) -> list[str]: else: return [u for u, s in self._labs.items() if s.state] - async def start(self) -> None: - """Synchronize with Kubernetes and start a background refresh task. + async def reap_spawners(self) -> None: + """Wait for spawner tasks to complete and record their status. 
+
+        When a user spawns a lab, the lab controller creates a background task
+        to create the Kubernetes objects and then wait for the pod to finish
+        starting. Something needs to await those tasks so that they can be
+        cleanly finalized and to catch any uncaught exceptions. That function
+        is performed by a background task running this method.
 
-        Examine Kubernetes for current user lab state, update our internal
-        data structures accordingly, and then start a background refresh task
-        that does this periodically. (Labs may be destroyed by Kubernetes node
-        upgrades, for example.)
+        Notes
+        -----
+        Doing this properly is a bit tricky, since we have to avoid both
+        busy-waiting when no operations are in progress and not reaping
+        anything if one operation keeps running forever. The approach used
+        here is to have every spawn set the ``_spawner_done`` `asyncio.Event`
+        when it is complete, and use that as a trigger for doing a reaper
+        pass. Deletes do not do this since they're normally awaited by the
+        caller and thus don't need to be reaped separately.
         """
-        if self._scheduler:
-            msg = "User lab state tasks already running, cannot start again"
-            self._logger.warning(msg)
-            return
-        await self._reconcile_lab_state()
-        self._logger.info("Starting periodic reconciliation task")
-        self._scheduler = Scheduler()
-        await self._scheduler.spawn(self._reconcile_loop())
-        self._logger.info("Starting reaper for spawn monitoring tasks")
-        await self._scheduler.spawn(self._reap_spawners())
-
-    async def stop(self) -> None:
-        """Stop the background refresh task."""
-        if not self._scheduler:
-            msg = "User lab state background tasks were already stopped"
-            self._logger.warning(msg)
+        while True:
+            await self._spawner_done.wait()
+            self._spawner_done.clear()
+            for username, lab in self._labs.items():
+                if lab.monitor.in_progress and lab.monitor.is_done():
+                    try:
+                        await lab.monitor.wait()
+                    except NoOperationError:
+                        # There is a race condition with deletes, since the
+                        # task doing the delete kicks it off and then
+                        # immediately waits for its completion. We may
+                        # discover the completed task right before that wait
+                        # wakes up and reaps it, and then have no task by the
+                        # time we call wait ourselves. This should be harmless
+                        # and ignorable.
+                        pass
+                    except Exception as e:
+                        msg = "Uncaught exception in monitor task"
+                        self._logger.exception(msg, user=username)
+                        await self._maybe_post_slack_exception(e, username)
+                        if lab.state:
+                            lab.state.status = LabStatus.FAILED
+
+    async def reconcile(self) -> None:
+        """Reconcile user lab state with Kubernetes.
+
+        This method is called on startup and then periodically from a
+        background task to check Kubernetes and ensure the in-memory record
+        of the user's lab state matches reality. On startup, it also needs to
+        recreate the internal state from the contents of Kubernetes.
+
+        Raises
+        ------
+        KubernetesError
+            Raised if there is some failure in a Kubernetes API call.
+        """
+        self._logger.info("Reconciling user lab state with Kubernetes")
+        known_users = set(self._labs.keys())
+
+        # Gather information about all extant Kubernetes namespaces and delete
+        # any malformed namespaces for which no operation is in progress.
+        observed = await self._gather_current_state()
+
+        # If the set of users we expected to see changed during
+        # reconciliation, that means someone added a new user while we were
+        # reconciling. Play it safe and skip this background update; we'll
+        # catch any inconsistencies the next time around.
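+        # The next reconciliation pass runs one LAB_RECONCILE_INTERVAL
+        # (currently an hour) later.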
+ # + # From this point forward, make sure not to do any asyncio operations + # until we've finished reconciling state, since if we yield control + # our state may change out from under us. + if set(self._labs.keys()) != known_users: + msg = "Known users changed during reconciliation, skipping" + self._logger.info(msg) return - self._logger.info("Stopping user lab state background tasks") - await self._scheduler.close() - self._scheduler = None + + # First pass: check all users already recorded in internal state + # against Kubernetes and correct them (or remove them) if needed. + to_monitor = self._reconcile_known_users(observed) + + # Second pass: take observed state and create any missing internal + # state. This is the normal case after a restart of the lab + # controller. + for username in set(observed.keys()) - known_users: + msg = f"Creating record for user {username} from Kubernetes" + self._logger.info(msg) + self._labs[username] = _State( + state=observed[username], + monitor=_LabMonitor( + username=username, + slack_client=self._slack, + logger=self._logger, + ), + ) + if observed[username].status == LabStatus.PENDING: + to_monitor.add(username) + + # If we discovered any pods unexpectedly in the pending state, kick + # off monitoring jobs to wait for them to become ready and handle + # timeouts if they never do. We've now fixed internal state, so it's + # safe to do asyncio operations again. + for username in sorted(to_monitor): + await self._monitor_pending_spawn(username) + + # Finally, for all labs in failed or terminated state (spawn failed, + # killed by the idle culler, killed by the OOM killer, etc.), clean up + # the lab as long as the user hasn't started some other operation in + # the meantime. + await self._delete_completed_labs() + + async def stop_monitor_tasks(self) -> None: + """Stop any tasks that are waiting for labs to spawn.""" self._logger.info("Stopping spawning monitor tasks") labs = self._labs self._labs = {} @@ -750,48 +828,6 @@ async def _monitor_pending_spawn(self, username: str) -> None: with contextlib.suppress(OperationConflictError): await lab.monitor.monitor(operation, timeout, self._spawner_done) - async def _reap_spawners(self) -> None: - """Wait for spawner tasks to complete and record their status. - - When a user spawns a lab, the lab controller creates a background task - to create the Kubernetes objects and then wait for the pod to finish - starting. Something needs to await those tasks so that they can be - cleanly finalized and to catch any uncaught exceptions. That function - is performed by a background task running this method. - - Notes - ----- - Doing this properly is a bit tricky, since we have to avoid both - busy-waiting when no operations are in progress and not reaping - anything if one operation keeps running forever. The approach used - here is to have every spawn set the ``_spawner_done`` `asyncio.Event` - when it is complete, and use that as a trigger for doing a reaper - pass. Deletes do not do this since they're normally awaited by the - caller and thus don't need to be reaped separately. - """ - while True: - await self._spawner_done.wait() - self._spawner_done.clear() - for username, lab in self._labs.items(): - if lab.monitor.in_progress and lab.monitor.is_done(): - try: - await lab.monitor.wait() - except NoOperationError: - # There is a race condition with deletes, since the - # task doing the delete kicks it off and then - # immediately waits for its completion. 
We may - # discover the completed task right before that wait - # wakes up and reaps it, and then have no task by the - # time we call wait ourselves. This should be harmless - # and ignorable. - pass - except Exception as e: - msg = "Uncaught exception in monitor thread" - self._logger.exception(msg, user=username) - await self._maybe_post_slack_exception(e, username) - if lab.state: - lab.state.status = LabStatus.FAILED - def _reconcile_known_users( self, observed: dict[str, UserLabState] ) -> set[str]: @@ -853,91 +889,6 @@ def _reconcile_known_users( to_monitor.add(username) return to_monitor - async def _reconcile_lab_state(self) -> None: - """Reconcile user lab state with Kubernetes. - - This method is called on startup and then periodically from a - background thread to check Kubernetes and ensure the in-memory record - of the user's lab state matches reality. On startup, it also needs to - recreate the internal state from the contents of Kubernetes. - - Raises - ------ - KubernetesError - Raised if there is some failure in a Kubernetes API call. - """ - self._logger.info("Reconciling user lab state with Kubernetes") - known_users = set(self._labs.keys()) - - # Gather information about all extant Kubernetes namespaces and delete - # any malformed namespaces for which no operation is in progress. - observed = await self._gather_current_state() - - # If the set of users we expected to see changed during - # reconciliation, that means someone added a new user while we were - # reconciling. Play it safe and skip this background update; we'll - # catch any inconsistencies the next time around. - # - # From this point forward, make sure not to do any asyncio operations - # until we've finished reconciling state, since if we yield control - # our state may change out from under us. - if set(self._labs.keys()) != known_users: - msg = "Known users changed during reconciliation, skipping" - self._logger.info(msg) - return - - # First pass: check all users already recorded in internal state - # against Kubernetes and correct them (or remove them) if needed. - to_monitor = self._reconcile_known_users(observed) - - # Second pass: take observed state and create any missing internal - # state. This is the normal case after a restart of the lab - # controller. - for username in set(observed.keys()) - known_users: - msg = f"Creating record for user {username} from Kubernetes" - self._logger.info(msg) - self._labs[username] = _State( - state=observed[username], - monitor=_LabMonitor( - username=username, - slack_client=self._slack, - logger=self._logger, - ), - ) - if observed[username].status == LabStatus.PENDING: - to_monitor.add(username) - - # If we discovered any pods unexpectedly in the pending state, kick - # off monitoring jobs to wait for them to become ready and handle - # timeouts if they never do. We've now fixed internal state, so it's - # safe to do asyncio operations again. - for username in sorted(to_monitor): - await self._monitor_pending_spawn(username) - - # Finally, for all labs in failed or terminated state (spawn failed, - # killed by the idle culler, killed by the OOM killer, etc.), clean up - # the lab as long as the user hasn't started some other operation in - # the meantime. 
- await self._delete_completed_labs() - - async def _reconcile_loop(self) -> None: - """Run in the background by `start`, stopped with `stop`.""" - while True: - start = current_datetime(microseconds=True) - try: - await self._reconcile_lab_state() - except Exception as e: - self._logger.exception("Unable to reconcile user lab state") - if self._slack: - await self._slack.post_uncaught_exception(e) - now = current_datetime(microseconds=True) - delay = LAB_STATE_REFRESH_INTERVAL - (now - start) - if delay.total_seconds() < 1: - msg = "User lab state reconciliation is running continuously" - self._logger.warning(msg) - else: - await asyncio.sleep(delay.total_seconds()) - async def _spawn_lab( self, *, diff --git a/controller/src/controller/services/prepuller.py b/controller/src/controller/services/prepuller.py index e7ea098c3..a177a49b9 100644 --- a/controller/src/controller/services/prepuller.py +++ b/controller/src/controller/services/prepuller.py @@ -2,12 +2,11 @@ import asyncio -from aiojobs import Scheduler from safir.slack.blockkit import SlackException from safir.slack.webhook import SlackWebhookClient from structlog.stdlib import BoundLogger -from ..constants import IMAGE_REFRESH_INTERVAL, PREPULLER_POD_TIMEOUT +from ..constants import PREPULLER_POD_TIMEOUT from ..models.domain.rspimage import RSPImage from ..storage.kubernetes.pod import PodStorage from ..storage.metadata import MetadataStorage @@ -59,58 +58,6 @@ def __init__( self._slack = slack_client self._logger = logger - # Scheduler to manage background tasks that prepull images to nodes. - self._scheduler: Scheduler | None = None - - async def start(self) -> None: - """Start the prepuller. - - The prepuller normally runs for the lifetime of the process in the - background, but when first called, wait for the image data to populate - in the foreground. This ensures that we populate image data before - FastAPI completes its startup event, and therefore before we start - answering requests. That in turn means a more accurate health check, - since until we have populated image data our API is fairly useless. - (It also makes life easier for the test suite.) - """ - if self._scheduler: - msg = "Prepuller already running, cannot start" - self._logger.warning(msg) - return - self._logger.info("Starting prepuller tasks") - await self._image_service.prepuller_wait() - self._running = True - self._scheduler = Scheduler() - await self._scheduler.spawn(self._prepull_loop()) - - async def stop(self) -> None: - """Stop the prepuller.""" - if not self._scheduler: - self._logger.warning("Prepuller was already stopped") - return - self._logger.info("Stopping prepuller") - await self._scheduler.close() - self._scheduler = None - - async def _prepull_loop(self) -> None: - """Continually prepull images in a loop. - - When the prepuller is stopped, we will orphan prepuller pods. Avoiding - this is difficult and unreliable. The prepuller should instead detect - orphaned pods on startup and clean them up. 
- """ - while True: - try: - await self.prepull_images() - await self._image_service.prepuller_wait() - except Exception as e: - self._logger.exception("Uncaught exception in prepuller") - if self._slack: - await self._slack.post_uncaught_exception(e) - pause = IMAGE_REFRESH_INTERVAL.total_seconds() - self._logger.warning("Pausing failed prepuller for {pause}s") - await asyncio.sleep(pause) - async def prepull_images(self) -> None: """Prepull missing images.""" missing_by_node = self._image_service.missing_images_by_node() @@ -119,13 +66,10 @@ async def prepull_images(self) -> None: # node at a time. We do this by creating a separate background task # per node, each of which works through the list of images that are # missing on that node. - node_tasks = set() - for node, images in missing_by_node.items(): - self._logger.debug(f"Creating prepull task for {node}") - prepull_call = self._prepull_images_for_node(node, images) - task = asyncio.create_task(prepull_call) - node_tasks.add(task) - await asyncio.gather(*node_tasks) + async with asyncio.TaskGroup() as tg: + for node, images in missing_by_node.items(): + self._logger.debug(f"Creating prepull task for {node}") + tg.create_task(self._prepull_images_for_node(node, images)) self._logger.debug("Finished prepulling all images") async def _prepull_images_for_node( diff --git a/controller/src/controller/storage/kubernetes/fileserver.py b/controller/src/controller/storage/kubernetes/fileserver.py index fff705f95..376121e65 100644 --- a/controller/src/controller/storage/kubernetes/fileserver.py +++ b/controller/src/controller/storage/kubernetes/fileserver.py @@ -18,7 +18,6 @@ from .custom import GafaelfawrIngressStorage from .deleter import JobStorage, PersistentVolumeClaimStorage, ServiceStorage from .ingress import IngressStorage -from .namespace import NamespaceStorage from .pod import PodStorage __all__ = ["FileserverStorage"] @@ -48,7 +47,6 @@ def __init__(self, api_client: ApiClient, logger: BoundLogger) -> None: self._gafaelfawr = GafaelfawrIngressStorage(api_client, logger) self._ingress = IngressStorage(api_client, logger) self._job = JobStorage(api_client, logger) - self._namespace = NamespaceStorage(api_client, logger) self._pod = PodStorage(api_client, logger) self._pvc = PersistentVolumeClaimStorage(api_client, logger) self._service = ServiceStorage(api_client, logger) @@ -158,23 +156,6 @@ async def delete( for pvc in pvcs: await self._pvc.delete(pvc.metadata.name, namespace, timeout) - async def namespace_exists(self, name: str, timeout: Timeout) -> bool: - """Check whether a namespace is present. - - Parameters - ---------- - name - Name of the namespace. - timeout - Timeout on operation. - - Returns - ------- - bool - `True` if the namespace is present, `False` otherwise. 
- """ - return await self._namespace.read(name, timeout) is not None - async def read_fileserver_state( self, namespace: str, timeout: Timeout ) -> dict[str, FileserverStateObjects]: diff --git a/controller/tests/conftest.py b/controller/tests/conftest.py index 9e38a7bd3..8ab9fe88f 100644 --- a/controller/tests/conftest.py +++ b/controller/tests/conftest.py @@ -81,6 +81,7 @@ async def factory( mock_kubernetes.set_nodes_for_test(nodes) async with Factory.standalone(config) as factory: yield factory + await factory.stop_background_services() @pytest.fixture diff --git a/controller/tests/services/prepuller_test.py b/controller/tests/services/prepuller_test.py index 225190a7c..6338e4982 100644 --- a/controller/tests/services/prepuller_test.py +++ b/controller/tests/services/prepuller_test.py @@ -66,8 +66,7 @@ async def test_docker( factory: Factory, config: Config, mock_kubernetes: MockKubernetesApi ) -> None: """Test the prepuller service configured to talk to Docker.""" - await factory.image_service.start() - await asyncio.sleep(0.2) + await factory.image_service.refresh() # With the default data, the image service running, and no prepuller, we # should see node1 as up-to-date but node2 out-of-date. @@ -80,7 +79,7 @@ async def test_docker( assert image.nodes == ["node1", "node2"] # Start the prepuller and give it a moment to run. - await factory.prepuller.start() + await factory.start_background_services() await asyncio.sleep(0.2) # The default data configures Kubernetes with missing images on some @@ -105,10 +104,6 @@ async def test_docker( if image.tag == "d_2077_10_23": assert image.nodes == ["node1", "node2"] - # Stop everything. (Ideally this should be done in a try/finally block.) - await factory.image_service.stop() - await factory.prepuller.stop() - @pytest.mark.asyncio async def test_gar( @@ -273,9 +268,8 @@ def callback(method: str, *args: Any) -> None: mock_kubernetes.error_callback = callback - await factory.start_background_services() - await asyncio.sleep(0.2) - + await factory.image_service.refresh() + await factory.prepuller.prepull_images() obj = "nublado/prepull-d-2077-10-23-node2" error = f"Error creating object (Pod {obj}, status 400)" assert mock_slack.messages == [ diff --git a/docs/dev/api/controller.rst b/docs/dev/api/controller.rst index b3690ad54..34e94ae0f 100644 --- a/docs/dev/api/controller.rst +++ b/docs/dev/api/controller.rst @@ -8,6 +8,9 @@ This documentation therefore exists only to assist developers and code analysis .. automodapi:: controller :include-all-objects: +.. automodapi:: controller.background + :include-all-objects: + .. automodapi:: controller.config :include-all-objects: