Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runtime module for building controllers #11

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion easykube/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from .kubernetes import (
PRESENT,
ABSENT,
Api,
ApiError,
AsyncClient,
Configuration,
DeletePropagationPolicy,
LabelSelector,
ListResponseIterator,
resources,
Resource,
ResourceSpec,
SyncClient,
resources
WatchEvents
)
8 changes: 7 additions & 1 deletion easykube/kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from .client import (
PRESENT,
ABSENT,
Api,
ApiError,
AsyncClient,
DeletePropagationPolicy,
LabelSelector,
ListResponseIterator,
Resource,
ResourceSpec,
SyncClient
SyncClient,
WatchEvents
)
from .config import Configuration
from . import resources
2 changes: 1 addition & 1 deletion easykube/kubernetes/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
from .client import AsyncClient, SyncClient
from .errors import ApiError
from .iterators import ListResponseIterator, WatchEvents
from .resource import PRESENT, ABSENT, DeletePropagationPolicy, Resource
from .resource import PRESENT, ABSENT, DeletePropagationPolicy, LabelSelector, Resource
from .spec import ResourceSpec
14 changes: 10 additions & 4 deletions easykube/kubernetes/client/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@
from .iterators import ListResponseIterator, WatchEvents


#: Sentinel object indicating that the presence of a label is required with any value
PRESENT = object()
#: Sentinel object indicating that a label must not be present
ABSENT = object()
class LabelSelector(str, enum.Enum):
    """
    Special label selector values.

    PRESENT matches any object carrying the label, whatever its value, while
    ABSENT matches objects that do not carry the label at all.
    """
    PRESENT = "present"
    ABSENT = "absent"


# Module-level aliases retained for backwards compatibility with the old
# sentinel objects that these enum members replaced
PRESENT, ABSENT = LabelSelector.PRESENT, LabelSelector.ABSENT


class DeletePropagationPolicy(str, enum.Enum):
Expand Down
4 changes: 4 additions & 0 deletions easykube/kubernetes/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .controller import Controller
from .manager import Manager
from .reconcile import ReconcileFunc, Request, Result
from .worker_pool import Worker, WorkerPool
201 changes: 201 additions & 0 deletions easykube/kubernetes/runtime/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import asyncio
import logging
import random
import typing as t

from ..client import AsyncClient, LabelSelector

from .queue import Queue
from .reconcile import ReconcileFunc, Request, Result
from .util import run_tasks
from .watch import Watch
from .worker_pool import WorkerPool


logger = logging.getLogger(__name__)


LabelValue = t.Union[LabelSelector, t.List[str], str]


class Controller:
    """
    Watches a primary resource, together with any related resources, and invokes
    a reconcile function for each object that requires reconciliation.
    """
    def __init__(
        self,
        api_version: str,
        kind: str,
        reconcile_func: ReconcileFunc,
        *,
        labels: t.Optional[t.Dict[str, LabelValue]] = None,
        namespace: t.Optional[str] = None,
        worker_count: int = 10,
        worker_pool: t.Optional[WorkerPool] = None,
        requeue_max_backoff: int = 120
    ):
        self._api_version = api_version
        self._kind = kind
        self._namespace = namespace
        # Only build a pool of our own when one was not supplied
        self._worker_pool = worker_pool or WorkerPool(worker_count)
        self._requeue_max_backoff = requeue_max_backoff
        self._reconcile_func = reconcile_func

        def primary_mapper(obj):
            # Each object of the primary resource maps to exactly one request
            metadata = obj["metadata"]
            return [Request(metadata["name"], metadata.get("namespace"))]

        self._watches: t.List[Watch] = [
            Watch(
                api_version,
                kind,
                primary_mapper,
                labels = labels,
                namespace = namespace
            ),
        ]

    def owns(
        self,
        api_version: str,
        kind: str,
        *,
        controller_only: bool = True
    ) -> 'Controller':
        """
        Registers child resources owned by the controller's objects, whose
        changes should trigger reconciliation of the owning parent object.
        """
        def owner_mapper(obj):
            # Emit one request per owner reference that points at the
            # resource this controller manages
            metadata = obj["metadata"]
            namespace = metadata.get("namespace")
            requests = []
            for ref in metadata.get("ownerReferences", []):
                if ref["apiVersion"] != self._api_version:
                    continue
                if ref["kind"] != self._kind:
                    continue
                # Optionally only honour the controlling owner reference
                if controller_only and not ref.get("controller", False):
                    continue
                requests.append(Request(ref["name"], namespace))
            return requests

        self._watches.append(
            Watch(api_version, kind, owner_mapper, namespace = self._namespace)
        )
        return self

    def watches(
        self,
        api_version: str,
        kind: str,
        mapper: t.Callable[[t.Dict[str, t.Any]], t.Iterable[Request]],
        *,
        labels: t.Optional[t.Dict[str, LabelValue]] = None,
        namespace: t.Optional[str] = None
    ) -> 'Controller':
        """
        Registers an additional watch whose objects are translated into
        reconcile requests for the controller resource by the given mapper.
        """
        watch = Watch(
            api_version,
            kind,
            mapper,
            labels = labels,
            namespace = namespace or self._namespace
        )
        self._watches.append(watch)
        return self

    def _request_logger(self, request: Request, worker_id: int):
        """
        Builds a logger adapter that annotates records with request context.
        """
        context = {
            "api_version": self._api_version,
            "kind": self._kind,
            "instance": request.key,
            "request_id": request.id,
            "worker_id": worker_id,
        }
        return logging.LoggerAdapter(logger, context)

    async def _handle_request(
        self,
        client: AsyncClient,
        queue: Queue,
        worker_id: int,
        request: Request,
        attempt: int
    ):
        """
        Processes a single reconcile request, requeuing it when required.
        """
        # Use a logger that carries the context of the request
        req_logger = self._request_logger(request, worker_id)
        req_logger.info("Handling reconcile request (attempt %d)", attempt + 1)
        try:
            result = await self._reconcile_func(client, request)
        except asyncio.CancelledError:
            # Cancellation must propagate untouched
            raise
        except Exception:
            # Any other failure is logged and the request retried with backoff
            req_logger.exception("Error handling reconcile request")
            result = Result(True)
        else:
            # A reconciler that returns nothing gets the default result
            result = result or Result()
        if result.requeue:
            if result.requeue_after:
                delay = result.requeue_after
                # An explicitly requested delay resets the backoff counter
                attempt = -1
            else:
                # Exponential backoff, capped at the configured maximum
                delay = min(2 ** attempt, self._requeue_max_backoff)
            # Jitter the delay slightly to spread out retries
            delay += random.uniform(0, 1)
            req_logger.info("Requeuing request after %.3fs", delay)
            queue.requeue(request, attempt + 1, delay)
        else:
            req_logger.info("Successfully handled reconcile request")
        # Whether requeued or not, processing of this request is finished
        queue.processing_complete(request)

    async def _dispatch(self, client: AsyncClient, queue: Queue):
        """
        Pulls requests from the queue and hands them to the worker pool.
        """
        while True:
            # Poll until the queue has a request that can be dequeued
            while not queue.has_eligible_request():
                await asyncio.sleep(0.1)
            # Reserve a worker before dequeuing so the request is not held
            # while we wait for capacity
            worker = await self._worker_pool.reserve()
            request, attempt = await queue.dequeue()
            worker.set_task(
                self._handle_request,
                client,
                queue,
                worker.id,
                request,
                attempt
            )

    async def run(self, client: AsyncClient):
        """
        Run the controller using the given client.
        """
        # The queue spreads the work between the workers
        queue = Queue()
        # One task per watch pushes requests onto the queue...
        tasks = [
            asyncio.create_task(watch.run(client, queue))
            for watch in self._watches
        ]
        # ...and one task drains the queue into the worker pool
        tasks.append(asyncio.create_task(self._dispatch(client, queue)))
        await run_tasks(tasks)
78 changes: 78 additions & 0 deletions easykube/kubernetes/runtime/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import typing as t

from ..client import AsyncClient, LabelSelector

from .controller import Controller, ReconcileFunc
from .util import run_tasks
from .worker_pool import WorkerPool


LabelValue = t.Union[LabelSelector, t.List[str], str]


class Manager:
    """
    Manages the execution of multiple controllers with shared resources.

    Controllers created via this manager share its worker pool and inherit
    its default namespace and requeue backoff unless given their own.
    """
    def __init__(
        self,
        *,
        namespace: t.Optional[str] = None,
        worker_count: int = 10,
        worker_pool: t.Optional[WorkerPool] = None,
        requeue_max_backoff: int = 120
    ):
        self._namespace = namespace
        # Only create a pool of our own when one is not supplied
        self._worker_pool = worker_pool or WorkerPool(worker_count)
        self._requeue_max_backoff = requeue_max_backoff
        self._controllers: t.List[Controller] = []

    def register_controller(self, controller: Controller) -> 'Manager':
        """
        Register the given controller with this manager.

        Returns the manager to allow chained calls.
        """
        self._controllers.append(controller)
        return self

    def create_controller(
        self,
        api_version: str,
        kind: str,
        reconcile_func: ReconcileFunc,
        *,
        labels: t.Optional[t.Dict[str, LabelValue]] = None,
        namespace: t.Optional[str] = None,
        worker_pool: t.Optional[WorkerPool] = None,
        requeue_max_backoff: t.Optional[int] = None
    ) -> Controller:
        """
        Creates a new controller that is registered with this manager.

        Any options that are not given fall back to the manager's defaults.
        """
        controller = Controller(
            api_version,
            kind,
            reconcile_func,
            labels = labels,
            namespace = namespace or self._namespace,
            worker_pool = worker_pool or self._worker_pool,
            requeue_max_backoff = requeue_max_backoff or self._requeue_max_backoff
        )
        self.register_controller(controller)
        return controller

    async def run(self, client: AsyncClient):
        """
        Run all the controllers registered with the manager using the given client.

        Raises:
            RuntimeError: If no controllers have been registered.
        """
        # Use an explicit raise rather than assert so the check is not
        # stripped when Python runs with optimisations enabled (-O)
        if not self._controllers:
            raise RuntimeError("no controllers registered")
        await run_tasks(
            [
                # Run a task for each controller and one for the worker pool
                asyncio.create_task(controller.run(client))
                for controller in self._controllers
            ] + [
                # Run the worker pool
                asyncio.create_task(self._worker_pool.run()),
            ]
        )
Loading