Skip to content

Commit

Permalink
Merge pull request #300 from lsst-sqre/tickets/DM-41907
Browse files Browse the repository at this point in the history
DM-41907: Fix race conditions for labs and file servers
  • Loading branch information
rra authored Dec 8, 2023
2 parents 44063d0 + 66ac013 commit ee59779
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 34 deletions.
46 changes: 38 additions & 8 deletions controller/src/controller/services/fileserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import asyncio
import contextlib
from dataclasses import dataclass, field
from datetime import datetime

from safir.datetime import current_datetime
from safir.slack.blockkit import SlackException
from safir.slack.webhook import SlackWebhookClient
from structlog.stdlib import BoundLogger
Expand Down Expand Up @@ -36,6 +38,23 @@ class _State:
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
"""Lock to prevent two operations from happening at once."""

last_modified: datetime = field(
default_factory=lambda: current_datetime(microseconds=True)
)
"""Last time an operation was started or completed.
This is required to prevent race conditions such as the following:
#. New file server starts being created.
#. Reconcile gathers information about the partially created file server and
finds that it is incomplete.
#. Reconcile deletes the file server objects created so far.
#. File server creation finishes and then the file server doesn't work.
With this setting, reconcile can check if a file server operation has
started or recently finished and skip the reconcile.
"""


class FileserverManager:
"""Manage user file servers.
Expand Down Expand Up @@ -115,6 +134,7 @@ async def create(self, user: GafaelfawrUserInfo) -> None:
if state.running:
return
try:
state.last_modified = current_datetime(microseconds=True)
async with timeout.enforce():
await self._create_file_server(user, timeout)
except Exception as e:
Expand All @@ -125,6 +145,8 @@ async def create(self, user: GafaelfawrUserInfo) -> None:
raise
else:
state.running = True
finally:
state.last_modified = current_datetime(microseconds=True)

async def delete(self, username: str) -> None:
"""Delete the file server for a user.
Expand All @@ -146,8 +168,12 @@ async def delete(self, username: str) -> None:
if not state.running:
msg = f"File server for {username} not running"
raise UnknownUserError(msg)
await self._delete_file_server(username)
state.running = False
state.last_modified = current_datetime(microseconds=True)
try:
await self._delete_file_server(username)
state.running = False
finally:
state.last_modified = current_datetime(microseconds=True)

async def list(self) -> list[str]:
"""List users with running file servers."""
Expand All @@ -167,6 +193,7 @@ async def reconcile(self) -> None:
timeout = Timeout(
"Reading file server state", KUBERNETES_REQUEST_TIMEOUT
)
start = current_datetime(microseconds=True)
seen = await self._storage.read_fileserver_state(namespace, timeout)
known_users = {k for k, v in self._servers.items() if v.running}

Expand All @@ -176,22 +203,25 @@ async def reconcile(self) -> None:
invalid = set()
for username, state in seen.items():
valid = self._builder.is_valid(username, state)
if valid:
if valid and username not in known_users:
self._servers[username] = _State(running=valid)
elif username in self._servers:
to_delete.add(username)
else:
invalid.add(username)
seen = {k: v for k, v in seen.items() if k not in to_delete}

# Also tidy up any supposedly-running users that we didn't find. They
# may have some objects remaining. This should only be possible if
# something outside of the controller deleted resources.
seen_users = set(seen.keys())
for user in (known_users - seen_users) | to_delete:
seen_users = {u for u in seen if u not in to_delete}
for username in (known_users - seen_users) | to_delete:
if username in self._servers:
last_modified = self._servers[username].last_modified
if last_modified > start:
continue
msg = "Removing broken fileserver for user"
self._logger.warning(msg, user=user)
await self.delete(user)
self._logger.warning(msg, user=username)
await self.delete(username)
for username in invalid:
if username in self._servers:
continue
Expand Down
96 changes: 70 additions & 26 deletions controller/src/controller/services/lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from base64 import b64encode
from collections.abc import AsyncIterator, Coroutine
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Self

from safir.asyncio import AsyncMultiQueue
from safir.datetime import current_datetime
from safir.slack.blockkit import (
SlackException,
)
Expand Down Expand Up @@ -58,6 +60,45 @@ class _State:
events: AsyncMultiQueue[Event] = field(default_factory=AsyncMultiQueue)
"""Events from the current or most recent lab operation."""

last_modified: datetime = field(
default_factory=lambda: current_datetime(microseconds=True)
)
"""Last time the state was changed.
This is required to prevent race conditions such as the following:
#. New lab starts being created.
#. Reconcile gathers information about the partially created lab and finds
that it is incomplete.
#. Lab finishes being spawned and the operation ends.
#. Reconcile checks to see if there is an operation in progress, sees that
there isn't, and therefore thinks it's safe to delete the supposedly
malformed lab.
With this setting, reconcile can check if either a lab operation is in
progress or if an operation completed since reconcile started and skip the
reconcile either way.
"""

def modified_since(self, date: datetime) -> bool:
"""Whether the lab state has been modified since the given time.
Any lab that has a current in-progress operation is counted as
modified.
Parameters
----------
date
Reference time.
Returns
-------
bool
`True` if an operation is in progress or the internal last-modified
time is after the provided time, `False` otherwise.
"""
return bool(self.monitor.in_progress or self.last_modified > date)


class _LabOperation(Enum):
"""Possible operations on a lab that could be in progress."""
Expand Down Expand Up @@ -337,6 +378,7 @@ async def delete_lab(self, username: str) -> None:
# do something safe in case one appears in the future.
if lab.monitor.in_progress == _LabOperation.DELETE:
await lab.monitor.wait()
lab.last_modified = current_datetime(microseconds=True)
elif lab.monitor.in_progress in (_LabOperation.SPAWN, None):
if lab.monitor.in_progress == _LabOperation.SPAWN:
await lab.monitor.cancel()
Expand All @@ -361,6 +403,7 @@ async def delete_lab(self, username: str) -> None:
)
await lab.monitor.monitor(operation, timeout)
await lab.monitor.wait()
lab.last_modified = current_datetime(microseconds=True)
if lab.state.status == LabStatus.TERMINATED:
lab.state = None
else:
Expand Down Expand Up @@ -501,6 +544,9 @@ async def reap_spawners(self) -> None:
cleanly finalized and to catch any uncaught exceptions. That function
is performed by a background task running this method.
This method is also responsible for updating the last modified time in
the lab state.
Notes
-----
Doing this properly is a bit tricky, since we have to avoid both
Expand All @@ -518,6 +564,7 @@ async def reap_spawners(self) -> None:
if lab.monitor.in_progress and lab.monitor.is_done():
try:
await lab.monitor.wait()
lab.last_modified = current_datetime(microseconds=True)
except NoOperationError:
# There is a race condition with deletes, since the
# task doing the delete kicks it off and then
Expand All @@ -528,6 +575,7 @@ async def reap_spawners(self) -> None:
# and ignorable.
pass
except Exception as e:
lab.last_modified = current_datetime(microseconds=True)
msg = "Uncaught exception in monitor thread"
self._logger.exception(msg, user=username)
await self._maybe_post_slack_exception(e, username)
Expand All @@ -548,33 +596,20 @@ async def reconcile(self) -> None:
Raised if there is some failure in a Kubernetes API call.
"""
self._logger.info("Reconciling user lab state with Kubernetes")
known_users = set(self._labs.keys())
start = current_datetime(microseconds=True)

# Gather information about all extant Kubernetes namespaces and delete
# any malformed namespaces that have no operation in progress and have
# not been modified since this reconciliation started.
observed = await self._gather_current_state()

# If the set of users we expected to see changed during
# reconciliation, that means someone added a new user while we were
# reconciling. Play it safe and skip this background update; we'll
# catch any inconsistencies the next time around.
#
# From this point forward, make sure not to do any asyncio operations
# until we've finished reconciling state, since if we yield control
# our state may change out from under us.
if set(self._labs.keys()) != known_users:
msg = "Known users changed during reconciliation, skipping"
self._logger.info(msg)
return
observed = await self._gather_current_state(start)

# First pass: check all users already recorded in internal state
# against Kubernetes and correct them (or remove them) if needed.
to_monitor = self._reconcile_known_users(observed)
to_monitor = self._reconcile_known_users(observed, start)

# Second pass: take observed state and create any missing internal
# state. This is the normal case after a restart of the lab
# controller.
for username in set(observed.keys()) - known_users:
for username in set(observed.keys()) - set(self._labs.keys()):
msg = f"Creating record for user {username} from Kubernetes"
self._logger.info(msg)
self._labs[username] = _State(
Expand Down Expand Up @@ -676,13 +711,21 @@ async def _delete_completed_labs(self) -> None:
with contextlib.suppress(UnknownUserError):
await self.delete_lab(username)

async def _gather_current_state(self) -> dict[str, UserLabState]:
async def _gather_current_state(
self, cutoff: datetime
) -> dict[str, UserLabState]:
"""Gather lab state from extant Kubernetes resources.
Called during reconciliation, this method determines the current lab
state by scanning the resources in Kubernetes. Malformed labs that do
not have an existing entry in our state mapping will be deleted.
Parameters
----------
cutoff
Do not delete incomplete namespaces for users modified after this
date, since our data may be stale.
Returns
-------
dict of UserLabState
Expand All @@ -704,14 +747,12 @@ async def _gather_current_state(self) -> dict[str, UserLabState]:
objects = await self._storage.read_lab_objects(names, timeout)
state = await self._builder.recreate_lab_state(username, objects)

# Only delete malformed labs with no entry or no current operation
# in progress. Do this check immediately before the deletion since
# the above await calls yield control and the internal state may
# change during that time.
# Only delete malformed labs with no entry or that have not been
# modified since we started gathering data.
lab = self._labs.get(username)
if state:
observed[username] = state
elif not lab or not lab.monitor.in_progress:
elif not lab or not lab.modified_since(cutoff):
msg = "Deleting incomplete namespace"
self._logger.warning(msg, user=username, namespace=namespace)
timeout = Timeout(msg, KUBERNETES_REQUEST_TIMEOUT)
Expand Down Expand Up @@ -829,7 +870,7 @@ async def _monitor_pending_spawn(self, username: str) -> None:
await lab.monitor.monitor(operation, timeout, self._spawner_done)

def _reconcile_known_users(
self, observed: dict[str, UserLabState]
self, observed: dict[str, UserLabState], cutoff: datetime
) -> set[str]:
"""Reconcile observed lab state against already-known users.
Expand All @@ -840,6 +881,9 @@ def _reconcile_known_users(
----------
observed
Observed lab state.
cutoff
Do not change labs that have been modified since this date, since
our gathered data about the lab may be stale.
Returns
-------
Expand All @@ -860,7 +904,7 @@ def _reconcile_known_users(
"""
to_monitor = set()
for username, lab in self._labs.items():
if lab.monitor.in_progress or not lab.state:
if not lab.state or lab.modified_since(cutoff):
continue
if lab.state.status == LabStatus.FAILED:
continue
Expand Down Expand Up @@ -1269,7 +1313,7 @@ async def _monitor_operation(
timeout: Timeout,
done_event: asyncio.Event | None = None,
) -> None:
"""Monitor the deletion of a lab.
"""Monitor an operation on a lab.
Parameters
----------
Expand Down

0 comments on commit ee59779

Please sign in to comment.