diff --git a/easykube/__init__.py b/easykube/__init__.py
index f04f86d..7bc640b 100644
--- a/easykube/__init__.py
+++ b/easykube/__init__.py
@@ -1,10 +1,16 @@
 from .kubernetes import (
     PRESENT,
     ABSENT,
+    Api,
     ApiError,
     AsyncClient,
     Configuration,
+    DeletePropagationPolicy,
+    LabelSelector,
+    ListResponseIterator,
+    resources,
+    Resource,
     ResourceSpec,
     SyncClient,
-    resources
+    WatchEvents
 )
diff --git a/easykube/kubernetes/__init__.py b/easykube/kubernetes/__init__.py
index ad59664..68e2a20 100644
--- a/easykube/kubernetes/__init__.py
+++ b/easykube/kubernetes/__init__.py
@@ -1,10 +1,16 @@
 from .client import (
     PRESENT,
     ABSENT,
+    Api,
     ApiError,
     AsyncClient,
+    DeletePropagationPolicy,
+    LabelSelector,
+    ListResponseIterator,
+    Resource,
     ResourceSpec,
-    SyncClient
+    SyncClient,
+    WatchEvents
 )
 from .config import Configuration
 from . import resources
diff --git a/easykube/kubernetes/client/__init__.py b/easykube/kubernetes/client/__init__.py
index 83003b1..2dbb35e 100644
--- a/easykube/kubernetes/client/__init__.py
+++ b/easykube/kubernetes/client/__init__.py
@@ -2,5 +2,5 @@
 from .client import AsyncClient, SyncClient
 from .errors import ApiError
 from .iterators import ListResponseIterator, WatchEvents
-from .resource import PRESENT, ABSENT, DeletePropagationPolicy, Resource
+from .resource import PRESENT, ABSENT, DeletePropagationPolicy, LabelSelector, Resource
 from .spec import ResourceSpec
diff --git a/easykube/kubernetes/client/resource.py b/easykube/kubernetes/client/resource.py
index 5364338..e121b24 100644
--- a/easykube/kubernetes/client/resource.py
+++ b/easykube/kubernetes/client/resource.py
@@ -8,10 +8,16 @@
 from .iterators import ListResponseIterator, WatchEvents
 
 
-#: Sentinel object indicating that the presence of a label is required with any value
-PRESENT = object()
-#: Sentinel object indicating that a label must not be present
-ABSENT = object()
+class LabelSelector(str, enum.Enum):
+    #: Indicates that the presence of a label is required
+    PRESENT = "present"
+    #: Indicates that a label must not be present
+    ABSENT = "absent"
+
+
+# Make these instances available at the top-level for backwards compatibility
+PRESENT = LabelSelector.PRESENT
+ABSENT = LabelSelector.ABSENT
 
 
 class DeletePropagationPolicy(str, enum.Enum):
diff --git a/easykube/kubernetes/runtime/__init__.py b/easykube/kubernetes/runtime/__init__.py
new file mode 100644
index 0000000..9a95507
--- /dev/null
+++ b/easykube/kubernetes/runtime/__init__.py
@@ -0,0 +1,4 @@
+from .controller import Controller
+from .manager import Manager
+from .reconcile import ReconcileFunc, Request, Result
+from .worker_pool import Worker, WorkerPool
diff --git a/easykube/kubernetes/runtime/controller.py b/easykube/kubernetes/runtime/controller.py
new file mode 100644
index 0000000..2fb99d7
--- /dev/null
+++ b/easykube/kubernetes/runtime/controller.py
@@ -0,0 +1,204 @@
+import asyncio
+import logging
+import random
+import typing as t
+
+from ..client import AsyncClient, LabelSelector
+
+from .queue import Queue
+from .reconcile import ReconcileFunc, Request, Result
+from .util import run_tasks
+from .watch import Watch
+from .worker_pool import WorkerPool
+
+
+logger = logging.getLogger(__name__)
+
+
+LabelValue = t.Union[LabelSelector, t.List[str], str]
+
+
+class Controller:
+    """
+    Class for a controller that watches a resource and its related resources and calls
+    a reconciler whenever an object needs to be reconciled.
+    """
+    def __init__(
+        self,
+        api_version: str,
+        kind: str,
+        reconcile_func: ReconcileFunc,
+        *,
+        labels: t.Optional[t.Dict[str, LabelValue]] = None,
+        namespace: t.Optional[str] = None,
+        worker_count: int = 10,
+        worker_pool: t.Optional[WorkerPool] = None,
+        requeue_max_backoff: int = 120
+    ):
+        self._api_version = api_version
+        self._kind = kind
+        self._namespace = namespace
+        self._worker_pool = worker_pool or WorkerPool(worker_count)
+        self._requeue_max_backoff = requeue_max_backoff
+        self._reconcile_func = reconcile_func
+        self._watches: t.List[Watch] = [
+            Watch(
+                api_version,
+                kind,
+                lambda obj: [
+                    Request(
+                        obj["metadata"]["name"],
+                        obj["metadata"].get("namespace")
+                    ),
+                ],
+                labels = labels,
+                namespace = namespace
+            ),
+        ]
+
+    def owns(
+        self,
+        api_version: str,
+        kind: str,
+        *,
+        controller_only: bool = True
+    ) -> 'Controller':
+        """
+        Specifies child objects that the controller objects owns and that should trigger
+        reconciliation of the parent object.
+        """
+        self._watches.append(
+            Watch(
+                api_version,
+                kind,
+                lambda obj: [
+                    Request(ref["name"], obj["metadata"].get("namespace"))
+                    for ref in obj["metadata"].get("ownerReferences", [])
+                    if (
+                        ref["apiVersion"] == self._api_version and
+                        ref["kind"] == self._kind and
+                        (not controller_only or ref.get("controller", False))
+                    )
+                ],
+                namespace = self._namespace
+            )
+        )
+        return self
+
+    def watches(
+        self,
+        api_version: str,
+        kind: str,
+        mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]],
+        *,
+        labels: t.Optional[t.Dict[str, LabelValue]] = None,
+        namespace: t.Optional[str] = None
+    ) -> 'Controller':
+        """
+        Watches the specified resource and uses the given mapper function to produce
+        reconciliation requests for the controller resource.
+        """
+        self._watches.append(
+            Watch(
+                api_version,
+                kind,
+                mapper,
+                labels = labels,
+                namespace = namespace or self._namespace
+            )
+        )
+        return self
+
+    def _request_logger(self, request: Request, worker_id: int):
+        """
+        Returns a logger for the given request.
+        """
+        return logging.LoggerAdapter(
+            logger,
+            {
+                "api_version": self._api_version,
+                "kind": self._kind,
+                "instance": request.key,
+                "request_id": request.id,
+                "worker_id": worker_id,
+            }
+        )
+
+    async def _handle_request(
+        self,
+        client: AsyncClient,
+        queue: Queue,
+        worker_id: int,
+        request: Request,
+        attempt: int
+    ):
+        """
+        Handle a single reconcile request, requeuing it if required.
+
+        Exceptions raised by the reconcile function (other than cancellation) are logged
+        and cause the request to be requeued using the clamped exponential backoff.
+        """
+        # Get a logger that populates parameters for the request
+        logger = self._request_logger(request, worker_id)
+        logger.info("Handling reconcile request (attempt %d)", attempt + 1)
+        # Try to reconcile the request
+        try:
+            result = await self._reconcile_func(client, request)
+        except asyncio.CancelledError:
+            # Propagate cancellations with no further action
+            raise
+        except Exception:
+            logger.exception("Error handling reconcile request")
+            result = Result(True)
+        else:
+            # If the result is None, use the default result
+            result = result or Result()
+        # Work out whether we need to requeue or whether we are done
+        if result.requeue:
+            if result.requeue_after:
+                delay = result.requeue_after
+                # If a specific delay is requested, reset the attempts
+                attempt = -1
+            else:
+                delay = min(2**attempt, self._requeue_max_backoff)
+            # Add some jitter to the requeue
+            delay = delay + random.uniform(0, 1)
+            logger.info("Requeuing request after %.3fs", delay)
+            queue.requeue(request, attempt + 1, delay)
+        else:
+            logger.info("Successfully handled reconcile request")
+            # Mark the processing for the request as complete
+            queue.processing_complete(request)
+
+    async def _dispatch(self, client: AsyncClient, queue: Queue):
+        """
+        Pulls requests from the queue and dispatches them to the worker pool.
+        """
+        while True:
+            # Spin until there is a request in the queue that is eligible to be dequeued
+            while not queue.has_eligible_request():
+                await asyncio.sleep(0.1)
+            # Once we know there is an eligible request, wait to reserve a worker
+            worker = await self._worker_pool.reserve()
+            # Once we know we have a worker reserved, pull the request from the queue
+            # and give the task to the worker to process asynchronously
+            request, attempt = await queue.dequeue()
+            worker.set_task(self._handle_request, client, queue, worker.id, request, attempt)
+
+    async def run(self, client: AsyncClient):
+        """
+        Run the controller using the given client.
+        """
+        # The queue is used to spread work between the workers
+        queue = Queue()
+        # Run the tasks that make up the controller
+        await run_tasks(
+            [
+                # Tasks to push requests onto the queue
+                asyncio.create_task(watch.run(client, queue))
+                for watch in self._watches
+            ] + [
+                # Task to dispatch requests to the worker pool
+                asyncio.create_task(self._dispatch(client, queue)),
+            ]
+        )
diff --git a/easykube/kubernetes/runtime/manager.py b/easykube/kubernetes/runtime/manager.py
new file mode 100644
index 0000000..16c476d
--- /dev/null
+++ b/easykube/kubernetes/runtime/manager.py
@@ -0,0 +1,82 @@
+import asyncio
+import typing as t
+
+from ..client import AsyncClient, LabelSelector
+
+from .controller import Controller, ReconcileFunc
+from .util import run_tasks
+from .worker_pool import WorkerPool
+
+
+LabelValue = t.Union[LabelSelector, t.List[str], str]
+
+
+class Manager:
+    """
+    Manages the execution of multiple controllers with shared resources.
+    """
+    def __init__(
+        self,
+        *,
+        namespace: t.Optional[str] = None,
+        worker_count: int = 10,
+        worker_pool: t.Optional[WorkerPool] = None,
+        requeue_max_backoff: int = 120
+    ):
+        self._namespace = namespace
+        self._worker_pool = worker_pool or WorkerPool(worker_count)
+        self._requeue_max_backoff = requeue_max_backoff
+        self._controllers: t.List[Controller] = []
+
+    def register_controller(self, controller: Controller) -> 'Manager':
+        """
+        Register the given controller with this manager.
+        """
+        self._controllers.append(controller)
+        return self
+
+    def create_controller(
+        self,
+        api_version: str,
+        kind: str,
+        reconcile_func: ReconcileFunc,
+        *,
+        labels: t.Optional[t.Dict[str, LabelValue]] = None,
+        namespace: t.Optional[str] = None,
+        worker_pool: t.Optional[WorkerPool] = None,
+        requeue_max_backoff: t.Optional[int] = None
+    ) -> Controller:
+        """
+        Creates a new controller that is registered with this manager.
+        """
+        # Compare with None rather than relying on truthiness so that an explicit
+        # backoff of zero is not silently replaced with the manager default
+        if requeue_max_backoff is None:
+            requeue_max_backoff = self._requeue_max_backoff
+        controller = Controller(
+            api_version,
+            kind,
+            reconcile_func,
+            labels = labels,
+            namespace = namespace or self._namespace,
+            worker_pool = worker_pool or self._worker_pool,
+            requeue_max_backoff = requeue_max_backoff
+        )
+        self.register_controller(controller)
+        return controller
+
+    async def run(self, client: AsyncClient):
+        """
+        Run all the controllers registered with the manager using the given client.
+        """
+        assert len(self._controllers) > 0, "no controllers registered"
+        await run_tasks(
+            [
+                # Run a task for each controller and one for the worker pool
+                asyncio.create_task(controller.run(client))
+                for controller in self._controllers
+            ] + [
+                # Run the worker pool
+                asyncio.create_task(self._worker_pool.run()),
+            ]
+        )
diff --git a/easykube/kubernetes/runtime/queue.py b/easykube/kubernetes/runtime/queue.py
new file mode 100644
index 0000000..62fefe4
--- /dev/null
+++ b/easykube/kubernetes/runtime/queue.py
@@ -0,0 +1,168 @@
+import asyncio
+import collections
+import typing as t
+
+from .reconcile import Request
+
+
+class Queue:
+    """
+    Queue of (request, attempt) tuples representing requests to reconcile objects.
+
+    The queue is "smart" in a few ways:
+
+      1. It has explicit operations for enqueuing a new request and requeuing a request that
+         has previously been attempted.
+
+      2. Requeuing of a request that has been previously attempted only happens after a delay.
+         This happens asynchronously so that it does not block the worker from moving on to the
+         next request.
+
+      3. At most one request per object can be in the queue at any given time.
+
+      4. Only one request per object is allowed to be "active" at any given time.
+         The queue records when a request leaves the queue and does not allow any more requests
+         for the same object to leave the queue until it has been notified that the
+         previous request has been completed (either explicitly or by requeuing).
+
+    Note that this means that requests for objects that are changing often will be pushed to
+    the back of the queue.
+    """
+    def __init__(self):
+        # The main queue of (request, attempt) tuples
+        self._queue: t.List[t.Tuple[Request, int]] = []
+        # A queue of futures
+        # Each waiting "dequeuer" adds a future to the queue and waits on it
+        # When a request becomes available, the first future in the queue is resolved, which
+        # "wakes up" the corresponding dequeuer to read the request from the queue
+        self._futures: t.Deque[asyncio.Future] = collections.deque()
+        # A map of request key to request ID for active requests
+        self._active: t.Dict[str, str] = {}
+        # A map of request key to handles for requeue callbacks
+        self._handles: t.Dict[str, asyncio.TimerHandle] = {}
+
+    def _eligible_idx(self):
+        """
+        Returns the index of the first request in the queue that is eligible to be dequeued.
+        """
+        return next(
+            (
+                i
+                for i, (req, _) in enumerate(self._queue)
+                if req.key not in self._active
+            ),
+            -1
+        )
+
+    def has_eligible_request(self):
+        """
+        Indicates if the queue has a request that is eligible to be dequeued.
+        """
+        return self._eligible_idx() >= 0
+
+    def _wakeup_next_dequeue(self):
+        """
+        Wake up the next eligible dequeuer by resolving the first future in the queue.
+        """
+        while self._futures:
+            future = self._futures.popleft()
+            if not future.done():
+                future.set_result(None)
+                break
+
+    async def dequeue(self) -> t.Tuple[Request, int]:
+        """
+        Remove and return a request from the queue.
+
+        If there are no requests that are eligible to leave the queue, wait until there is one.
+        """
+        while True:
+            # Find the index of the first request in the queue for which there is no active task
+            idx = self._eligible_idx()
+            # If there is such a request, extract it from the queue and return it
+            if idx >= 0:
+                request, attempt = self._queue.pop(idx)
+                # Register the request as having an active processing task
+                self._active[request.key] = request.id
+                return (request, attempt)
+            # If there is no such request, wait to be woken up when the situation changes
+            future = asyncio.get_running_loop().create_future()
+            self._futures.append(future)
+            await future
+
+    def _do_enqueue(self, request: Request, attempt: int = 0):
+        # Cancel any pending requeues for the same request
+        self._cancel_requeue(request)
+        # Append the request to the queue
+        self._queue.append((request, attempt))
+        # Wake up the next waiting dequeuer
+        self._wakeup_next_dequeue()
+
+    def enqueue(self, request: Request):
+        """
+        Add a new request to the queue.
+        """
+        # If a request with the same key is in the queue, discard it
+        idx = next(
+            (
+                i
+                for i, (req, _) in enumerate(self._queue)
+                if req.key == request.key
+            ),
+            -1
+        )
+        if idx >= 0:
+            self._queue.pop(idx)
+        # Add the new request to the end of the queue
+        self._do_enqueue(request)
+
+    def _do_requeue(self, request: Request, attempt: int):
+        # If a request with the same key is already in the queue, discard this one
+        # If not, enqueue it
+        if not any(req.key == request.key for req, _ in self._queue):
+            self._do_enqueue(request, attempt)
+        else:
+            self._cancel_requeue(request)
+
+    def _cancel_requeue(self, request: Request):
+        # Cancel and discard any requeue handle for the request
+        handle = self._handles.pop(request.key, None)
+        if handle:
+            handle.cancel()
+
+    def requeue(self, request: Request, attempt: int, delay: float):
+        """
+        Requeue a request after the specified delay.
+
+        If a request with the same key is already in the queue when the delay has elapsed,
+        the request is discarded.
+        """
+        # If there is already an existing requeue handle, cancel it
+        self._cancel_requeue(request)
+        # If there is already a request with the same key on the queue, there is nothing to do
+        # If not, schedule a requeue after a delay
+        #
+        # NOTE(mkjpryor)
+        # We use a callback rather than a task to schedule the requeue
+        # This is because it allows us to cancel the requeue cleanly without trapping
+        # CancelledError, allowing the controller as a whole to be cancelled reliably
+        if not any(req.key == request.key for req, _ in self._queue):
+            # Schedule the requeue for the future and stash the handle
+            self._handles[request.key] = asyncio.get_running_loop().call_later(
+                delay,
+                self._do_requeue,
+                request,
+                attempt
+            )
+        # If a request is being requeued, assume the processing is complete
+        self.processing_complete(request)
+
+    def processing_complete(self, request: Request):
+        """
+        Indicates to the queue that processing for the given request is complete.
+        """
+        # Only clear the active record if the request ID matches
+        if request.key in self._active and self._active[request.key] == request.id:
+            self._active.pop(request.key)
+            # Clearing the key may make another request eligible for processing
+            self._wakeup_next_dequeue()
diff --git a/easykube/kubernetes/runtime/reconcile.py b/easykube/kubernetes/runtime/reconcile.py
new file mode 100644
index 0000000..49e95a8
--- /dev/null
+++ b/easykube/kubernetes/runtime/reconcile.py
@@ -0,0 +1,41 @@
+import dataclasses
+import typing as t
+import uuid
+
+from ..client import AsyncClient
+
+
+@dataclasses.dataclass(frozen = True)
+class Request:
+    """
+    Represents a request to reconcile an object.
+    """
+    #: The name of the object to reconcile
+    name: str
+    #: The namespace of the object to reconcile, or none for cluster-scoped objects
+    namespace: t.Optional[str] = None
+    #: The ID of the request
+    id: str = dataclasses.field(default_factory = lambda: str(uuid.uuid4()))
+
+    @property
+    def key(self):
+        """
+        The key for the request.
+        """
+        return f"{self.namespace}/{self.name}" if self.namespace else self.name
+
+
+@dataclasses.dataclass(frozen = True)
+class Result:
+    """
+    Represents the result of a reconciliation.
+    """
+    #: Indicates whether the request should be requeued
+    requeue: bool = False
+    #: Indicates the time in seconds after which the request should be requeued
+    #: If not given, a clamped exponential backoff is used
+    requeue_after: t.Optional[int] = None
+
+
+#: Type for a reconciliation function
+ReconcileFunc = t.Callable[[AsyncClient, Request], t.Awaitable[t.Optional[Result]]]
diff --git a/easykube/kubernetes/runtime/util.py b/easykube/kubernetes/runtime/util.py
new file mode 100644
index 0000000..15249d7
--- /dev/null
+++ b/easykube/kubernetes/runtime/util.py
@@ -0,0 +1,47 @@
+import asyncio
+import functools
+import typing
+
+
+def _resolve_future(future: asyncio.Future, task: asyncio.Task):
+    if not future.done():
+        try:
+            # Try to resolve the future with the result of the task
+            future.set_result(task.result())
+        except BaseException as exc:
+            # If the task raises an exception, resolve with that
+            future.set_exception(exc)
+
+
+async def task_cancel_and_wait(task: asyncio.Task):
+    """
+    Cancel the task and wait for it to exit.
+    """
+    # We cannot wait on the task directly as we want this function to be cancellable
+    # e.g. the task might be shielded from cancellation
+    # Instead, we make a future that completes when the task completes and wait on that
+    loop = asyncio.get_running_loop()
+    future = loop.create_future()
+    callback = functools.partial(_resolve_future, future)
+    task.add_done_callback(callback)
+
+    try:
+        # Cancel the task, but wait on our proxy future
+        task.cancel()
+        await future
+    except asyncio.CancelledError:
+        # Suppress the cancelled exception as we no longer need it
+        pass
+    finally:
+        task.remove_done_callback(callback)
+
+
+async def run_tasks(tasks: typing.List[asyncio.Task]):
+    """
+    Run the specified tasks until one exits.
+    """
+    done, not_done = await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED)
+    for task in not_done:
+        await task_cancel_and_wait(task)
+    for task in done:
+        task.result()
diff --git a/easykube/kubernetes/runtime/watch.py b/easykube/kubernetes/runtime/watch.py
new file mode 100644
index 0000000..5bdc0e7
--- /dev/null
+++ b/easykube/kubernetes/runtime/watch.py
@@ -0,0 +1,60 @@
+import logging
+import typing as t
+
+from ..client import AsyncClient, LabelSelector
+
+from .queue import Queue
+from .reconcile import Request
+
+
+logger = logging.getLogger(__name__)
+
+
+LabelValue = t.Union[LabelSelector, t.List[str], str]
+
+
+class Watch:
+    """
+    Watches a Kubernetes resource and produces reconcile requests.
+    """
+    def __init__(
+        self,
+        api_version: str,
+        kind: str,
+        mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]],
+        *,
+        labels: t.Optional[t.Dict[str, LabelValue]] = None,
+        namespace: t.Optional[str] = None
+    ):
+        self._api_version = api_version
+        self._kind = kind
+        self._mapper = mapper
+        self._labels = labels
+        self._namespace = namespace
+
+    async def run(self, client: AsyncClient, queue: Queue):
+        """
+        Run the watch, pushing requests onto the specified queue.
+        """
+        resource = await client.api(self._api_version).resource(self._kind)
+        watch_kwargs = {}
+        if self._labels:
+            watch_kwargs["labels"] = self._labels
+        if self._namespace:
+            watch_kwargs["namespace"] = self._namespace
+        else:
+            watch_kwargs["all_namespaces"] = True
+        logger.info(
+            "Starting watch",
+            extra = {
+                "api_version": self._api_version,
+                "kind": self._kind,
+            }
+        )
+        initial, events = await resource.watch_list(**watch_kwargs)
+        for obj in initial:
+            for request in self._mapper(obj):
+                queue.enqueue(request)
+        async for event in events:
+            for request in self._mapper(event["object"]):
+                queue.enqueue(request)
diff --git a/easykube/kubernetes/runtime/worker_pool.py b/easykube/kubernetes/runtime/worker_pool.py
new file mode 100644
index 0000000..24d5cf5
--- /dev/null
+++ b/easykube/kubernetes/runtime/worker_pool.py
@@ -0,0 +1,109 @@
+import asyncio
+import typing as t
+
+from .util import run_tasks
+
+
+class WorkerNotAvailable(RuntimeError):
+    """
+    Raised when a task is scheduled to a worker that already has a task.
+    """
+
+
+P = t.ParamSpec("P")
+T = t.TypeVar("T")
+
+
+Task = t.Tuple[t.Callable[P, t.Awaitable[T]], t.ParamSpecArgs, t.ParamSpecKwargs]
+
+
+class Worker:
+    """
+    Represents a worker in a pool.
+    """
+    def __init__(self, pool: 'WorkerPool', id: int):
+        self._pool = pool
+        self._id = id
+        self._available = True
+        self._task: t.Optional[Task] = None
+
+    @property
+    def id(self):
+        """
+        The ID of the worker.
+        """
+        return self._id
+
+    @property
+    def available(self):
+        """
+        Indicates whether the worker is available.
+        """
+        return self._available and not self._task
+
+    def reserve(self) -> 'Worker':
+        """
+        Reserve the worker by marking it as unavailable, even if no task is set yet.
+        """
+        self._available = False
+        return self
+
+    def set_task(
+        self,
+        func: t.Callable[P, t.Awaitable[T]],
+        *args: P.args,
+        **kwargs: P.kwargs
+    ):
+        """
+        Run the specified function call using the worker.
+        """
+        if self._task:
+            raise WorkerNotAvailable
+        else:
+            self._task = (func, args, kwargs)
+        return self
+
+    async def run(self):
+        """
+        Run the worker.
+        """
+        # We run forever, picking up and executing our task as it is set
+        while True:
+            if self._task:
+                func, args, kwargs = self._task
+                try:
+                    await func(*args, **kwargs)
+                finally:
+                    self._available = True
+                    self._task = None
+            else:
+                # Just relinquish control for now to allow another coroutine to run
+                await asyncio.sleep(0.1)
+
+
+class WorkerPool:
+    """
+    Represents a worker pool.
+    """
+    def __init__(self, worker_count):
+        self._workers = [Worker(self, idx) for idx in range(worker_count)]
+
+    async def reserve(self) -> Worker:
+        """
+        Returns an available worker or spins until one becomes available.
+        """
+        while True:
+            try:
+                worker = next(w for w in self._workers if w.available)
+            except StopIteration:
+                await asyncio.sleep(0.1)
+            else:
+                return worker.reserve()
+
+    async def run(self):
+        """
+        Run the workers in the pool.
+        """
+        # run_tasks hands these to asyncio.wait, which only accepts tasks/futures and
+        # rejects bare coroutines on Python 3.11+, so wrap each worker in a task first
+        await run_tasks([asyncio.create_task(w.run()) for w in self._workers])
diff --git a/easykube/runtime.py b/easykube/runtime.py
new file mode 100644
index 0000000..142b8f9
--- /dev/null
+++ b/easykube/runtime.py
@@ -0,0 +1,9 @@
+from .kubernetes.runtime import (
+    Controller,
+    Manager,
+    ReconcileFunc,
+    Request,
+    Result,
+    Worker,
+    WorkerPool
+)