Skip to content

Commit

Permalink
Merge pull request #299 from lsst-sqre/tickets/DM-41907
Browse files Browse the repository at this point in the history
DM-41907: Move background task management
  • Loading branch information
rra authored Dec 8, 2023
2 parents 4b676e9 + c5fdeea commit 44063d0
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 469 deletions.
201 changes: 201 additions & 0 deletions controller/src/controller/background.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
"""Nublado controller background processing."""

from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from datetime import timedelta

from aiojobs import Scheduler
from safir.datetime import current_datetime
from safir.slack.webhook import SlackWebhookClient
from structlog.stdlib import BoundLogger

from .constants import (
FILE_SERVER_RECONCILE_INTERVAL,
IMAGE_REFRESH_INTERVAL,
LAB_RECONCILE_INTERVAL,
)
from .services.fileserver import FileserverManager
from .services.image import ImageService
from .services.lab import LabManager
from .services.prepuller import Prepuller

__all__ = ["BackgroundTaskManager"]


class BackgroundTaskManager:
"""Manage Nublado controller background tasks.
While the Nublado controller is running, it needs to perform several
periodic or continuous background tasks, namely:
#. Refresh the list of available remote images and local cached images.
#. Prepull images to all eligible nodes.
#. Reconcile Kubernetes lab state with internal data structures.
#. Reap tasks that were monitoring lab spawning or deletion.
#. Watch file servers for changes in pod status (startup or timeout).
#. Reconcile Kubernetes file server state with internal data structures.
This class manages all of these background tasks including, where
relevant, their schedules. It only does the task management; all of the
work of these tasks is done by methods on the underlying service objects.
This class is created during startup and tracked as part of the
`~controller.factory.ProcessContext`.
Parameters
----------
image_service
Image service.
prepuller
Prepuller service.
lab_manager
Lab management service.
fileserver_manager
File server management service.
slack_client
Optional Slack webhook client for alerts.
logger
Logger to use.
"""

def __init__(
self,
*,
image_service: ImageService,
prepuller: Prepuller,
lab_manager: LabManager,
fileserver_manager: FileserverManager | None,
slack_client: SlackWebhookClient | None,
logger: BoundLogger,
) -> None:
self._image_service = image_service
self._prepuller = prepuller
self._lab_manager = lab_manager
self._fileserver_manager = fileserver_manager
self._slack = slack_client
self._logger = logger

self._scheduler: Scheduler | None = None

async def start(self) -> None:
"""Start all background tasks.
Intended to be called during Nublado controller startup. Several of
the background tasks are run in the foreground first to ensure
internal state is valid before starting to serve requests.
"""
if self._scheduler:
msg = "Background tasks already running, cannot start"
self._logger.warning(msg)
return
self._scheduler = Scheduler()

# Run some of the tasks in the foreground first to ensure internal
# data is consistent after startup. All of them can run in parallel.
async with asyncio.TaskGroup() as tg:
self._logger.info("Populating internal state")
tg.create_task(self._image_service.refresh())
tg.create_task(self._lab_manager.reconcile())
if self._fileserver_manager:
tg.create_task(self._fileserver_manager.reconcile())

# Now, start all of the tasks in the background.
coros = [
self._loop(
self._image_service.refresh(),
IMAGE_REFRESH_INTERVAL,
"refreshing image data",
),
self._prepull_loop(),
self._loop(
self._lab_manager.reconcile(),
LAB_RECONCILE_INTERVAL,
"reconciling lab state",
),
self._lab_manager.reap_spawners(),
]
if self._fileserver_manager:
coros.append(
self._loop(
self._fileserver_manager.reconcile(),
FILE_SERVER_RECONCILE_INTERVAL,
"reconciling file server state",
)
)
coros.append(self._fileserver_manager.watch_servers())
self._logger.info("Starting background tasks")
for coro in coros:
await self._scheduler.spawn(coro)

async def stop(self) -> None:
"""Stop the background tasks."""
if not self._scheduler:
msg = "Background tasks were already stopped"
self._logger.warning(msg)
return
self._logger.info("Stopping background tasks")
await self._scheduler.close()
self._scheduler = None
await self._lab_manager.stop_monitor_tasks()

async def _loop(
self,
coro: Coroutine[None, None, None],
interval: timedelta,
description: str,
) -> None:
"""Wrap a coroutine in a periodic scheduling loop.
The provided coroutine is run on every interval. This method always
delays by the interval first before running the coroutine for the
first time.
Parameters
----------
coro
Coroutine to run repeatedly.
interval
Scheduling interval to use.
description
Description of the background task for error reporting.
"""
while True:
start = current_datetime(microseconds=True)
try:
await coro
except Exception as e:
# On failure, log the exception but otherwise continue as
# normal, including the delay. This will provide some time for
# whatever the problem was to be resolved.
elapsed = current_datetime(microseconds=True) - start
msg = f"Uncaught exception {description}"
self._logger.exception(msg, delay=elapsed.total_seconds)
if self._slack:
await self._slack.post_uncaught_exception(e)
delay = interval - (current_datetime(microseconds=True) - start)
if delay.total_seconds() < 1:
msg = f"{description.capitalize()} is running continuously"
self._logger.warning(msg)
else:
await asyncio.sleep(delay.total_seconds())

async def _prepull_loop(self) -> None:
"""Execute the prepuller in an infinite loop.
The prepuller loop uses an `asyncio.Event` set by the image service to
decide when to run instead of a simple interval. This ensures the
prepuller runs immediately after a possible image list update.
"""
while True:
try:
await self._image_service.prepuller_wait()
await self._prepuller.prepull_images()
except Exception as e:
self._logger.exception("Uncaught exception prepulling images")
if self._slack:
await self._slack.post_uncaught_exception(e)
pause = IMAGE_REFRESH_INTERVAL.total_seconds()
self._logger.warning("Pausing failed prepuller for {pause}s")
await asyncio.sleep(pause)
8 changes: 4 additions & 4 deletions controller/src/controller/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
"DOCKER_CREDENTIALS_PATH",
"DROPDOWN_SENTINEL_VALUE",
"GROUPNAME_REGEX",
"FILE_SERVER_REFRESH_INTERVAL",
"FILE_SERVER_RECONCILE_INTERVAL",
"IMAGE_REFRESH_INTERVAL",
"KUBERNETES_NAME_PATTERN",
"KUBERNETES_REQUEST_TIMEOUT",
"LAB_COMMAND",
"LAB_STATE_REFRESH_INTERVAL",
"LAB_RECONCILE_INTERVAL",
"LIMIT_TO_REQUEST_RATIO",
"METADATA_PATH",
"MOUNT_PATH_DOWNWARD_API",
Expand Down Expand Up @@ -47,7 +47,7 @@
DROPDOWN_SENTINEL_VALUE = "use_image_from_dropdown"
"""Used in the lab form for ``image_list`` when ``image_dropdown`` is used."""

FILE_SERVER_REFRESH_INTERVAL = timedelta(minutes=60)
FILE_SERVER_RECONCILE_INTERVAL = timedelta(minutes=60)
"""How frequently to refresh file server state from Kubernetes.
This will detect when file servers disappear out from under us, such as being
Expand Down Expand Up @@ -75,7 +75,7 @@
This should be configurable but isn't yet.
"""

LAB_STATE_REFRESH_INTERVAL = timedelta(minutes=60)
LAB_RECONCILE_INTERVAL = timedelta(minutes=60)
"""How frequently to refresh user lab state from Kubernetes.
This will detect when user labs disappear out from under us without user
Expand Down
60 changes: 33 additions & 27 deletions controller/src/controller/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from safir.slack.webhook import SlackWebhookClient
from structlog.stdlib import BoundLogger

from .background import BackgroundTaskManager
from .config import Config
from .exceptions import NotConfiguredError
from .models.v1.prepuller_config import DockerSourceConfig, GARSourceConfig
Expand Down Expand Up @@ -75,6 +76,9 @@ class ProcessContext:
_fileserver_manager: FileserverManager | None
"""State management for user file servers."""

background: BackgroundTaskManager
"""Manager for background tasks."""

@classmethod
async def from_config(cls, config: Config) -> Self:
"""Create a new process context from the controller configuration.
Expand Down Expand Up @@ -148,32 +152,42 @@ async def from_config(cls, config: Config) -> Self:
slack_client=slack_client,
logger=logger,
)
prepuller = Prepuller(
image_service=image_service,
prepuller_builder=PrepullerBuilder(
metadata_storage=metadata_storage,
pull_secret=config.lab.pull_secret,
),
metadata_storage=metadata_storage,
pod_storage=PodStorage(kubernetes_client, logger),
slack_client=slack_client,
logger=logger,
)
lab_manager = LabManager(
config=config.lab,
image_service=image_service,
lab_builder=LabBuilder(config.lab, config.base_url, logger),
metadata_storage=metadata_storage,
lab_storage=LabStorage(kubernetes_client, logger),
slack_client=slack_client,
logger=logger,
)
return cls(
config=config,
http_client=http_client,
image_service=image_service,
kubernetes_client=kubernetes_client,
prepuller=Prepuller(
image_service=image_service,
prepuller_builder=PrepullerBuilder(
metadata_storage=metadata_storage,
pull_secret=config.lab.pull_secret,
),
metadata_storage=metadata_storage,
pod_storage=PodStorage(kubernetes_client, logger),
slack_client=slack_client,
logger=logger,
),
lab_manager=LabManager(
config=config.lab,
prepuller=prepuller,
lab_manager=lab_manager,
_fileserver_manager=fileserver_manager,
background=BackgroundTaskManager(
image_service=image_service,
lab_builder=LabBuilder(config.lab, config.base_url, logger),
metadata_storage=metadata_storage,
lab_storage=LabStorage(kubernetes_client, logger),
prepuller=prepuller,
lab_manager=lab_manager,
fileserver_manager=fileserver_manager,
slack_client=slack_client,
logger=logger,
),
_fileserver_manager=fileserver_manager,
)

@property
Expand All @@ -189,23 +203,15 @@ async def aclose(self) -> None:

async def start(self) -> None:
"""Start the background threads running."""
await self.image_service.start()
await self.prepuller.start()
await self.lab_manager.start()
if self._fileserver_manager:
await self._fileserver_manager.start()
await self.background.start()

async def stop(self) -> None:
"""Clean up a process context.
Called during shutdown, or before recreating the process context using
a different configuration.
"""
if self._fileserver_manager:
await self._fileserver_manager.stop()
await self.prepuller.stop()
await self.image_service.stop()
await self.lab_manager.stop()
await self.background.stop()


class Factory:
Expand Down
Loading

0 comments on commit 44063d0

Please sign in to comment.