diff --git a/controller/src/controller/background.py b/controller/src/controller/background.py
new file mode 100644
index 000000000..a38759385
--- /dev/null
+++ b/controller/src/controller/background.py
@@ -0,0 +1,201 @@
+"""Nublado controller background processing."""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import Callable, Coroutine
+from datetime import timedelta
+
+from aiojobs import Scheduler
+from safir.datetime import current_datetime
+from safir.slack.webhook import SlackWebhookClient
+from structlog.stdlib import BoundLogger
+
+from .constants import (
+    FILE_SERVER_RECONCILE_INTERVAL,
+    IMAGE_REFRESH_INTERVAL,
+    LAB_RECONCILE_INTERVAL,
+)
+from .services.fileserver import FileserverManager
+from .services.image import ImageService
+from .services.lab import LabManager
+from .services.prepuller import Prepuller
+
+__all__ = ["BackgroundTaskManager"]
+
+
+class BackgroundTaskManager:
+    """Manage Nublado controller background tasks.
+
+    While the Nublado controller is running, it needs to perform several
+    periodic or continuous background tasks, namely:
+
+    #. Refresh the list of available remote images and local cached images.
+    #. Prepull images to all eligible nodes.
+    #. Reconcile Kubernetes lab state with internal data structures.
+    #. Reap tasks that were monitoring lab spawning or deletion.
+    #. Watch file servers for changes in pod status (startup or timeout).
+    #. Reconcile Kubernetes file server state with internal data structures.
+
+    This class manages all of these background tasks including, where
+    relevant, their schedules. It only does the task management; all of the
+    work of these tasks is done by methods on the underlying service objects.
+
+    This class is created during startup and tracked as part of the
+    `~controller.factory.ProcessContext`.
+
+    Parameters
+    ----------
+    image_service
+        Image service.
+    prepuller
+        Prepuller service.
+    lab_manager
+        Lab management service.
+    fileserver_manager
+        File server management service.
+    slack_client
+        Optional Slack webhook client for alerts.
+    logger
+        Logger to use.
+    """
+
+    def __init__(
+        self,
+        *,
+        image_service: ImageService,
+        prepuller: Prepuller,
+        lab_manager: LabManager,
+        fileserver_manager: FileserverManager | None,
+        slack_client: SlackWebhookClient | None,
+        logger: BoundLogger,
+    ) -> None:
+        self._image_service = image_service
+        self._prepuller = prepuller
+        self._lab_manager = lab_manager
+        self._fileserver_manager = fileserver_manager
+        self._slack = slack_client
+        self._logger = logger
+
+        self._scheduler: Scheduler | None = None
+
+    async def start(self) -> None:
+        """Start all background tasks.
+
+        Intended to be called during Nublado controller startup. Several of
+        the background tasks are run in the foreground first to ensure
+        internal state is valid before starting to serve requests.
+        """
+        if self._scheduler:
+            msg = "Background tasks already running, cannot start"
+            self._logger.warning(msg)
+            return
+        self._scheduler = Scheduler()
+
+        # Run some of the tasks in the foreground first to ensure internal
+        # data is consistent after startup. All of them can run in parallel.
+        async with asyncio.TaskGroup() as tg:
+            self._logger.info("Populating internal state")
+            tg.create_task(self._image_service.refresh())
+            tg.create_task(self._lab_manager.reconcile())
+            if self._fileserver_manager:
+                tg.create_task(self._fileserver_manager.reconcile())
+
+        # Now, start all of the tasks in the background.
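+        # Calling these methods produces coroutine objects but does not start
+        # them running; they only begin executing once they are spawned on
+        # the scheduler below.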
+        coros = [
+            self._loop(
+                self._image_service.refresh,
+                IMAGE_REFRESH_INTERVAL,
+                "refreshing image data",
+            ),
+            self._prepull_loop(),
+            self._loop(
+                self._lab_manager.reconcile,
+                LAB_RECONCILE_INTERVAL,
+                "reconciling lab state",
+            ),
+            self._lab_manager.reap_spawners(),
+        ]
+        if self._fileserver_manager:
+            coros.append(
+                self._loop(
+                    self._fileserver_manager.reconcile,
+                    FILE_SERVER_RECONCILE_INTERVAL,
+                    "reconciling file server state",
+                )
+            )
+            coros.append(self._fileserver_manager.watch_servers())
+        self._logger.info("Starting background tasks")
+        for coro in coros:
+            await self._scheduler.spawn(coro)
+
+    async def stop(self) -> None:
+        """Stop the background tasks."""
+        if not self._scheduler:
+            msg = "Background tasks were already stopped"
+            self._logger.warning(msg)
+            return
+        self._logger.info("Stopping background tasks")
+        await self._scheduler.close()
+        self._scheduler = None
+        await self._lab_manager.stop_monitor_tasks()
+
+    async def _loop(
+        self,
+        call: Callable[[], Coroutine[None, None, None]],
+        interval: timedelta,
+        description: str,
+    ) -> None:
+        """Wrap an async function in a periodic scheduling loop.
+
+        The provided function is called on every interval. This method always
+        delays by the interval first before calling the function for the
+        first time.
+
+        Parameters
+        ----------
+        call
+            Async function to run repeatedly.
+        interval
+            Scheduling interval to use.
+        description
+            Description of the background task for error reporting.
+        """
+        await asyncio.sleep(interval.total_seconds())
+        while True:
+            start = current_datetime(microseconds=True)
+            try:
+                await call()
+            except Exception as e:
+                # On failure, log the exception but otherwise continue as
+                # normal, including the delay. This will provide some time for
+                # whatever the problem was to be resolved.
+                elapsed = current_datetime(microseconds=True) - start
+                msg = f"Uncaught exception {description}"
+                self._logger.exception(msg, delay=elapsed.total_seconds())
+                if self._slack:
+                    await self._slack.post_uncaught_exception(e)
+            delay = interval - (current_datetime(microseconds=True) - start)
+            if delay.total_seconds() < 1:
+                msg = f"{description.capitalize()} is running continuously"
+                self._logger.warning(msg)
+            else:
+                await asyncio.sleep(delay.total_seconds())
+
+    async def _prepull_loop(self) -> None:
+        """Execute the prepuller in an infinite loop.
+
+        The prepuller loop uses an `asyncio.Event` set by the image service to
+        decide when to run instead of a simple interval. This ensures the
+        prepuller runs immediately after a possible image list update.
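+
+        Failures are logged and reported to Slack, and the loop then pauses
+        for one image refresh interval before trying again.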
+        """
+        while True:
+            try:
+                await self._image_service.prepuller_wait()
+                await self._prepuller.prepull_images()
+            except Exception as e:
+                self._logger.exception("Uncaught exception prepulling images")
+                if self._slack:
+                    await self._slack.post_uncaught_exception(e)
+                pause = IMAGE_REFRESH_INTERVAL.total_seconds()
+                self._logger.warning(f"Pausing failed prepuller for {pause}s")
+                await asyncio.sleep(pause)
diff --git a/controller/src/controller/constants.py b/controller/src/controller/constants.py
index ff7b264d5..f7788ba33 100644
--- a/controller/src/controller/constants.py
+++ b/controller/src/controller/constants.py
@@ -9,12 +9,12 @@
     "DOCKER_CREDENTIALS_PATH",
     "DROPDOWN_SENTINEL_VALUE",
-    "GROUPNAME_REGEX",
-    "FILE_SERVER_REFRESH_INTERVAL",
+    "FILE_SERVER_RECONCILE_INTERVAL",
+    "GROUPNAME_REGEX",
     "IMAGE_REFRESH_INTERVAL",
     "KUBERNETES_NAME_PATTERN",
     "KUBERNETES_REQUEST_TIMEOUT",
     "LAB_COMMAND",
-    "LAB_STATE_REFRESH_INTERVAL",
+    "LAB_RECONCILE_INTERVAL",
     "LIMIT_TO_REQUEST_RATIO",
     "METADATA_PATH",
     "MOUNT_PATH_DOWNWARD_API",
@@ -47,7 +47,7 @@
 DROPDOWN_SENTINEL_VALUE = "use_image_from_dropdown"
 """Used in the lab form for ``image_list`` when ``image_dropdown`` is used."""
 
-FILE_SERVER_REFRESH_INTERVAL = timedelta(minutes=60)
+FILE_SERVER_RECONCILE_INTERVAL = timedelta(minutes=60)
 """How frequently to refresh file server state from Kubernetes.
 
 This will detect when file servers disappear out from under us, such as being
@@ -75,7 +75,7 @@
 This should be configurable but isn't yet.
 """
 
-LAB_STATE_REFRESH_INTERVAL = timedelta(minutes=60)
+LAB_RECONCILE_INTERVAL = timedelta(minutes=60)
 """How frequently to refresh user lab state from Kubernetes.
 
 This will detect when user labs disappear out from under us without user
diff --git a/controller/src/controller/factory.py b/controller/src/controller/factory.py
index 3fb1a6e53..ae098f8e9 100644
--- a/controller/src/controller/factory.py
+++ b/controller/src/controller/factory.py
@@ -14,6 +14,7 @@
 from safir.slack.webhook import SlackWebhookClient
 from structlog.stdlib import BoundLogger
 
+from .background import BackgroundTaskManager
 from .config import Config
 from .exceptions import NotConfiguredError
 from .models.v1.prepuller_config import DockerSourceConfig, GARSourceConfig
@@ -75,6 +76,9 @@ class ProcessContext:
     _fileserver_manager: FileserverManager | None
     """State management for user file servers."""
 
+    background: BackgroundTaskManager
+    """Manager for background tasks."""
+
    @classmethod
    async def from_config(cls, config: Config) -> Self:
        """Create a new process context from the controller configuration.
@@ -148,32 +152,42 @@ async def from_config(cls, config: Config) -> Self: slack_client=slack_client, logger=logger, ) + prepuller = Prepuller( + image_service=image_service, + prepuller_builder=PrepullerBuilder( + metadata_storage=metadata_storage, + pull_secret=config.lab.pull_secret, + ), + metadata_storage=metadata_storage, + pod_storage=PodStorage(kubernetes_client, logger), + slack_client=slack_client, + logger=logger, + ) + lab_manager = LabManager( + config=config.lab, + image_service=image_service, + lab_builder=LabBuilder(config.lab, config.base_url, logger), + metadata_storage=metadata_storage, + lab_storage=LabStorage(kubernetes_client, logger), + slack_client=slack_client, + logger=logger, + ) return cls( config=config, http_client=http_client, image_service=image_service, kubernetes_client=kubernetes_client, - prepuller=Prepuller( - image_service=image_service, - prepuller_builder=PrepullerBuilder( - metadata_storage=metadata_storage, - pull_secret=config.lab.pull_secret, - ), - metadata_storage=metadata_storage, - pod_storage=PodStorage(kubernetes_client, logger), - slack_client=slack_client, - logger=logger, - ), - lab_manager=LabManager( - config=config.lab, + prepuller=prepuller, + lab_manager=lab_manager, + _fileserver_manager=fileserver_manager, + background=BackgroundTaskManager( image_service=image_service, - lab_builder=LabBuilder(config.lab, config.base_url, logger), - metadata_storage=metadata_storage, - lab_storage=LabStorage(kubernetes_client, logger), + prepuller=prepuller, + lab_manager=lab_manager, + fileserver_manager=fileserver_manager, slack_client=slack_client, logger=logger, ), - _fileserver_manager=fileserver_manager, ) @property @@ -189,11 +203,7 @@ async def aclose(self) -> None: async def start(self) -> None: """Start the background threads running.""" - await self.image_service.start() - await self.prepuller.start() - await self.lab_manager.start() - if self._fileserver_manager: - await self._fileserver_manager.start() + await self.background.start() async def stop(self) -> None: """Clean up a process context. @@ -201,11 +211,7 @@ async def stop(self) -> None: Called during shutdown, or before recreating the process context using a different configuration. """ - if self._fileserver_manager: - await self._fileserver_manager.stop() - await self.prepuller.stop() - await self.image_service.stop() - await self.lab_manager.stop() + await self.background.stop() class Factory: diff --git a/controller/src/controller/services/fileserver.py b/controller/src/controller/services/fileserver.py index b11883d8e..b20d06fd2 100644 --- a/controller/src/controller/services/fileserver.py +++ b/controller/src/controller/services/fileserver.py @@ -6,19 +6,15 @@ import contextlib from dataclasses import dataclass, field -from aiojobs import Scheduler -from safir.datetime import current_datetime from safir.slack.blockkit import SlackException from safir.slack.webhook import SlackWebhookClient from structlog.stdlib import BoundLogger from ..config import EnabledFileserverConfig from ..constants import ( - FILE_SERVER_REFRESH_INTERVAL, KUBERNETES_REQUEST_TIMEOUT, ) from ..exceptions import ( - MissingObjectError, UnknownUserError, ) from ..models.domain.gafaelfawr import GafaelfawrUserInfo @@ -61,6 +57,8 @@ class FileserverManager: Builder that constructs file server Kubernetes objects. fileserver_storage Kubernetes storage layer for file servers. + slack_client + Optional Slack webhook client for alerts. logger Logger to use. 
""" @@ -80,9 +78,6 @@ def __init__( self._slack = slack_client self._logger = logger - # Background task management. - self._scheduler: Scheduler | None = None - # Mapping of usernames to internal state. self._servers: dict[str, _State] = {} @@ -158,39 +153,92 @@ async def list(self) -> list[str]: """List users with running file servers.""" return [u for u, s in self._servers.items() if s.running] - async def start(self) -> None: - """Start the background file server tasks. + async def reconcile(self) -> None: + """Reconcile internal state with Kubernetes. - Reconstructs the user state map in the foreground before backgrounding - the reconciliation and monitor tasks. + Runs at Nublado controller startup to reconcile internal state with + the content of Kubernetes. This picks up changes made in Kubernetes + outside of the controller, and is also responsible for building the + internal state from the current state of Kubernetes during startup. + It is called during startup and from a background task. """ + self._logger.info("Reconciling file server state") namespace = self._config.namespace - timeout = Timeout("Reading namespace", KUBERNETES_REQUEST_TIMEOUT) - if not await self._storage.namespace_exists(namespace, timeout): - raise MissingObjectError( - "File server namespace missing", - kind="Namespace", - name=namespace, + timeout = Timeout( + "Reading file server state", KUBERNETES_REQUEST_TIMEOUT + ) + seen = await self._storage.read_fileserver_state(namespace, timeout) + known_users = {k for k, v in self._servers.items() if v.running} + + # Check each fileserver we found to see if it's properly running. If + # not, delete it and remove it from the seen map. + to_delete = set() + invalid = set() + for username, state in seen.items(): + valid = self._builder.is_valid(username, state) + if valid: + self._servers[username] = _State(running=valid) + elif username in self._servers: + to_delete.add(username) + else: + invalid.add(username) + seen = {k: v for k, v in seen.items() if k not in to_delete} + + # Also tidy up any supposedly-running users that we didn't find. They + # may have some objects remaining. This should only be possible if + # something outside of the controller deleted resources. + seen_users = set(seen.keys()) + for user in (known_users - seen_users) | to_delete: + msg = "Removing broken fileserver for user" + self._logger.warning(msg, user=user) + await self.delete(user) + for username in invalid: + if username in self._servers: + continue + msg = "File server present but not running, deleteing" + self._logger.info(msg, user=username) + + # There is an unavoidable race condition where if the user for + # this invalid file server attempts to create a valid file server + # just as we make this call, we may delete parts of their new file + # server. Solving this is complicated; live with it for now. + name = self._builder.build_name(username) + timeout = Timeout( + "Deleting file server", KUBERNETES_REQUEST_TIMEOUT, username + ) + await self._storage.delete( + name, self._config.namespace, username, timeout ) - await self._reconcile_file_servers() - self._scheduler = Scheduler() - self._logger.info("Starting file server watcher task") - await self._scheduler.spawn(self._watch_file_servers()) - self._logger.info("Starting file server periodic reconciliation task") - await self._scheduler.spawn(self._reconcile_loop()) + self._logger.debug("File server reconciliation complete") - async def stop(self) -> None: - """Stop background file server tasks. 
+ async def watch_servers(self) -> None: + """Watch the file server namespace for completed file servers. - Any started file servers will keep running. + Each file server has a timeout, after which it exits. When one exits, + we want to clean up its Kubernetes objects and update its state. This + method runs as a background task watching for changes and triggers the + delete when appropriate. """ - if not self._scheduler: - msg = "File server background tasks were already stopped" - self._logger.warning(msg) - return - self._logger.info("Stopping file server background tasks") - await self._scheduler.close() - self._scheduler = None + namespace = self._config.namespace + while True: + try: + async for change in self._storage.watch_pods(namespace): + if change.phase in (PodPhase.FAILED, PodPhase.SUCCEEDED): + pod = change.pod + username = self._builder.get_username_for_pod(pod) + if not username: + continue + self._logger.info( + "File server exited, cleaning up", + phase=change.phase.value, + user=username, + ) + with contextlib.suppress(UnknownUserError): + await self.delete(username) + except Exception as e: + self._logger.exception("Error watching file server pod phase") + await self._maybe_post_slack_exception(e) + await asyncio.sleep(1) async def _create_file_server( self, user: GafaelfawrUserInfo, timeout: Timeout @@ -267,104 +315,3 @@ async def _maybe_post_slack_exception( await self._slack.post_exception(exc) else: await self._slack.post_uncaught_exception(exc) - - async def _reconcile_file_servers(self) -> None: - """Reconcile internal state with Kubernetes. - - Runs at Nublado controller startup to reconcile the initially empty - state map with the contents of Kubernetes. - """ - self._logger.debug("Reconciling file server state") - namespace = self._config.namespace - timeout = Timeout( - "Reading file server state", KUBERNETES_REQUEST_TIMEOUT - ) - seen = await self._storage.read_fileserver_state(namespace, timeout) - known_users = {k for k, v in self._servers.items() if v.running} - - # Check each fileserver we found to see if it's properly running. If - # not, delete it and remove it from the seen map. - to_delete = set() - invalid = set() - for username, state in seen.items(): - valid = self._builder.is_valid(username, state) - if valid: - self._servers[username] = _State(running=valid) - elif username in self._servers: - to_delete.add(username) - else: - invalid.add(username) - seen = {k: v for k, v in seen.items() if k not in to_delete} - - # Also tidy up any supposedly-running users that we didn't find. They - # may have some objects remaining. This should only be possible if - # something outside of the controller deleted resources. - seen_users = set(seen.keys()) - for user in (known_users - seen_users) | to_delete: - msg = "Removing broken fileserver for user" - self._logger.warning(msg, user=user) - await self.delete(user) - for username in invalid: - if username in self._servers: - continue - msg = "File server present but not running, deleteing" - self._logger.info(msg, user=username) - - # There is an unavoidable race condition where if the user for - # this invalid file server attempts to create a valid file server - # just as we make this call, we may delete parts of their new file - # server. Solving this is complicated; live with it for now. 
- name = self._builder.build_name(username) - timeout = Timeout( - "Deleting file server", KUBERNETES_REQUEST_TIMEOUT, username - ) - await self._storage.delete( - name, self._config.namespace, username, timeout - ) - self._logger.debug("File server reconciliation complete") - - async def _reconcile_loop(self) -> None: - """Run in the background by `start`, stopped with `stop`.""" - while True: - start = current_datetime(microseconds=True) - try: - await self._reconcile_file_servers() - except Exception as e: - self._logger.exception("Unable to reconcile file servers") - await self._maybe_post_slack_exception(e) - now = current_datetime(microseconds=True) - delay = FILE_SERVER_REFRESH_INTERVAL - (now - start) - if delay.total_seconds() < 1: - msg = "File server reconciliation is running continuously" - self._logger.warning(msg) - else: - await asyncio.sleep(delay.total_seconds()) - - async def _watch_file_servers(self) -> None: - """Watch the file server namespace for completed file servers. - - Each file server has a timeout, after which it exits. When one exits, - we want to clean up its Kubernetes objects and update its state. This - method runs as a background task watching for changes and triggers the - delete when appropriate. - """ - namespace = self._config.namespace - while True: - try: - async for change in self._storage.watch_pods(namespace): - if change.phase in (PodPhase.FAILED, PodPhase.SUCCEEDED): - pod = change.pod - username = self._builder.get_username_for_pod(pod) - if not username: - continue - self._logger.info( - "File server exited, cleaning up", - phase=change.phase.value, - user=username, - ) - with contextlib.suppress(UnknownUserError): - await self.delete(username) - except Exception as e: - self._logger.exception("Error watching file server pod phase") - await self._maybe_post_slack_exception(e) - await asyncio.sleep(1) diff --git a/controller/src/controller/services/image.py b/controller/src/controller/services/image.py index 033eb547a..a94c39c8c 100644 --- a/controller/src/controller/services/image.py +++ b/controller/src/controller/services/image.py @@ -4,13 +4,11 @@ import asyncio -from aiojobs import Scheduler from kubernetes_asyncio.client import V1Node -from safir.datetime import current_datetime from safir.slack.webhook import SlackWebhookClient from structlog.stdlib import BoundLogger -from ..constants import IMAGE_REFRESH_INTERVAL, KUBERNETES_REQUEST_TIMEOUT +from ..constants import KUBERNETES_REQUEST_TIMEOUT from ..exceptions import UnknownDockerImageError from ..models.domain.docker import DockerReference from ..models.domain.image import MenuImage, MenuImages, NodeData @@ -93,8 +91,7 @@ def __init__( self._slack_client = slack_client self._logger = logger - # Background task management. - self._scheduler: Scheduler | None = None + # Prepuller synchronization. self._lock = asyncio.Lock() self._refreshed = asyncio.Event() @@ -310,9 +307,9 @@ def prepull_status(self) -> PrepullerStatus: async def refresh(self) -> None: """Refresh data from Docker and Kubernetes. - Normally run in the background by the task started with `start`, but - can be called directly to force an immediate refresh. Does not catch - exceptions; the caller must do that if desired. + Normally run in a background task, but can be called directly to force + an immediate refresh. Does not catch exceptions; the caller must do + that if desired. 
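+
+        On success, sets the refresh event so that a waiting prepuller knows
+        that new image data is available.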
""" timeout = Timeout("List nodes", KUBERNETES_REQUEST_TIMEOUT) selector = self._node_selector @@ -323,32 +320,7 @@ async def refresh(self) -> None: self._nodes = self._build_nodes(to_prepull, node_list, cached) self._to_prepull = to_prepull self._logger.info("Refreshed image information") - - async def start(self) -> None: - """Start a periodic refresh as a background task. - - Does not return until the background refresh has completed its first - run. We don't want to start answering user requests until we have - populated our lists of available images; otherwise, we might return - bogus information for the spawner form. - """ - if self._scheduler: - msg = "Image service already running, cannot start" - self._logger.warning(msg) - return - self._logger.info("Starting image service") - self._scheduler = Scheduler() - await self._scheduler.spawn(self._refresh_loop()) - await self._refreshed.wait() - - async def stop(self) -> None: - """Stop the background refresh task.""" - if not self._scheduler: - self._logger.warning("Image service was already stopped") - return - self._logger.info("Stopping image service") - await self._scheduler.close() - self._scheduler = None + self._refreshed.set() async def prepuller_wait(self) -> None: """Wait for a data refresh. @@ -406,25 +378,3 @@ def _build_nodes( comment=tolerate.comment, ) return node_data - - async def _refresh_loop(self) -> None: - """Run in the background by `start`, stopped with `stop`.""" - while True: - start = current_datetime() - try: - await self.refresh() - self._refreshed.set() - except Exception as e: - # On failure, log the exception and do not indicate we hve - # updated our data but otherwise continue as normal, including - # the delay. This will provide some time for whatever the - # problem was to be resolved. - self._logger.exception("Unable to refresh image information") - if self._slack_client: - await self._slack_client.post_uncaught_exception(e) - delay = IMAGE_REFRESH_INTERVAL - (current_datetime() - start) - if delay.total_seconds() < 1: - msg = "Image refresh is running continuously" - self._logger.warning(msg) - else: - await asyncio.sleep(delay.total_seconds()) diff --git a/controller/src/controller/services/lab.py b/controller/src/controller/services/lab.py index 172c6d890..dedc3b10a 100644 --- a/controller/src/controller/services/lab.py +++ b/controller/src/controller/services/lab.py @@ -10,9 +10,7 @@ from enum import Enum from typing import Self -from aiojobs import Scheduler from safir.asyncio import AsyncMultiQueue -from safir.datetime import current_datetime from safir.slack.blockkit import ( SlackException, ) @@ -20,7 +18,7 @@ from structlog.stdlib import BoundLogger from ..config import LabConfig -from ..constants import KUBERNETES_REQUEST_TIMEOUT, LAB_STATE_REFRESH_INTERVAL +from ..constants import KUBERNETES_REQUEST_TIMEOUT from ..exceptions import ( ControllerTimeoutError, InsufficientQuotaError, @@ -172,9 +170,6 @@ def __init__( self._slack = slack_client self._logger = logger - # Background task management. - self._scheduler: Scheduler | None = None - # Mapping of usernames to internal lab state. self._labs: dict[str, _State] = {} @@ -497,34 +492,117 @@ async def list_lab_users(self, *, only_running: bool = False) -> list[str]: else: return [u for u, s in self._labs.items() if s.state] - async def start(self) -> None: - """Synchronize with Kubernetes and start a background refresh task. + async def reap_spawners(self) -> None: + """Wait for spawner tasks to complete and record their status. 
+
+        When a user spawns a lab, the lab controller creates a background task
+        to create the Kubernetes objects and then wait for the pod to finish
+        starting. Something needs to await those tasks so that they can be
+        cleanly finalized and to catch any uncaught exceptions. That function
+        is performed by a background task running this method.
 
-        Examine Kubernetes for current user lab state, update our internal
-        data structures accordingly, and then start a background refresh task
-        that does this periodically. (Labs may be destroyed by Kubernetes node
-        upgrades, for example.)
+        Notes
+        -----
+        Doing this properly is a bit tricky, since we have to avoid both
+        busy-waiting when no operations are in progress and not reaping
+        anything if one operation keeps running forever. The approach used
+        here is to have every spawn set the ``_spawner_done`` `asyncio.Event`
+        when it is complete, and use that as a trigger for doing a reaper
+        pass. Deletes do not do this since they're normally awaited by the
+        caller and thus don't need to be reaped separately.
         """
-        if self._scheduler:
-            msg = "User lab state tasks already running, cannot start again"
-            self._logger.warning(msg)
-            return
-        await self._reconcile_lab_state()
-        self._logger.info("Starting periodic reconciliation task")
-        self._scheduler = Scheduler()
-        await self._scheduler.spawn(self._reconcile_loop())
-        self._logger.info("Starting reaper for spawn monitoring tasks")
-        await self._scheduler.spawn(self._reap_spawners())
-
-    async def stop(self) -> None:
-        """Stop the background refresh task."""
-        if not self._scheduler:
-            msg = "User lab state background tasks were already stopped"
-            self._logger.warning(msg)
+        while True:
+            await self._spawner_done.wait()
+            self._spawner_done.clear()
+            for username, lab in self._labs.items():
+                if lab.monitor.in_progress and lab.monitor.is_done():
+                    try:
+                        await lab.monitor.wait()
+                    except NoOperationError:
+                        # There is a race condition with deletes, since the
+                        # task doing the delete kicks it off and then
+                        # immediately waits for its completion. We may
+                        # discover the completed task right before that wait
+                        # wakes up and reaps it, and then have no task by the
+                        # time we call wait ourselves. This should be harmless
+                        # and ignorable.
+                        pass
+                    except Exception as e:
+                        msg = "Uncaught exception in monitor task"
+                        self._logger.exception(msg, user=username)
+                        await self._maybe_post_slack_exception(e, username)
+                        if lab.state:
+                            lab.state.status = LabStatus.FAILED
+
+    async def reconcile(self) -> None:
+        """Reconcile user lab state with Kubernetes.
+
+        This method is called on startup and then periodically from a
+        background task to check Kubernetes and ensure the in-memory record
+        of the user's lab state matches reality. On startup, it also needs to
+        recreate the internal state from the contents of Kubernetes.
+
+        Raises
+        ------
+        KubernetesError
+            Raised if there is some failure in a Kubernetes API call.
+        """
+        self._logger.info("Reconciling user lab state with Kubernetes")
+        known_users = set(self._labs.keys())
+
+        # Gather information about all extant Kubernetes namespaces and delete
+        # any malformed namespaces for which no operation is in progress.
+        observed = await self._gather_current_state()
+
+        # If the set of users we expected to see changed during
+        # reconciliation, that means someone added a new user while we were
+        # reconciling. Play it safe and skip this background update; we'll
+        # catch any inconsistencies the next time around.
+ # + # From this point forward, make sure not to do any asyncio operations + # until we've finished reconciling state, since if we yield control + # our state may change out from under us. + if set(self._labs.keys()) != known_users: + msg = "Known users changed during reconciliation, skipping" + self._logger.info(msg) return - self._logger.info("Stopping user lab state background tasks") - await self._scheduler.close() - self._scheduler = None + + # First pass: check all users already recorded in internal state + # against Kubernetes and correct them (or remove them) if needed. + to_monitor = self._reconcile_known_users(observed) + + # Second pass: take observed state and create any missing internal + # state. This is the normal case after a restart of the lab + # controller. + for username in set(observed.keys()) - known_users: + msg = f"Creating record for user {username} from Kubernetes" + self._logger.info(msg) + self._labs[username] = _State( + state=observed[username], + monitor=_LabMonitor( + username=username, + slack_client=self._slack, + logger=self._logger, + ), + ) + if observed[username].status == LabStatus.PENDING: + to_monitor.add(username) + + # If we discovered any pods unexpectedly in the pending state, kick + # off monitoring jobs to wait for them to become ready and handle + # timeouts if they never do. We've now fixed internal state, so it's + # safe to do asyncio operations again. + for username in sorted(to_monitor): + await self._monitor_pending_spawn(username) + + # Finally, for all labs in failed or terminated state (spawn failed, + # killed by the idle culler, killed by the OOM killer, etc.), clean up + # the lab as long as the user hasn't started some other operation in + # the meantime. + await self._delete_completed_labs() + + async def stop_monitor_tasks(self) -> None: + """Stop any tasks that are waiting for labs to spawn.""" self._logger.info("Stopping spawning monitor tasks") labs = self._labs self._labs = {} @@ -750,48 +828,6 @@ async def _monitor_pending_spawn(self, username: str) -> None: with contextlib.suppress(OperationConflictError): await lab.monitor.monitor(operation, timeout, self._spawner_done) - async def _reap_spawners(self) -> None: - """Wait for spawner tasks to complete and record their status. - - When a user spawns a lab, the lab controller creates a background task - to create the Kubernetes objects and then wait for the pod to finish - starting. Something needs to await those tasks so that they can be - cleanly finalized and to catch any uncaught exceptions. That function - is performed by a background task running this method. - - Notes - ----- - Doing this properly is a bit tricky, since we have to avoid both - busy-waiting when no operations are in progress and not reaping - anything if one operation keeps running forever. The approach used - here is to have every spawn set the ``_spawner_done`` `asyncio.Event` - when it is complete, and use that as a trigger for doing a reaper - pass. Deletes do not do this since they're normally awaited by the - caller and thus don't need to be reaped separately. - """ - while True: - await self._spawner_done.wait() - self._spawner_done.clear() - for username, lab in self._labs.items(): - if lab.monitor.in_progress and lab.monitor.is_done(): - try: - await lab.monitor.wait() - except NoOperationError: - # There is a race condition with deletes, since the - # task doing the delete kicks it off and then - # immediately waits for its completion. 
We may - # discover the completed task right before that wait - # wakes up and reaps it, and then have no task by the - # time we call wait ourselves. This should be harmless - # and ignorable. - pass - except Exception as e: - msg = "Uncaught exception in monitor thread" - self._logger.exception(msg, user=username) - await self._maybe_post_slack_exception(e, username) - if lab.state: - lab.state.status = LabStatus.FAILED - def _reconcile_known_users( self, observed: dict[str, UserLabState] ) -> set[str]: @@ -853,91 +889,6 @@ def _reconcile_known_users( to_monitor.add(username) return to_monitor - async def _reconcile_lab_state(self) -> None: - """Reconcile user lab state with Kubernetes. - - This method is called on startup and then periodically from a - background thread to check Kubernetes and ensure the in-memory record - of the user's lab state matches reality. On startup, it also needs to - recreate the internal state from the contents of Kubernetes. - - Raises - ------ - KubernetesError - Raised if there is some failure in a Kubernetes API call. - """ - self._logger.info("Reconciling user lab state with Kubernetes") - known_users = set(self._labs.keys()) - - # Gather information about all extant Kubernetes namespaces and delete - # any malformed namespaces for which no operation is in progress. - observed = await self._gather_current_state() - - # If the set of users we expected to see changed during - # reconciliation, that means someone added a new user while we were - # reconciling. Play it safe and skip this background update; we'll - # catch any inconsistencies the next time around. - # - # From this point forward, make sure not to do any asyncio operations - # until we've finished reconciling state, since if we yield control - # our state may change out from under us. - if set(self._labs.keys()) != known_users: - msg = "Known users changed during reconciliation, skipping" - self._logger.info(msg) - return - - # First pass: check all users already recorded in internal state - # against Kubernetes and correct them (or remove them) if needed. - to_monitor = self._reconcile_known_users(observed) - - # Second pass: take observed state and create any missing internal - # state. This is the normal case after a restart of the lab - # controller. - for username in set(observed.keys()) - known_users: - msg = f"Creating record for user {username} from Kubernetes" - self._logger.info(msg) - self._labs[username] = _State( - state=observed[username], - monitor=_LabMonitor( - username=username, - slack_client=self._slack, - logger=self._logger, - ), - ) - if observed[username].status == LabStatus.PENDING: - to_monitor.add(username) - - # If we discovered any pods unexpectedly in the pending state, kick - # off monitoring jobs to wait for them to become ready and handle - # timeouts if they never do. We've now fixed internal state, so it's - # safe to do asyncio operations again. - for username in sorted(to_monitor): - await self._monitor_pending_spawn(username) - - # Finally, for all labs in failed or terminated state (spawn failed, - # killed by the idle culler, killed by the OOM killer, etc.), clean up - # the lab as long as the user hasn't started some other operation in - # the meantime. 
- await self._delete_completed_labs() - - async def _reconcile_loop(self) -> None: - """Run in the background by `start`, stopped with `stop`.""" - while True: - start = current_datetime(microseconds=True) - try: - await self._reconcile_lab_state() - except Exception as e: - self._logger.exception("Unable to reconcile user lab state") - if self._slack: - await self._slack.post_uncaught_exception(e) - now = current_datetime(microseconds=True) - delay = LAB_STATE_REFRESH_INTERVAL - (now - start) - if delay.total_seconds() < 1: - msg = "User lab state reconciliation is running continuously" - self._logger.warning(msg) - else: - await asyncio.sleep(delay.total_seconds()) - async def _spawn_lab( self, *, diff --git a/controller/src/controller/services/prepuller.py b/controller/src/controller/services/prepuller.py index e7ea098c3..a177a49b9 100644 --- a/controller/src/controller/services/prepuller.py +++ b/controller/src/controller/services/prepuller.py @@ -2,12 +2,11 @@ import asyncio -from aiojobs import Scheduler from safir.slack.blockkit import SlackException from safir.slack.webhook import SlackWebhookClient from structlog.stdlib import BoundLogger -from ..constants import IMAGE_REFRESH_INTERVAL, PREPULLER_POD_TIMEOUT +from ..constants import PREPULLER_POD_TIMEOUT from ..models.domain.rspimage import RSPImage from ..storage.kubernetes.pod import PodStorage from ..storage.metadata import MetadataStorage @@ -59,58 +58,6 @@ def __init__( self._slack = slack_client self._logger = logger - # Scheduler to manage background tasks that prepull images to nodes. - self._scheduler: Scheduler | None = None - - async def start(self) -> None: - """Start the prepuller. - - The prepuller normally runs for the lifetime of the process in the - background, but when first called, wait for the image data to populate - in the foreground. This ensures that we populate image data before - FastAPI completes its startup event, and therefore before we start - answering requests. That in turn means a more accurate health check, - since until we have populated image data our API is fairly useless. - (It also makes life easier for the test suite.) - """ - if self._scheduler: - msg = "Prepuller already running, cannot start" - self._logger.warning(msg) - return - self._logger.info("Starting prepuller tasks") - await self._image_service.prepuller_wait() - self._running = True - self._scheduler = Scheduler() - await self._scheduler.spawn(self._prepull_loop()) - - async def stop(self) -> None: - """Stop the prepuller.""" - if not self._scheduler: - self._logger.warning("Prepuller was already stopped") - return - self._logger.info("Stopping prepuller") - await self._scheduler.close() - self._scheduler = None - - async def _prepull_loop(self) -> None: - """Continually prepull images in a loop. - - When the prepuller is stopped, we will orphan prepuller pods. Avoiding - this is difficult and unreliable. The prepuller should instead detect - orphaned pods on startup and clean them up. 
- """ - while True: - try: - await self.prepull_images() - await self._image_service.prepuller_wait() - except Exception as e: - self._logger.exception("Uncaught exception in prepuller") - if self._slack: - await self._slack.post_uncaught_exception(e) - pause = IMAGE_REFRESH_INTERVAL.total_seconds() - self._logger.warning("Pausing failed prepuller for {pause}s") - await asyncio.sleep(pause) - async def prepull_images(self) -> None: """Prepull missing images.""" missing_by_node = self._image_service.missing_images_by_node() @@ -119,13 +66,10 @@ async def prepull_images(self) -> None: # node at a time. We do this by creating a separate background task # per node, each of which works through the list of images that are # missing on that node. - node_tasks = set() - for node, images in missing_by_node.items(): - self._logger.debug(f"Creating prepull task for {node}") - prepull_call = self._prepull_images_for_node(node, images) - task = asyncio.create_task(prepull_call) - node_tasks.add(task) - await asyncio.gather(*node_tasks) + async with asyncio.TaskGroup() as tg: + for node, images in missing_by_node.items(): + self._logger.debug(f"Creating prepull task for {node}") + tg.create_task(self._prepull_images_for_node(node, images)) self._logger.debug("Finished prepulling all images") async def _prepull_images_for_node( diff --git a/controller/src/controller/storage/kubernetes/fileserver.py b/controller/src/controller/storage/kubernetes/fileserver.py index fff705f95..376121e65 100644 --- a/controller/src/controller/storage/kubernetes/fileserver.py +++ b/controller/src/controller/storage/kubernetes/fileserver.py @@ -18,7 +18,6 @@ from .custom import GafaelfawrIngressStorage from .deleter import JobStorage, PersistentVolumeClaimStorage, ServiceStorage from .ingress import IngressStorage -from .namespace import NamespaceStorage from .pod import PodStorage __all__ = ["FileserverStorage"] @@ -48,7 +47,6 @@ def __init__(self, api_client: ApiClient, logger: BoundLogger) -> None: self._gafaelfawr = GafaelfawrIngressStorage(api_client, logger) self._ingress = IngressStorage(api_client, logger) self._job = JobStorage(api_client, logger) - self._namespace = NamespaceStorage(api_client, logger) self._pod = PodStorage(api_client, logger) self._pvc = PersistentVolumeClaimStorage(api_client, logger) self._service = ServiceStorage(api_client, logger) @@ -158,23 +156,6 @@ async def delete( for pvc in pvcs: await self._pvc.delete(pvc.metadata.name, namespace, timeout) - async def namespace_exists(self, name: str, timeout: Timeout) -> bool: - """Check whether a namespace is present. - - Parameters - ---------- - name - Name of the namespace. - timeout - Timeout on operation. - - Returns - ------- - bool - `True` if the namespace is present, `False` otherwise. 
- """ - return await self._namespace.read(name, timeout) is not None - async def read_fileserver_state( self, namespace: str, timeout: Timeout ) -> dict[str, FileserverStateObjects]: diff --git a/controller/tests/conftest.py b/controller/tests/conftest.py index 9e38a7bd3..8ab9fe88f 100644 --- a/controller/tests/conftest.py +++ b/controller/tests/conftest.py @@ -81,6 +81,7 @@ async def factory( mock_kubernetes.set_nodes_for_test(nodes) async with Factory.standalone(config) as factory: yield factory + await factory.stop_background_services() @pytest.fixture diff --git a/controller/tests/services/prepuller_test.py b/controller/tests/services/prepuller_test.py index 225190a7c..6338e4982 100644 --- a/controller/tests/services/prepuller_test.py +++ b/controller/tests/services/prepuller_test.py @@ -66,8 +66,7 @@ async def test_docker( factory: Factory, config: Config, mock_kubernetes: MockKubernetesApi ) -> None: """Test the prepuller service configured to talk to Docker.""" - await factory.image_service.start() - await asyncio.sleep(0.2) + await factory.image_service.refresh() # With the default data, the image service running, and no prepuller, we # should see node1 as up-to-date but node2 out-of-date. @@ -80,7 +79,7 @@ async def test_docker( assert image.nodes == ["node1", "node2"] # Start the prepuller and give it a moment to run. - await factory.prepuller.start() + await factory.start_background_services() await asyncio.sleep(0.2) # The default data configures Kubernetes with missing images on some @@ -105,10 +104,6 @@ async def test_docker( if image.tag == "d_2077_10_23": assert image.nodes == ["node1", "node2"] - # Stop everything. (Ideally this should be done in a try/finally block.) - await factory.image_service.stop() - await factory.prepuller.stop() - @pytest.mark.asyncio async def test_gar( @@ -273,9 +268,8 @@ def callback(method: str, *args: Any) -> None: mock_kubernetes.error_callback = callback - await factory.start_background_services() - await asyncio.sleep(0.2) - + await factory.image_service.refresh() + await factory.prepuller.prepull_images() obj = "nublado/prepull-d-2077-10-23-node2" error = f"Error creating object (Pod {obj}, status 400)" assert mock_slack.messages == [ diff --git a/docs/dev/api/controller.rst b/docs/dev/api/controller.rst index b3690ad54..34e94ae0f 100644 --- a/docs/dev/api/controller.rst +++ b/docs/dev/api/controller.rst @@ -8,6 +8,9 @@ This documentation therefore exists only to assist developers and code analysis .. automodapi:: controller :include-all-objects: +.. automodapi:: controller.background + :include-all-objects: + .. automodapi:: controller.config :include-all-objects:
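
As an aside for reviewers unfamiliar with aiojobs, the following stand-alone sketch (not part of this patch; the names periodic, tick, and main are invented for illustration) shows the scheduling pattern that BackgroundTaskManager uses: delay by one interval, run the task, log and swallow failures so one bad iteration does not kill the loop, and shut everything down by closing the scheduler.

import asyncio
from collections.abc import Awaitable, Callable
from datetime import timedelta

from aiojobs import Scheduler


async def periodic(
    call: Callable[[], Awaitable[None]], interval: timedelta
) -> None:
    """Run call on every interval, delaying by one interval first."""
    while True:
        await asyncio.sleep(interval.total_seconds())
        try:
            await call()
        except Exception:
            # A real implementation would log the exception and report it to
            # Slack; the loop keeps going so that one failure does not kill
            # the background task.
            pass


async def main() -> None:
    async def tick() -> None:
        print("tick")

    scheduler = Scheduler()
    await scheduler.spawn(periodic(tick, timedelta(seconds=1)))
    await asyncio.sleep(3.5)

    # Closing the scheduler cancels all spawned jobs, which is how
    # BackgroundTaskManager.stop shuts its loops down.
    await scheduler.close()


asyncio.run(main())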