From 615aadf290deaac9957ef795b31972e7722f1d10 Mon Sep 17 00:00:00 2001 From: Matt Pryor Date: Thu, 18 Jul 2024 10:10:49 +0100 Subject: [PATCH 1/4] First pass at controller --- easykube/__init__.py | 2 + easykube/kubernetes/__init__.py | 4 +- easykube/kubernetes/client/__init__.py | 2 +- easykube/kubernetes/client/resource.py | 14 +- easykube/kubernetes/runtime/__init__.py | 2 + easykube/kubernetes/runtime/controller.py | 263 ++++++++++++++++++++++ easykube/kubernetes/runtime/queue.py | 154 +++++++++++++ easykube/kubernetes/runtime/reconciler.py | 54 +++++ 8 files changed, 489 insertions(+), 6 deletions(-) create mode 100644 easykube/kubernetes/runtime/__init__.py create mode 100644 easykube/kubernetes/runtime/controller.py create mode 100644 easykube/kubernetes/runtime/queue.py create mode 100644 easykube/kubernetes/runtime/reconciler.py diff --git a/easykube/__init__.py b/easykube/__init__.py index f04f86d..477f669 100644 --- a/easykube/__init__.py +++ b/easykube/__init__.py @@ -6,5 +6,7 @@ Configuration, ResourceSpec, SyncClient, + LabelSelector, + DeletePropagationPolicy, resources ) diff --git a/easykube/kubernetes/__init__.py b/easykube/kubernetes/__init__.py index ad59664..84f92f0 100644 --- a/easykube/kubernetes/__init__.py +++ b/easykube/kubernetes/__init__.py @@ -4,7 +4,9 @@ ApiError, AsyncClient, ResourceSpec, - SyncClient + SyncClient, + LabelSelector, + DeletePropagationPolicy ) from .config import Configuration from . import resources diff --git a/easykube/kubernetes/client/__init__.py b/easykube/kubernetes/client/__init__.py index 83003b1..8fcb1b6 100644 --- a/easykube/kubernetes/client/__init__.py +++ b/easykube/kubernetes/client/__init__.py @@ -2,5 +2,5 @@ from .client import AsyncClient, SyncClient from .errors import ApiError from .iterators import ListResponseIterator, WatchEvents -from .resource import PRESENT, ABSENT, DeletePropagationPolicy, Resource +from .resource import PRESENT, ABSENT, LabelSelector, DeletePropagationPolicy, Resource from .spec import ResourceSpec diff --git a/easykube/kubernetes/client/resource.py b/easykube/kubernetes/client/resource.py index 5364338..e121b24 100644 --- a/easykube/kubernetes/client/resource.py +++ b/easykube/kubernetes/client/resource.py @@ -8,10 +8,16 @@ from .iterators import ListResponseIterator, WatchEvents -#: Sentinel object indicating that the presence of a label is required with any value -PRESENT = object() -#: Sentinel object indicating that a label must not be present -ABSENT = object() +class LabelSelector(str, enum.Enum): + #: Indicates that the presence of a label is required + PRESENT = "present" + #: Indicates that a label must not be present + ABSENT = "absent" + + +# Make these instances available at the top-level for backwards compatibility +PRESENT = LabelSelector.PRESENT +ABSENT = LabelSelector.ABSENT class DeletePropagationPolicy(str, enum.Enum): diff --git a/easykube/kubernetes/runtime/__init__.py b/easykube/kubernetes/runtime/__init__.py new file mode 100644 index 0000000..d659869 --- /dev/null +++ b/easykube/kubernetes/runtime/__init__.py @@ -0,0 +1,2 @@ +from .controller import Controller +from .reconciler import Reconciler, Request, Result diff --git a/easykube/kubernetes/runtime/controller.py b/easykube/kubernetes/runtime/controller.py new file mode 100644 index 0000000..0c35ee4 --- /dev/null +++ b/easykube/kubernetes/runtime/controller.py @@ -0,0 +1,263 @@ +import asyncio +import logging +import random +import typing as t + +from ..client import AsyncClient, LabelSelector + +from .queue import Queue 
+from .reconciler import Request, Result, Reconciler + + +logger = logging.getLogger(__name__) + + +LabelValue = t.Union[LabelSelector, t.List[str], str] + + +class Watch: + """ + Watches a Kubernetes resource and produces reconcile requests. + """ + def __init__( + self, + api_version: str, + kind: str, + mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]], + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None + ): + self._api_version = api_version + self._kind = kind + self._mapper = mapper + self._labels = labels + self._namespace = namespace + + async def run(self, client: AsyncClient, queue: Queue): + """ + Run the watch, pushing requests onto the specified queue. + """ + resource = await client.api(self._api_version).resource(self._kind) + watch_kwargs = {} + if self._labels: + watch_kwargs["labels"] = self._labels + if self._namespace: + watch_kwargs["namespace"] = self._namespace + logger.info( + "Starting watch", + extra = { + "api_version": self._api_version, + "kind": self._kind, + } + ) + initial, events = await resource.watch_list(**watch_kwargs) + for obj in initial: + for request in self._mapper(obj): + queue.enqueue(request) + async for event in events: + for request in self._mapper(event["object"]): + queue.enqueue(request) + + +class Controller: + """ + Class for a controller that watches a resource and its related resources and calls + a reconciler whenever an object needs to be reconciled. + """ + def __init__( + self, + api_version: str, + kind: str, + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None, + worker_count: int = 10, + requeue_max_backoff: int = 120, + ): + self._api_version = api_version + self._kind = kind + self._namespace = namespace + self._worker_count = worker_count + self._requeue_max_backoff = requeue_max_backoff + self._watches: t.List[Watch] = [ + # Start with a watch for the controller resource that produces reconciliation + # requests using the name and namespace from the metadata + Watch( + api_version, + kind, + lambda obj: [ + Request( + obj["metadata"]["name"], + obj["metadata"].get("namespace") + ), + ], + labels = labels, + namespace = namespace + ), + ] + + def owns( + self, + api_version: str, + kind: str, + *, + controller_only: bool = True + ): + """ + Specifies child objects that the controller objects owns and that should trigger + reconciliation of the parent object. + """ + self._watches.append( + Watch( + api_version, + kind, + lambda obj: [ + Request(ref["name"], obj["metadata"].get("namespace")) + for ref in obj["metadata"].get("ownerReferences", []) + if ( + ref["apiVersion"] == self._api_version and + ref["kind"] == self._kind and + (not controller_only or ref.get("controller", False)) + ) + ], + namespace = self._namespace + ) + ) + + def watches( + self, + api_version: str, + kind: str, + mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]], + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None + ): + """ + Watches the specified resource and uses the given mapper function to produce + reconciliation requests for the controller resource. + """ + self._watches.append( + Watch( + api_version, + kind, + mapper, + labels = labels, + namespace = namespace or self._namespace + ) + ) + + def _request_logger(self, request: Request, worker_idx: int): + """ + Returns a logger for the given request. 
+ """ + return logging.LoggerAdapter( + self._logger, + { + "api_version": self._api_version, + "kind": self._kind, + "instance": request.key, + "request_id": request.id, + "worker_idx": worker_idx, + } + ) + + async def _worker( + self, + client: AsyncClient, + reconciler: Reconciler, + queue: Queue, + worker_idx: int + ): + """ + Start a worker that processes reconcile requests using the given reconciler. + """ + while True: + request, attempt = await queue.dequeue() + # Get a logger that populates parameters for the request + logger = self._request_logger(request) + logger.info("Handling reconcile request (attempt %d)", attempt + 1) + # Try to reconcile the request + try: + result = reconciler.reconcile(client, request) + except asyncio.CancelledError: + # Propagate cancellations with no further action + raise + except Exception as exc: + # Log the exception before doing anything + logger.exception("Error handling reconcile request") + # Allow the reconciler to handle the exception + # By returning a result, the reconciler can choose not to requeue the request or + # opt for a fixed delay rather than the exponential backoff + # If it raises an exception, the request is requeued with an exponential backoff + try: + result = reconciler.handle_exception(client, request, exc) + except NotImplementedError: + # If the method is not implemented, we just want to requeue with a backoff + result = Result(True) + except Exception: + # If a different exception is raised, log that as well before requeuing + logger.exception("Error handling reconcile exception") + result = Result(True) + # Work out whether we need to requeue or whether we are done + if result.requeue: + if result.requeue_after: + delay = result.requeue_after + # If a specific delay is requested, reset the attempts + attempt = -1 + else: + delay = min(2**attempt + random.uniform(0, 1), self._requeue_max_backoff) + logger.info("Requeuing request after %ds", delay) + queue.requeue(request, attempt + 1, delay) + else: + logger.info("Successfully handled reconcile request") + # Mark the processing for the request as complete + queue.processing_complete(request) + + async def _task_cancel_and_wait(self, task: asyncio.Task): + """ + Cancels a task and waits for it to be done. + """ + # We cannot wait on the task directly as we want this function to be cancellable + # e.g. the task might be shielded from cancellation + # Instead, we make a future that completes when the task completes and wait on that + future = asyncio.get_running_loop().create_future() + def callback(task: asyncio.Task): + if not future.done(): + try: + # Try to resolve the future with the result of the task + future.set_result(task.result()) + except BaseException as exc: + # If the task raises an exception, resolve with that + future.set_exception(exc) + task.add_done_callback(callback) + # Cancel the task, but wait on our proxy future + try: + task.cancel() + await future + finally: + task.remove_done_callback(callback) + + async def run(self, client: AsyncClient, reconciler: Reconciler): + """ + Run the controller with reconciliation using the given reconciler. 
+ """ + # The queue is used to spread work between the workers + queue = Queue() + # Create the tasks that we will coordinate + tasks = [ + # Tasks to push requests onto the queue + asyncio.create_task(watch.run(client, queue)) + for watch in self._watches + ] + [ + # Worker tasks to process requests + asyncio.create_task(self._worker(client, reconciler, queue, idx)) + for idx in range(self._worker_count) + ] + # All of the tasks should run forever, so we exit when the first one completes + done, not_done = await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED) + for task in not_done: + await self._task_cancel_and_wait(task) + for task in done: + task.result() diff --git a/easykube/kubernetes/runtime/queue.py b/easykube/kubernetes/runtime/queue.py new file mode 100644 index 0000000..e1a0626 --- /dev/null +++ b/easykube/kubernetes/runtime/queue.py @@ -0,0 +1,154 @@ +import asyncio +import collections +import typing + +from .reconciler import Request + + +class Queue: + """ + Queue of (request, attempt) tuples representing requests to reconcile objects. + + The queue is "smart" in a few ways: + + 1. It has explicit operations for enqueuing a new request and requeuing a request that + has previously been attempted. + + 2. Requeuing of a request that has been previously attempted only happens after a delay. + This happens asynchronously so that it does not block the worker from moving on to the + next request. + + 3. At most one request per object can be in the queue at any given time. + + 4. Only one request per object is allowed to be "active" at any given time. + The queue records when a request leaves the queue and does not allow any more requests + for the same object to leave the queue until it has been notified that the + previous request has been completed (either explicitly or by requeuing). + + Note that this means that requests for objects that are changing often will be pushed to + the back of the queue. + """ + def __init__(self): + # The main queue of (request, attempt) tuples + self._queue: typing.List[typing.Tuple[Request, int]] = [] + # A queue of futures + # Each waiting "dequeuer" adds a future to the queue and waits on it + # When a request becomes available, the first future in the queue is resolved, which + # "wakes up" the corresponding dequeuer to read the request from the queue + self._futures: typing.Deque[asyncio.Future] = collections.deque() + # A map of request key to request ID for active requests + self._active: typing.Dict[str, str] = {} + # A map of request key to handles for requeue callbacks + self._handles: typing.Dict[str, asyncio.TimerHandle] = {} + + def _wakeup_next_dequeue(self): + # Wake up the next eligible dequeuer by resolving the first future in the queue + while self._futures: + future = self._futures.popleft() + if not future.done(): + future.set_result(None) + break + + async def dequeue(self) -> typing.Tuple[Request, int]: + """ + Remove and return a request from the queue. + + If there are no requests that are eligible to leave the queue, wait until there is one. 
+ """ + while True: + # Find the index of the first request in the queue for which there is no active task + idx = next( + ( + i + for i, (req, _) in enumerate(self._queue) + if req.key not in self._active + ), + -1 + ) + # If there is such a request, extract it from the queue and return it + if idx >= 0: + request, attempt = self._queue.pop(idx) + # Register the request as having an active processing task + self._active[request.key] = request.id + return (request, attempt) + # If there is no such request, wait to be woken up when the situation changes + future = asyncio.get_running_loop().create_future() + self._futures.append(future) + await future + + def _do_enqueue(self, request: Request, attempt: int = 0): + # Cancel any pending requeues for the same request + self._cancel_requeue(request) + # Append the request to the queue + self._queue.append((request, attempt)) + # Wake up the next waiting dequeuer + self._wakeup_next_dequeue() + + def enqueue(self, request: Request): + """ + Add a new request to the queue. + """ + # If a request with the same key is in the queue, discard it + idx = next( + ( + i + for i, (req, _) in enumerate(self._queue) + if req.key == request.key + ), + -1 + ) + if idx >= 0: + self._queue.pop(idx) + # Add the new request to the end of the queue + self._do_enqueue(request) + + def _do_requeue(self, request: Request, attempt: int): + # If a request with the same key is already in the queue, discard this one + # If not, enqueue it + if not any(req.key == request.key for req, _ in self._queue): + self._do_enqueue(request, attempt) + else: + self._cancel_requeue(request) + + def _cancel_requeue(self, request: Request): + # Cancel and discard any requeue handle for the request + handle = self._handles.pop(request.key, None) + if handle: + handle.cancel() + + def requeue(self, request: Request, attempt: int, delay: int): + """ + Requeue a request after the specified delay. + + If a request with the same key is already in the queue when the delay has elapsed, + the request is discarded. + """ + # If there is already an existing requeue handle, cancel it + self._cancel_requeue(request) + # If there is already a request with the same key on the queue, there is nothing to do + # If not, schedule a requeue after a delay + # + # NOTE(mkjpryor) + # We use a callback rather than a task to schedule the requeue + # This is because it allows us to cancel the requeue cleanly without trapping + # CancelledError, allowing the controller as a whole to be cancelled reliably + if not any(req.key == request.key for req, _ in self._queue): + # Schedule the requeue for the future and stash the handle + self._handles[request.key] = asyncio.get_running_loop().call_later( + delay, + self._do_requeue, + request, + attempt + ) + # If a request is being requeued, assume the processing is complete + self.processing_complete(request) + + def processing_complete(self, request: Request): + """ + Indicates to the queue that processing for the given request is complete. 
+ """ + # Only clear the active record if the request ID matches + if request.key in self._active and self._active[request.key] == request.id: + self._active.pop(request.key) + # Clearing the key may make another request eligible for processing + self._wakeup_next_dequeue() diff --git a/easykube/kubernetes/runtime/reconciler.py b/easykube/kubernetes/runtime/reconciler.py new file mode 100644 index 0000000..b861d6b --- /dev/null +++ b/easykube/kubernetes/runtime/reconciler.py @@ -0,0 +1,54 @@ +import dataclasses +import typing +import uuid + +from ..client import AsyncClient + + +@dataclasses.dataclass(frozen = True) +class Request: + """ + Represents a request to reconcile an object. + """ + #: The name of the object to reconcile + name: str + #: The namespace of the object to reconcile, or none for cluster-scoped objects + namespace: typing.Optional[str] = None + #: The ID of the request + id: str = dataclasses.field(default_factory = lambda: str(uuid.uuid4())) + + @property + def key(self): + """ + The key for the request. + """ + return f"{self.namespace}/{self.name}" + + +@dataclasses.dataclass(frozen = True) +class Result: + """ + Represents the result of a reconciliation. + """ + #: Indicates whether the request should be requeued + requeue: bool = False + #: Indicates the time in seconds after which the request should be requeued + #: If not given, a clamped exponential backoff is used + requeue_after: typing.Optional[int] = None + + +class Reconciler: + """ + Base class for a reconciler. + """ + def reconcile(self, client: AsyncClient, request: Request) -> Result: + """ + Reconcile the given request. + """ + raise NotImplementedError + + def handle_exception(self, client: AsyncClient, request: Request, exc: Exception) -> Result: + """ + Handle an exception that occured while processing a request. + """ + raise NotImplementedError From 2e6e87fae945831b779fb71d5bd00780ab70f79f Mon Sep 17 00:00:00 2001 From: Matt Pryor Date: Thu, 18 Jul 2024 10:10:49 +0100 Subject: [PATCH 2/4] First pass at controller --- easykube/__init__.py | 2 + easykube/kubernetes/__init__.py | 4 +- easykube/kubernetes/client/__init__.py | 2 +- easykube/kubernetes/client/resource.py | 14 +- easykube/kubernetes/runtime/__init__.py | 2 + easykube/kubernetes/runtime/controller.py | 263 ++++++++++++++++++++++ easykube/kubernetes/runtime/queue.py | 154 +++++++++++++ easykube/kubernetes/runtime/reconciler.py | 54 +++++ 8 files changed, 489 insertions(+), 6 deletions(-) create mode 100644 easykube/kubernetes/runtime/__init__.py create mode 100644 easykube/kubernetes/runtime/controller.py create mode 100644 easykube/kubernetes/runtime/queue.py create mode 100644 easykube/kubernetes/runtime/reconciler.py diff --git a/easykube/__init__.py b/easykube/__init__.py index f04f86d..477f669 100644 --- a/easykube/__init__.py +++ b/easykube/__init__.py @@ -6,5 +6,7 @@ Configuration, ResourceSpec, SyncClient, + LabelSelector, + DeletePropagationPolicy, resources ) diff --git a/easykube/kubernetes/__init__.py b/easykube/kubernetes/__init__.py index ad59664..84f92f0 100644 --- a/easykube/kubernetes/__init__.py +++ b/easykube/kubernetes/__init__.py @@ -4,7 +4,9 @@ ApiError, AsyncClient, ResourceSpec, - SyncClient + SyncClient, + LabelSelector, + DeletePropagationPolicy ) from .config import Configuration from . 
import resources diff --git a/easykube/kubernetes/client/__init__.py b/easykube/kubernetes/client/__init__.py index 83003b1..8fcb1b6 100644 --- a/easykube/kubernetes/client/__init__.py +++ b/easykube/kubernetes/client/__init__.py @@ -2,5 +2,5 @@ from .client import AsyncClient, SyncClient from .errors import ApiError from .iterators import ListResponseIterator, WatchEvents -from .resource import PRESENT, ABSENT, DeletePropagationPolicy, Resource +from .resource import PRESENT, ABSENT, LabelSelector, DeletePropagationPolicy, Resource from .spec import ResourceSpec diff --git a/easykube/kubernetes/client/resource.py b/easykube/kubernetes/client/resource.py index 5364338..e121b24 100644 --- a/easykube/kubernetes/client/resource.py +++ b/easykube/kubernetes/client/resource.py @@ -8,10 +8,16 @@ from .iterators import ListResponseIterator, WatchEvents -#: Sentinel object indicating that the presence of a label is required with any value -PRESENT = object() -#: Sentinel object indicating that a label must not be present -ABSENT = object() +class LabelSelector(str, enum.Enum): + #: Indicates that the presence of a label is required + PRESENT = "present" + #: Indicates that a label must not be present + ABSENT = "absent" + + +# Make these instances available at the top-level for backwards compatibility +PRESENT = LabelSelector.PRESENT +ABSENT = LabelSelector.ABSENT class DeletePropagationPolicy(str, enum.Enum): diff --git a/easykube/kubernetes/runtime/__init__.py b/easykube/kubernetes/runtime/__init__.py new file mode 100644 index 0000000..d659869 --- /dev/null +++ b/easykube/kubernetes/runtime/__init__.py @@ -0,0 +1,2 @@ +from .controller import Controller +from .reconciler import Reconciler, Request, Result diff --git a/easykube/kubernetes/runtime/controller.py b/easykube/kubernetes/runtime/controller.py new file mode 100644 index 0000000..0c35ee4 --- /dev/null +++ b/easykube/kubernetes/runtime/controller.py @@ -0,0 +1,263 @@ +import asyncio +import logging +import random +import typing as t + +from ..client import AsyncClient, LabelSelector + +from .queue import Queue +from .reconciler import Request, Result, Reconciler + + +logger = logging.getLogger(__name__) + + +LabelValue = t.Union[LabelSelector, t.List[str], str] + + +class Watch: + """ + Watches a Kubernetes resource and produces reconcile requests. + """ + def __init__( + self, + api_version: str, + kind: str, + mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]], + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None + ): + self._api_version = api_version + self._kind = kind + self._mapper = mapper + self._labels = labels + self._namespace = namespace + + async def run(self, client: AsyncClient, queue: Queue): + """ + Run the watch, pushing requests onto the specified queue. 
+ """ + resource = await client.api(self._api_version).resource(self._kind) + watch_kwargs = {} + if self._labels: + watch_kwargs["labels"] = self._labels + if self._namespace: + watch_kwargs["namespace"] = self._namespace + logger.info( + "Starting watch", + extra = { + "api_version": self._api_version, + "kind": self._kind, + } + ) + initial, events = await resource.watch_list(**watch_kwargs) + for obj in initial: + for request in self._mapper(obj): + queue.enqueue(request) + async for event in events: + for request in self._mapper(event["object"]): + queue.enqueue(request) + + +class Controller: + """ + Class for a controller that watches a resource and its related resources and calls + a reconciler whenever an object needs to be reconciled. + """ + def __init__( + self, + api_version: str, + kind: str, + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None, + worker_count: int = 10, + requeue_max_backoff: int = 120, + ): + self._api_version = api_version + self._kind = kind + self._namespace = namespace + self._worker_count = worker_count + self._requeue_max_backoff = requeue_max_backoff + self._watches: t.List[Watch] = [ + # Start with a watch for the controller resource that produces reconciliation + # requests using the name and namespace from the metadata + Watch( + api_version, + kind, + lambda obj: [ + Request( + obj["metadata"]["name"], + obj["metadata"].get("namespace") + ), + ], + labels = labels, + namespace = namespace + ), + ] + + def owns( + self, + api_version: str, + kind: str, + *, + controller_only: bool = True + ): + """ + Specifies child objects that the controller objects owns and that should trigger + reconciliation of the parent object. + """ + self._watches.append( + Watch( + api_version, + kind, + lambda obj: [ + Request(ref["name"], obj["metadata"].get("namespace")) + for ref in obj["metadata"].get("ownerReferences", []) + if ( + ref["apiVersion"] == self._api_version and + ref["kind"] == self._kind and + (not controller_only or ref.get("controller", False)) + ) + ], + namespace = self._namespace + ) + ) + + def watches( + self, + api_version: str, + kind: str, + mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]], + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None + ): + """ + Watches the specified resource and uses the given mapper function to produce + reconciliation requests for the controller resource. + """ + self._watches.append( + Watch( + api_version, + kind, + mapper, + labels = labels, + namespace = namespace or self._namespace + ) + ) + + def _request_logger(self, request: Request, worker_idx: int): + """ + Returns a logger for the given request. + """ + return logging.LoggerAdapter( + self._logger, + { + "api_version": self._api_version, + "kind": self._kind, + "instance": request.key, + "request_id": request.id, + "worker_idx": worker_idx, + } + ) + + async def _worker( + self, + client: AsyncClient, + reconciler: Reconciler, + queue: Queue, + worker_idx: int + ): + """ + Start a worker that processes reconcile requests using the given reconciler. 
+ """ + while True: + request, attempt = await queue.dequeue() + # Get a logger that populates parameters for the request + logger = self._request_logger(request) + logger.info("Handling reconcile request (attempt %d)", attempt + 1) + # Try to reconcile the request + try: + result = reconciler.reconcile(client, request) + except asyncio.CancelledError: + # Propagate cancellations with no further action + raise + except Exception as exc: + # Log the exception before doing anything + logger.exception("Error handling reconcile request") + # Allow the reconciler to handle the exception + # By returning a result, the reconciler can choose not to requeue the request or + # opt for a fixed delay rather than the exponential backoff + # If it raises an exception, the request is requeued with an exponential backoff + try: + result = reconciler.handle_exception(client, request, exc) + except NotImplementedError: + # If the method is not implemented, we just want to requeue with a backoff + result = Result(True) + except Exception: + # If a different exception is raised, log that as well before requeuing + logger.exception("Error handling reconcile exception") + result = Result(True) + # Work out whether we need to requeue or whether we are done + if result.requeue: + if result.requeue_after: + delay = result.requeue_after + # If a specific delay is requested, reset the attempts + attempt = -1 + else: + delay = min(2**attempt + random.uniform(0, 1), self._requeue_max_backoff) + logger.info("Requeuing request after %ds", delay) + queue.requeue(request, attempt + 1, delay) + else: + logger.info("Successfully handled reconcile request") + # Mark the processing for the request as complete + queue.processing_complete(request) + + async def _task_cancel_and_wait(self, task: asyncio.Task): + """ + Cancels a task and waits for it to be done. + """ + # We cannot wait on the task directly as we want this function to be cancellable + # e.g. the task might be shielded from cancellation + # Instead, we make a future that completes when the task completes and wait on that + future = asyncio.get_running_loop().create_future() + def callback(task: asyncio.Task): + if not future.done(): + try: + # Try to resolve the future with the result of the task + future.set_result(task.result()) + except BaseException as exc: + # If the task raises an exception, resolve with that + future.set_exception(exc) + task.add_done_callback(callback) + # Cancel the task, but wait on our proxy future + try: + task.cancel() + await future + finally: + task.remove_done_callback(callback) + + async def run(self, client: AsyncClient, reconciler: Reconciler): + """ + Run the controller with reconciliation using the given reconciler. 
+ """ + # The queue is used to spread work between the workers + queue = Queue() + # Create the tasks that we will coordinate + tasks = [ + # Tasks to push requests onto the queue + asyncio.create_task(watch.run(client, queue)) + for watch in self._watches + ] + [ + # Worker tasks to process requests + asyncio.create_task(self._worker(client, reconciler, queue, idx)) + for idx in range(self._worker_count) + ] + # All of the tasks should run forever, so we exit when the first one completes + done, not_done = await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED) + for task in not_done: + await self._task_cancel_and_wait(task) + for task in done: + task.result() diff --git a/easykube/kubernetes/runtime/queue.py b/easykube/kubernetes/runtime/queue.py new file mode 100644 index 0000000..e1a0626 --- /dev/null +++ b/easykube/kubernetes/runtime/queue.py @@ -0,0 +1,154 @@ +import asyncio +import collections +import typing + +from .reconciler import Request + + +class Queue: + """ + Queue of (request, attempt) tuples representing requests to reconcile objects. + + The queue is "smart" in a few ways: + + 1. It has explicit operations for enqueuing a new request and requeuing a request that + has previously been attempted. + + 2. Requeuing of a request that has been previously attempted only happens after a delay. + This happens asynchronously so that it does not block the worker from moving on to the + next request. + + 3. At most one request per object can be in the queue at any given time. + + 4. Only one request per object is allowed to be "active" at any given time. + The queue records when a request leaves the queue and does not allow any more requests + for the same object to leave the queue until it has been notified that the + previous request has been completed (either explicitly or by requeuing). + + Note that this means that requests for objects that are changing often will be pushed to + the back of the queue. + """ + def __init__(self): + # The main queue of (request, attempt) tuples + self._queue: typing.List[typing.Tuple[Request, int]] = [] + # A queue of futures + # Each waiting "dequeuer" adds a future to the queue and waits on it + # When a request becomes available, the first future in the queue is resolved, which + # "wakes up" the corresponding dequeuer to read the request from the queue + self._futures: typing.Deque[asyncio.Future] = collections.deque() + # A map of request key to request ID for active requests + self._active: typing.Dict[str, str] = {} + # A map of request key to handles for requeue callbacks + self._handles: typing.Dict[str, asyncio.TimerHandle] = {} + + def _wakeup_next_dequeue(self): + # Wake up the next eligible dequeuer by resolving the first future in the queue + while self._futures: + future = self._futures.popleft() + if not future.done(): + future.set_result(None) + break + + async def dequeue(self) -> typing.Tuple[Request, int]: + """ + Remove and return a request from the queue. + + If there are no requests that are eligible to leave the queue, wait until there is one. 
+ """ + while True: + # Find the index of the first request in the queue for which there is no active task + idx = next( + ( + i + for i, (req, _) in enumerate(self._queue) + if req.key not in self._active + ), + -1 + ) + # If there is such a request, extract it from the queue and return it + if idx >= 0: + request, attempt = self._queue.pop(idx) + # Register the request as having an active processing task + self._active[request.key] = request.id + return (request, attempt) + # If there is no such request, wait to be woken up when the situation changes + future = asyncio.get_running_loop().create_future() + self._futures.append(future) + await future + + def _do_enqueue(self, request: Request, attempt: int = 0): + # Cancel any pending requeues for the same request + self._cancel_requeue(request) + # Append the request to the queue + self._queue.append((request, attempt)) + # Wake up the next waiting dequeuer + self._wakeup_next_dequeue() + + def enqueue(self, request: Request): + """ + Add a new request to the queue. + """ + # If a request with the same key is in the queue, discard it + idx = next( + ( + i + for i, (req, _) in enumerate(self._queue) + if req.key == request.key + ), + -1 + ) + if idx >= 0: + self._queue.pop(idx) + # Add the new request to the end of the queue + self._do_enqueue(request) + + def _do_requeue(self, request: Request, attempt: int): + # If a request with the same key is already in the queue, discard this one + # If not, enqueue it + if not any(req.key == request.key for req, _ in self._queue): + self._do_enqueue(request, attempt) + else: + self._cancel_requeue(request) + + def _cancel_requeue(self, request: Request): + # Cancel and discard any requeue handle for the request + handle = self._handles.pop(request.key, None) + if handle: + handle.cancel() + + def requeue(self, request: Request, attempt: int, delay: int): + """ + Requeue a request after the specified delay. + + If a request with the same key is already in the queue when the delay has elapsed, + the request is discarded. + """ + # If there is already an existing requeue handle, cancel it + self._cancel_requeue(request) + # If there is already a request with the same key on the queue, there is nothing to do + # If not, schedule a requeue after a delay + # + # NOTE(mkjpryor) + # We use a callback rather than a task to schedule the requeue + # This is because it allows us to cancel the requeue cleanly without trapping + # CancelledError, allowing the controller as a whole to be cancelled reliably + if not any(req.key == request.key for req, _ in self._queue): + # Schedule the requeue for the future and stash the handle + self._handles[request.key] = asyncio.get_running_loop().call_later( + delay, + self._do_requeue, + request, + attempt + ) + # If a request is being requeued, assume the processing is complete + self.processing_complete(request) + + def processing_complete(self, request: Request): + """ + Indicates to the queue that processing for the given request is complete. 
+ """ + # Only clear the active record if the request ID matches + if request.key in self._active and self._active[request.key] == request.id: + self._active.pop(request.key) + # Clearing the key may make another request eligible for processing + self._wakeup_next_dequeue() diff --git a/easykube/kubernetes/runtime/reconciler.py b/easykube/kubernetes/runtime/reconciler.py new file mode 100644 index 0000000..b861d6b --- /dev/null +++ b/easykube/kubernetes/runtime/reconciler.py @@ -0,0 +1,54 @@ +import dataclasses +import typing +import uuid + +from ..client import AsyncClient + + +@dataclasses.dataclass(frozen = True) +class Request: + """ + Represents a request to reconcile an object. + """ + #: The name of the object to reconcile + name: str + #: The namespace of the object to reconcile, or none for cluster-scoped objects + namespace: typing.Optional[str] = None + #: The ID of the request + id: str = dataclasses.field(default_factory = lambda: str(uuid.uuid4())) + + @property + def key(self): + """ + The key for the request. + """ + return f"{self.namespace}/{self.name}" + + +@dataclasses.dataclass(frozen = True) +class Result: + """ + Represents the result of a reconciliation. + """ + #: Indicates whether the request should be requeued + requeue: bool = False + #: Indicates the time in seconds after which the request should be requeued + #: If not given, a clamped exponential backoff is used + requeue_after: typing.Optional[int] = None + + +class Reconciler: + """ + Base class for a reconciler. + """ + def reconcile(self, client: AsyncClient, request: Request) -> Result: + """ + Reconcile the given request. + """ + raise NotImplementedError + + def handle_exception(self, client: AsyncClient, request: Request, exc: Exception) -> Result: + """ + Handle an exception that occured while processing a request. 
+ """ + raise NotImplementedError From 5bad7df8cb48a03b0c0cae8a1baaf7b101bad3df Mon Sep 17 00:00:00 2001 From: Matt Pryor Date: Mon, 22 Jul 2024 17:33:04 +0100 Subject: [PATCH 3/4] Runtime working in a real controller --- easykube/__init__.py | 10 +- easykube/kubernetes/__init__.py | 8 +- easykube/kubernetes/client/__init__.py | 2 +- easykube/kubernetes/runtime/__init__.py | 3 +- easykube/kubernetes/runtime/controller.py | 161 +++++------------- easykube/kubernetes/runtime/manager.py | 72 ++++++++ easykube/kubernetes/runtime/queue.py | 14 +- .../runtime/{reconciler.py => reconcile.py} | 25 +-- easykube/kubernetes/runtime/util.py | 47 +++++ easykube/kubernetes/runtime/watch.py | 60 +++++++ easykube/runtime.py | 1 + 11 files changed, 248 insertions(+), 155 deletions(-) create mode 100644 easykube/kubernetes/runtime/manager.py rename easykube/kubernetes/runtime/{reconciler.py => reconcile.py} (58%) create mode 100644 easykube/kubernetes/runtime/util.py create mode 100644 easykube/kubernetes/runtime/watch.py create mode 100644 easykube/runtime.py diff --git a/easykube/__init__.py b/easykube/__init__.py index 477f669..7bc640b 100644 --- a/easykube/__init__.py +++ b/easykube/__init__.py @@ -1,12 +1,16 @@ from .kubernetes import ( PRESENT, ABSENT, + Api, ApiError, AsyncClient, Configuration, + DeletePropagationPolicy, + LabelSelector, + ListResponseIterator, + resources, + Resource, ResourceSpec, SyncClient, - LabelSelector, - DeletePropagationPolicy, - resources + WatchEvents ) diff --git a/easykube/kubernetes/__init__.py b/easykube/kubernetes/__init__.py index 84f92f0..68e2a20 100644 --- a/easykube/kubernetes/__init__.py +++ b/easykube/kubernetes/__init__.py @@ -1,12 +1,16 @@ from .client import ( PRESENT, ABSENT, + Api, ApiError, AsyncClient, + DeletePropagationPolicy, + LabelSelector, + ListResponseIterator, + Resource, ResourceSpec, SyncClient, - LabelSelector, - DeletePropagationPolicy + WatchEvents ) from .config import Configuration from . 
import resources diff --git a/easykube/kubernetes/client/__init__.py b/easykube/kubernetes/client/__init__.py index 8fcb1b6..2dbb35e 100644 --- a/easykube/kubernetes/client/__init__.py +++ b/easykube/kubernetes/client/__init__.py @@ -2,5 +2,5 @@ from .client import AsyncClient, SyncClient from .errors import ApiError from .iterators import ListResponseIterator, WatchEvents -from .resource import PRESENT, ABSENT, LabelSelector, DeletePropagationPolicy, Resource +from .resource import PRESENT, ABSENT, DeletePropagationPolicy, LabelSelector, Resource from .spec import ResourceSpec diff --git a/easykube/kubernetes/runtime/__init__.py b/easykube/kubernetes/runtime/__init__.py index d659869..3dd36aa 100644 --- a/easykube/kubernetes/runtime/__init__.py +++ b/easykube/kubernetes/runtime/__init__.py @@ -1,2 +1,3 @@ from .controller import Controller -from .reconciler import Reconciler, Request, Result +from .manager import Manager +from .reconcile import ReconcileFunc, Request, Result diff --git a/easykube/kubernetes/runtime/controller.py b/easykube/kubernetes/runtime/controller.py index 0c35ee4..86f7688 100644 --- a/easykube/kubernetes/runtime/controller.py +++ b/easykube/kubernetes/runtime/controller.py @@ -6,7 +6,9 @@ from ..client import AsyncClient, LabelSelector from .queue import Queue -from .reconciler import Request, Result, Reconciler +from .reconcile import ReconcileFunc, Request, Result +from .util import run_tasks +from .watch import Watch logger = logging.getLogger(__name__) @@ -15,51 +17,6 @@ LabelValue = t.Union[LabelSelector, t.List[str], str] -class Watch: - """ - Watches a Kubernetes resource and produces reconcile requests. - """ - def __init__( - self, - api_version: str, - kind: str, - mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]], - *, - labels: t.Optional[t.Dict[str, LabelValue]] = None, - namespace: t.Optional[str] = None - ): - self._api_version = api_version - self._kind = kind - self._mapper = mapper - self._labels = labels - self._namespace = namespace - - async def run(self, client: AsyncClient, queue: Queue): - """ - Run the watch, pushing requests onto the specified queue. 
- """ - resource = await client.api(self._api_version).resource(self._kind) - watch_kwargs = {} - if self._labels: - watch_kwargs["labels"] = self._labels - if self._namespace: - watch_kwargs["namespace"] = self._namespace - logger.info( - "Starting watch", - extra = { - "api_version": self._api_version, - "kind": self._kind, - } - ) - initial, events = await resource.watch_list(**watch_kwargs) - for obj in initial: - for request in self._mapper(obj): - queue.enqueue(request) - async for event in events: - for request in self._mapper(event["object"]): - queue.enqueue(request) - - class Controller: """ Class for a controller that watches a resource and its related resources and calls @@ -69,20 +26,20 @@ def __init__( self, api_version: str, kind: str, + reconcile_func: ReconcileFunc, *, labels: t.Optional[t.Dict[str, LabelValue]] = None, namespace: t.Optional[str] = None, worker_count: int = 10, - requeue_max_backoff: int = 120, + requeue_max_backoff: int = 120 ): self._api_version = api_version self._kind = kind self._namespace = namespace self._worker_count = worker_count self._requeue_max_backoff = requeue_max_backoff + self._reconcile_func = reconcile_func self._watches: t.List[Watch] = [ - # Start with a watch for the controller resource that produces reconciliation - # requests using the name and namespace from the metadata Watch( api_version, kind, @@ -94,7 +51,7 @@ def __init__( ], labels = labels, namespace = namespace - ), + ) ] def owns( @@ -103,7 +60,7 @@ def owns( kind: str, *, controller_only: bool = True - ): + ) -> 'Controller': """ Specifies child objects that the controller objects owns and that should trigger reconciliation of the parent object. @@ -124,6 +81,7 @@ def owns( namespace = self._namespace ) ) + return self def watches( self, @@ -133,7 +91,7 @@ def watches( *, labels: t.Optional[t.Dict[str, LabelValue]] = None, namespace: t.Optional[str] = None - ): + ) -> 'Controller': """ Watches the specified resource and uses the given mapper function to produce reconciliation requests for the controller resource. @@ -147,13 +105,14 @@ def watches( namespace = namespace or self._namespace ) ) + return self def _request_logger(self, request: Request, worker_idx: int): """ Returns a logger for the given request. """ return logging.LoggerAdapter( - self._logger, + logger, { "api_version": self._api_version, "kind": self._kind, @@ -163,43 +122,27 @@ def _request_logger(self, request: Request, worker_idx: int): } ) - async def _worker( - self, - client: AsyncClient, - reconciler: Reconciler, - queue: Queue, - worker_idx: int - ): + async def _worker(self, client: AsyncClient, queue: Queue, worker_idx: int): """ - Start a worker that processes reconcile requests using the given reconciler. + Start a worker that processes reconcile requests. 
""" while True: request, attempt = await queue.dequeue() # Get a logger that populates parameters for the request - logger = self._request_logger(request) + logger = self._request_logger(request, worker_idx) logger.info("Handling reconcile request (attempt %d)", attempt + 1) # Try to reconcile the request try: - result = reconciler.reconcile(client, request) + result = await self._reconcile_func(client, request) except asyncio.CancelledError: # Propagate cancellations with no further action raise - except Exception as exc: - # Log the exception before doing anything + except Exception: logger.exception("Error handling reconcile request") - # Allow the reconciler to handle the exception - # By returning a result, the reconciler can choose not to requeue the request or - # opt for a fixed delay rather than the exponential backoff - # If it raises an exception, the request is requeued with an exponential backoff - try: - result = reconciler.handle_exception(client, request, exc) - except NotImplementedError: - # If the method is not implemented, we just want to requeue with a backoff - result = Result(True) - except Exception: - # If a different exception is raised, log that as well before requeuing - logger.exception("Error handling reconcile exception") - result = Result(True) + result = Result(True) + else: + # If the result is None, use the default result + result = result or Result() # Work out whether we need to requeue or whether we are done if result.requeue: if result.requeue_after: @@ -207,57 +150,31 @@ async def _worker( # If a specific delay is requested, reset the attempts attempt = -1 else: - delay = min(2**attempt + random.uniform(0, 1), self._requeue_max_backoff) - logger.info("Requeuing request after %ds", delay) + delay = min(2**attempt, self._requeue_max_backoff) + # Add some jitter to the requeue + delay = delay + random.uniform(0, 1) + logger.info("Requeuing request after %.3fs", delay) queue.requeue(request, attempt + 1, delay) else: logger.info("Successfully handled reconcile request") # Mark the processing for the request as complete queue.processing_complete(request) - async def _task_cancel_and_wait(self, task: asyncio.Task): - """ - Cancels a task and waits for it to be done. - """ - # We cannot wait on the task directly as we want this function to be cancellable - # e.g. the task might be shielded from cancellation - # Instead, we make a future that completes when the task completes and wait on that - future = asyncio.get_running_loop().create_future() - def callback(task: asyncio.Task): - if not future.done(): - try: - # Try to resolve the future with the result of the task - future.set_result(task.result()) - except BaseException as exc: - # If the task raises an exception, resolve with that - future.set_exception(exc) - task.add_done_callback(callback) - # Cancel the task, but wait on our proxy future - try: - task.cancel() - await future - finally: - task.remove_done_callback(callback) - - async def run(self, client: AsyncClient, reconciler: Reconciler): + async def run(self, client: AsyncClient): """ - Run the controller with reconciliation using the given reconciler. + Run the controller using the given client. 
""" # The queue is used to spread work between the workers queue = Queue() - # Create the tasks that we will coordinate - tasks = [ - # Tasks to push requests onto the queue - asyncio.create_task(watch.run(client, queue)) - for watch in self._watches - ] + [ - # Worker tasks to process requests - asyncio.create_task(self._worker(client, reconciler, queue, idx)) - for idx in range(self._worker_count) - ] - # All of the tasks should run forever, so we exit when the first one completes - done, not_done = await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED) - for task in not_done: - await self._task_cancel_and_wait(task) - for task in done: - task.result() + # Run the tasks that make up the controller + await run_tasks( + [ + # Tasks to push requests onto the queue + asyncio.create_task(watch.run(client, queue)) + for watch in self._watches + ] + [ + # Worker tasks to process requests + asyncio.create_task(self._worker(client, queue, idx)) + for idx in range(self._worker_count) + ] + ) diff --git a/easykube/kubernetes/runtime/manager.py b/easykube/kubernetes/runtime/manager.py new file mode 100644 index 0000000..0a19fac --- /dev/null +++ b/easykube/kubernetes/runtime/manager.py @@ -0,0 +1,72 @@ +import asyncio +import typing as t + +from ..client import AsyncClient, LabelSelector + +from .controller import Controller, ReconcileFunc +from .util import run_tasks + + +LabelValue = t.Union[LabelSelector, t.List[str], str] + + +class Manager: + """ + Manages the execution of multiple controllers with shared resources. + """ + def __init__( + self, + *, + namespace: t.Optional[str] = None, + worker_count: int = 10, + requeue_max_backoff: int = 120 + ): + self._namespace = namespace + self._worker_count = worker_count + self._requeue_max_backoff = requeue_max_backoff + self._controllers: t.List[Controller] = [] + + def register_controller(self, controller: Controller) -> 'Manager': + """ + Register the given controller with this manager. + """ + self._controllers.append(controller) + return self + + def create_controller( + self, + api_version: str, + kind: str, + reconcile_func: ReconcileFunc, + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None, + worker_count: t.Optional[int] = None, + requeue_max_backoff: t.Optional[int] = None + ) -> Controller: + """ + Creates a new controller that is registered with this manager. + """ + controller = Controller( + api_version, + kind, + reconcile_func, + labels = labels, + namespace = namespace or self._namespace, + worker_count = worker_count or self._worker_count, + requeue_max_backoff = requeue_max_backoff or self._requeue_max_backoff + ) + self.register_controller(controller) + return controller + + async def run(self, client: AsyncClient): + """ + Run all the controllers registered with the manager using the given client. 
+ """ + assert len(self._controllers) > 0, "no controllers registered" + + # Run a task for each controller + await run_tasks([ + asyncio.create_task(controller.run(client)) + for controller in self._controllers + ]) diff --git a/easykube/kubernetes/runtime/queue.py b/easykube/kubernetes/runtime/queue.py index e1a0626..2e18315 100644 --- a/easykube/kubernetes/runtime/queue.py +++ b/easykube/kubernetes/runtime/queue.py @@ -1,8 +1,8 @@ import asyncio import collections -import typing +import typing as t -from .reconciler import Request +from .reconcile import Request class Queue: @@ -30,16 +30,16 @@ class Queue: """ def __init__(self): # The main queue of (request, attempt) tuples - self._queue: typing.List[typing.Tuple[Request, int]] = [] + self._queue: t.List[t.Tuple[Request, int]] = [] # A queue of futures # Each waiting "dequeuer" adds a future to the queue and waits on it # When a request becomes available, the first future in the queue is resolved, which # "wakes up" the corresponding dequeuer to read the request from the queue - self._futures: typing.Deque[asyncio.Future] = collections.deque() + self._futures: t.Deque[asyncio.Future] = collections.deque() # A map of request key to request ID for active requests - self._active: typing.Dict[str, str] = {} + self._active: t.Dict[str, str] = {} # A map of request key to handles for requeue callbacks - self._handles: typing.Dict[str, asyncio.TimerHandle] = {} + self._handles: t.Dict[str, asyncio.TimerHandle] = {} def _wakeup_next_dequeue(self): # Wake up the next eligible dequeuer by resolving the first future in the queue @@ -49,7 +49,7 @@ def _wakeup_next_dequeue(self): future.set_result(None) break - async def dequeue(self) -> typing.Tuple[Request, int]: + async def dequeue(self) -> t.Tuple[Request, int]: """ Remove and return a request from the queue. diff --git a/easykube/kubernetes/runtime/reconciler.py b/easykube/kubernetes/runtime/reconcile.py similarity index 58% rename from easykube/kubernetes/runtime/reconciler.py rename to easykube/kubernetes/runtime/reconcile.py index b861d6b..49e95a8 100644 --- a/easykube/kubernetes/runtime/reconciler.py +++ b/easykube/kubernetes/runtime/reconcile.py @@ -1,5 +1,5 @@ import dataclasses -import typing +import typing as t import uuid from ..client import AsyncClient @@ -13,7 +13,7 @@ class Request: #: The name of the object to reconcile name: str #: The namespace of the object to reconcile, or none for cluster-scoped objects - namespace: typing.Optional[str] = None + namespace: t.Optional[str] = None #: The ID of the request id: str = dataclasses.field(default_factory = lambda: str(uuid.uuid4())) @@ -22,7 +22,7 @@ def key(self): """ The key for the request. """ - return f"{self.namespace}/{self.name}" + return f"{self.namespace}/{self.name}" if self.namespace else self.name @dataclasses.dataclass(frozen = True) @@ -34,21 +34,8 @@ class Result: requeue: bool = False #: Indicates the time in seconds after which the request should be requeued #: If not given, a clamped exponential backoff is used - requeue_after: typing.Optional[int] = None + requeue_after: t.Optional[int] = None -class Reconciler: - """ - Base class for a reconciler. - """ - def reconcile(self, client: AsyncClient, request: Request) -> Result: - """ - Reconcile the given request. - """ - raise NotImplementedError - - def handle_exception(self, client: AsyncClient, request: Request, exc: Exception) -> Result: - """ - Handle an exception that occured while processing a request. 
- """ - raise NotImplementedError +#: Type for a reconciliation function +ReconcileFunc = t.Callable[[AsyncClient, Request], t.Awaitable[t.Optional[Result]]] diff --git a/easykube/kubernetes/runtime/util.py b/easykube/kubernetes/runtime/util.py new file mode 100644 index 0000000..15249d7 --- /dev/null +++ b/easykube/kubernetes/runtime/util.py @@ -0,0 +1,47 @@ +import asyncio +import functools +import typing + + +def _resolve_future(future: asyncio.Future, task: asyncio.Task): + if not future.done(): + try: + # Try to resolve the future with the result of the task + future.set_result(task.result()) + except BaseException as exc: + # If the task raises an exception, resolve with that + future.set_exception(exc) + + +async def task_cancel_and_wait(task: asyncio.Task): + """ + Cancel the task and wait for it to exit. + """ + # We cannot wait on the task directly as we want this function to be cancellable + # e.g. the task might be shielded from cancellation + # Instead, we make a future that completes when the task completes and wait on that + loop = asyncio.get_running_loop() + future = loop.create_future() + callback = functools.partial(_resolve_future, future) + task.add_done_callback(callback) + + try: + # Cancel the task, but wait on our proxy future + task.cancel() + await future + except asyncio.CancelledError: + # Suppress the cancelled exception as we no longer need it + pass + finally: + task.remove_done_callback(callback) + + +async def run_tasks(tasks: typing.List[asyncio.Task]): + """ + Run the specified tasks until one exits. + """ + done, not_done = await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED) + for task in not_done: + await task_cancel_and_wait(task) + for task in done: + task.result() diff --git a/easykube/kubernetes/runtime/watch.py b/easykube/kubernetes/runtime/watch.py new file mode 100644 index 0000000..5bdc0e7 --- /dev/null +++ b/easykube/kubernetes/runtime/watch.py @@ -0,0 +1,60 @@ +import logging +import typing as t + +from ..client import AsyncClient, LabelSelector + +from .queue import Queue +from .reconcile import Request + + +logger = logging.getLogger(__name__) + + +LabelValue = t.Union[LabelSelector, t.List[str], str] + + +class Watch: + """ + Watches a Kubernetes resource and produces reconcile requests. + """ + def __init__( + self, + api_version: str, + kind: str, + mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]], + *, + labels: t.Optional[t.Dict[str, LabelValue]] = None, + namespace: t.Optional[str] = None + ): + self._api_version = api_version + self._kind = kind + self._mapper = mapper + self._labels = labels + self._namespace = namespace + + async def run(self, client: AsyncClient, queue: Queue): + """ + Run the watch, pushing requests onto the specified queue. 
+ """ + resource = await client.api(self._api_version).resource(self._kind) + watch_kwargs = {} + if self._labels: + watch_kwargs["labels"] = self._labels + if self._namespace: + watch_kwargs["namespace"] = self._namespace + else: + watch_kwargs["all_namespaces"] = True + logger.info( + "Starting watch", + extra = { + "api_version": self._api_version, + "kind": self._kind, + } + ) + initial, events = await resource.watch_list(**watch_kwargs) + for obj in initial: + for request in self._mapper(obj): + queue.enqueue(request) + async for event in events: + for request in self._mapper(event["object"]): + queue.enqueue(request) diff --git a/easykube/runtime.py b/easykube/runtime.py new file mode 100644 index 0000000..543f3e5 --- /dev/null +++ b/easykube/runtime.py @@ -0,0 +1 @@ +from .kubernetes.runtime import Controller, Manager, ReconcileFunc, Request, Result From 905fa87d0fa684a356bf9471a62a8401abec460f Mon Sep 17 00:00:00 2001 From: Matt Pryor Date: Tue, 23 Jul 2024 11:33:59 +0100 Subject: [PATCH 4/4] Changes to allow a shared worker pool between all controllers --- easykube/kubernetes/runtime/__init__.py | 1 + easykube/kubernetes/runtime/controller.py | 97 +++++++++++-------- easykube/kubernetes/runtime/manager.py | 24 +++-- easykube/kubernetes/runtime/queue.py | 32 ++++-- easykube/kubernetes/runtime/worker_pool.py | 107 +++++++++++++++++++++ easykube/runtime.py | 10 +- 6 files changed, 214 insertions(+), 57 deletions(-) create mode 100644 easykube/kubernetes/runtime/worker_pool.py diff --git a/easykube/kubernetes/runtime/__init__.py b/easykube/kubernetes/runtime/__init__.py index 3dd36aa..9a95507 100644 --- a/easykube/kubernetes/runtime/__init__.py +++ b/easykube/kubernetes/runtime/__init__.py @@ -1,3 +1,4 @@ from .controller import Controller from .manager import Manager from .reconcile import ReconcileFunc, Request, Result +from .worker_pool import Worker, WorkerPool diff --git a/easykube/kubernetes/runtime/controller.py b/easykube/kubernetes/runtime/controller.py index 865ae36..2fb99d7 100644 --- a/easykube/kubernetes/runtime/controller.py +++ b/easykube/kubernetes/runtime/controller.py @@ -9,6 +9,7 @@ from .reconcile import ReconcileFunc, Request, Result from .util import run_tasks from .watch import Watch +from .worker_pool import WorkerPool logger = logging.getLogger(__name__) @@ -31,12 +32,13 @@ def __init__( labels: t.Optional[t.Dict[str, LabelValue]] = None, namespace: t.Optional[str] = None, worker_count: int = 10, + worker_pool: t.Optional[WorkerPool] = None, requeue_max_backoff: int = 120 ): self._api_version = api_version self._kind = kind self._namespace = namespace - self._worker_count = worker_count + self._worker_pool = worker_pool or WorkerPool(worker_count) self._requeue_max_backoff = requeue_max_backoff self._reconcile_func = reconcile_func self._watches: t.List[Watch] = [ @@ -107,7 +109,7 @@ def watches( ) return self - def _request_logger(self, request: Request, worker_idx: int): + def _request_logger(self, request: Request, worker_id: int): """ Returns a logger for the given request. """ @@ -118,47 +120,67 @@ def _request_logger(self, request: Request, worker_idx: int): "kind": self._kind, "instance": request.key, "request_id": request.id, - "worker_idx": worker_idx, + "worker_id": worker_id, } ) - async def _worker(self, client: AsyncClient, queue: Queue, worker_idx: int): + async def _handle_request( + self, + client: AsyncClient, + queue: Queue, + worker_id: int, + request: Request, + attempt: int + ): """ Start a worker that processes reconcile requests. 
""" + # Get a logger that populates parameters for the request + logger = self._request_logger(request, worker_id) + logger.info("Handling reconcile request (attempt %d)", attempt + 1) + # Try to reconcile the request + try: + result = await self._reconcile_func(client, request) + except asyncio.CancelledError: + # Propagate cancellations with no further action + raise + except Exception: + logger.exception("Error handling reconcile request") + result = Result(True) + else: + # If the result is None, use the default result + result = result or Result() + # Work out whether we need to requeue or whether we are done + if result.requeue: + if result.requeue_after: + delay = result.requeue_after + # If a specific delay is requested, reset the attempts + attempt = -1 + else: + delay = min(2**attempt, self._requeue_max_backoff) + # Add some jitter to the requeue + delay = delay + random.uniform(0, 1) + logger.info("Requeuing request after %.3fs", delay) + queue.requeue(request, attempt + 1, delay) + else: + logger.info("Successfully handled reconcile request") + # Mark the processing for the request as complete + queue.processing_complete(request) + + async def _dispatch(self, client: AsyncClient, queue: Queue): + """ + Pulls requests from the queue and dispatches them to the worker pool. + """ while True: + # Spin until there is a request in the queue that is eligible to be dequeued + while not queue.has_eligible_request(): + await asyncio.sleep(0.1) + # Once we know there is an eligible request, wait to reserve a worker + worker = await self._worker_pool.reserve() + # Once we know we have a worker reserved, pull the request from the queue + # and give the task to the worker to process asynchronously request, attempt = await queue.dequeue() - # Get a logger that populates parameters for the request - logger = self._request_logger(request, worker_idx) - logger.info("Handling reconcile request (attempt %d)", attempt + 1) - # Try to reconcile the request - try: - result = await self._reconcile_func(client, request) - except asyncio.CancelledError: - # Propagate cancellations with no further action - raise - except Exception: - logger.exception("Error handling reconcile request") - result = Result(True) - else: - # If the result is None, use the default result - result = result or Result() - # Work out whether we need to requeue or whether we are done - if result.requeue: - if result.requeue_after: - delay = result.requeue_after - # If a specific delay is requested, reset the attempts - attempt = -1 - else: - delay = min(2**attempt, self._requeue_max_backoff) - # Add some jitter to the requeue - delay = delay + random.uniform(0, 1) - logger.info("Requeuing request after %.3fs", delay) - queue.requeue(request, attempt + 1, delay) - else: - logger.info("Successfully handled reconcile request") - # Mark the processing for the request as complete - queue.processing_complete(request) + worker.set_task(self._handle_request, client, queue, worker.id, request, attempt) async def run(self, client: AsyncClient): """ @@ -173,8 +195,7 @@ async def run(self, client: AsyncClient): asyncio.create_task(watch.run(client, queue)) for watch in self._watches ] + [ - # Worker tasks to process requests - asyncio.create_task(self._worker(client, queue, idx)) - for idx in range(self._worker_count) + # Task to dispatch requests to the worker pool + asyncio.create_task(self._dispatch(client, queue)), ] ) diff --git a/easykube/kubernetes/runtime/manager.py b/easykube/kubernetes/runtime/manager.py index 0a19fac..16c476d 
100644 --- a/easykube/kubernetes/runtime/manager.py +++ b/easykube/kubernetes/runtime/manager.py @@ -5,6 +5,7 @@ from .controller import Controller, ReconcileFunc from .util import run_tasks +from .worker_pool import WorkerPool LabelValue = t.Union[LabelSelector, t.List[str], str] @@ -19,10 +20,11 @@ def __init__( *, namespace: t.Optional[str] = None, worker_count: int = 10, + worker_pool: t.Optional[WorkerPool] = None, requeue_max_backoff: int = 120 ): self._namespace = namespace - self._worker_count = worker_count + self._worker_pool = worker_pool or WorkerPool(worker_count) self._requeue_max_backoff = requeue_max_backoff self._controllers: t.List[Controller] = [] @@ -41,7 +43,7 @@ def create_controller( *, labels: t.Optional[t.Dict[str, LabelValue]] = None, namespace: t.Optional[str] = None, - worker_count: t.Optional[int] = None, + worker_pool: t.Optional[WorkerPool] = None, requeue_max_backoff: t.Optional[int] = None ) -> Controller: """ @@ -53,7 +55,7 @@ def create_controller( reconcile_func, labels = labels, namespace = namespace or self._namespace, - worker_count = worker_count or self._worker_count, + worker_pool = worker_pool or self._worker_pool, requeue_max_backoff = requeue_max_backoff or self._requeue_max_backoff ) self.register_controller(controller) @@ -64,9 +66,13 @@ async def run(self, client: AsyncClient): Run all the controllers registered with the manager using the given client. """ assert len(self._controllers) > 0, "no controllers registered" - - # Run a task for each controller - await run_tasks([ - asyncio.create_task(controller.run(client)) - for controller in self._controllers - ]) + await run_tasks( + [ + # Run a task for each controller and one for the worker pool + asyncio.create_task(controller.run(client)) + for controller in self._controllers + ] + [ + # Run the worker pool + asyncio.create_task(self._worker_pool.run()), + ] + ) diff --git a/easykube/kubernetes/runtime/queue.py b/easykube/kubernetes/runtime/queue.py index 2e18315..62fefe4 100644 --- a/easykube/kubernetes/runtime/queue.py +++ b/easykube/kubernetes/runtime/queue.py @@ -41,8 +41,29 @@ def __init__(self): # A map of request key to handles for requeue callbacks self._handles: t.Dict[str, asyncio.TimerHandle] = {} + def _eligible_idx(self): + """ + Returns the index of the first request in the queue that is eligible to be dequeued. + """ + return next( + ( + i + for i, (req, _) in enumerate(self._queue) + if req.key not in self._active + ), + -1 + ) + + def has_eligible_request(self): + """ + Indicates if the queue has a request that is eligible to be dequeued. + """ + return self._eligible_idx() >= 0 + def _wakeup_next_dequeue(self): - # Wake up the next eligible dequeuer by resolving the first future in the queue + """ + Wake up the next eligible dequeuer by resolving the first future in the queue. 
+        """
         while self._futures:
             future = self._futures.popleft()
             if not future.done():
@@ -57,14 +78,7 @@ async def dequeue(self) -> t.Tuple[Request, int]:
         """
         while True:
             # Find the index of the first request in the queue for which there is no active task
-            idx = next(
-                (
-                    i
-                    for i, (req, _) in enumerate(self._queue)
-                    if req.key not in self._active
-                ),
-                -1
-            )
+            idx = self._eligible_idx()
             # If there is such a request, extract it from the queue and return it
             if idx >= 0:
                 request, attempt = self._queue.pop(idx)
diff --git a/easykube/kubernetes/runtime/worker_pool.py b/easykube/kubernetes/runtime/worker_pool.py
new file mode 100644
index 0000000..24d5cf5
--- /dev/null
+++ b/easykube/kubernetes/runtime/worker_pool.py
@@ -0,0 +1,107 @@
+import asyncio
+import typing as t
+
+from .util import run_tasks
+
+
+class WorkerNotAvailable(RuntimeError):
+    """
+    Raised when a task is scheduled to a worker that already has a task.
+    """
+
+
+P = t.ParamSpec("P")
+T = t.TypeVar("T")
+
+
+Task = t.Tuple[t.Callable[P, t.Awaitable[T]], t.ParamSpecArgs, t.ParamSpecKwargs]
+
+
+class Worker:
+    """
+    Represents a worker in a pool.
+    """
+    def __init__(self, pool: 'WorkerPool', id: int):
+        self._pool = pool
+        self._id = id
+        self._available = True
+        self._task: t.Optional[Task] = None
+
+    @property
+    def id(self):
+        """
+        The ID of the worker.
+        """
+        return self._id
+
+    @property
+    def available(self):
+        """
+        Indicates whether the worker is available.
+        """
+        return self._available and not self._task
+
+    def reserve(self) -> 'Worker':
+        """
+        Reserve the worker by marking it as unavailable, even if no task is set yet.
+        """
+        self._available = False
+        return self
+
+    def set_task(
+        self,
+        func: t.Callable[P, t.Awaitable[T]],
+        *args: P.args,
+        **kwargs: P.kwargs
+    ):
+        """
+        Run the specified function call using the worker.
+        """
+        if self._task:
+            raise WorkerNotAvailable
+        else:
+            self._task = (func, args, kwargs)
+        return self
+
+    async def run(self):
+        """
+        Run the worker.
+        """
+        # We run forever, picking up and executing our task as it is set
+        while True:
+            if self._task:
+                func, args, kwargs = self._task
+                try:
+                    await func(*args, **kwargs)
+                finally:
+                    self._available = True
+                    self._task = None
+            else:
+                # Just relinquish control for now to allow another coroutine to run
+                await asyncio.sleep(0.1)
+
+
+class WorkerPool:
+    """
+    Represents a worker pool.
+    """
+    def __init__(self, worker_count):
+        self._workers = [Worker(self, idx) for idx in range(worker_count)]
+
+    async def reserve(self) -> Worker:
+        """
+        Returns an available worker or spins until one becomes available.
+        """
+        while True:
+            try:
+                worker = next(w for w in self._workers if w.available)
+            except StopIteration:
+                await asyncio.sleep(0.1)
+            else:
+                return worker.reserve()
+
+    async def run(self):
+        """
+        Run the workers in the pool.
+        """
+        await run_tasks([w.run() for w in self._workers])
diff --git a/easykube/runtime.py b/easykube/runtime.py
index 543f3e5..142b8f9 100644
--- a/easykube/runtime.py
+++ b/easykube/runtime.py
@@ -1 +1,9 @@
-from .kubernetes.runtime import Controller, Manager, ReconcileFunc, Request, Result
+from .kubernetes.runtime import (
+    Controller,
+    Manager,
+    ReconcileFunc,
+    Request,
+    Result,
+    Worker,
+    WorkerPool
+)
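
A minimal sketch of how the shared pool is intended to be wired up across controllers. The resource kind, the reconcile function and the `Configuration.from_environment().async_client()` call are illustrative assumptions rather than part of this series, and the positional arguments to `create_controller` are assumed to mirror the Controller constructor (`api_version`, `kind`, `reconcile_func`):

    import asyncio
    import typing as t

    from easykube import Configuration
    from easykube.runtime import Manager, Request, Result, WorkerPool


    async def reconcile_widget(client, request: Request) -> t.Optional[Result]:
        # Converge the object identified by request.key here.
        # Returning None uses the default Result (no requeue);
        # Result(True) asks for a requeue with exponential backoff.
        print("reconciling", request.key)
        return None


    async def main():
        # A single pool shared by every controller registered with the manager
        pool = WorkerPool(20)
        manager = Manager(worker_pool = pool)
        manager.create_controller("example.org/v1", "Widget", reconcile_widget)
        # Build an AsyncClient in whatever way suits your deployment
        client = Configuration.from_environment().async_client()
        async with client:
            # Runs the watches, the dispatchers and the shared worker pool
            await manager.run(client)


    asyncio.run(main())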