Skip to content

Commit

Permalink
Add watch support for namespaces
Browse files Browse the repository at this point in the history
Add watch, field selector, and label selector support to
list_namespace in the Kubernetes mock.

read_namespace and list_namespace in the Kubernetes mock now
only return namespace objects that have been explicitly created,
not implicit namespaces created by creating another object without
making a namespace first. This more closely matches the behavior
of Kubernetes while still making it easy to use the mock in a test
environment simulating a pre-existing namespace.

Fix the documentation for label_selector in list_* functions to
reflect that it is applied to the regular list return, not just the
watch.
  • Loading branch information
rra committed Sep 12, 2023
1 parent 40b2f88 commit 5408afb
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 39 deletions.
7 changes: 7 additions & 0 deletions changelog.d/20230912_145508_rra_DM_40691.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### New features

- Add watch, field selector, and label selector support to `list_namespace` in the Kubernetes mock.

### Bug fixes

- `read_namespace` and `list_namespace` in the Kubernetes mock now only return namespace objects that have been explicitly created, not implicit namespaces created by creating another object without making a namespace first. This more closely matches the behavior of Kubernetes while still making it easy to use the mock in a test environment simulating a pre-existing namespace.
164 changes: 125 additions & 39 deletions src/safir/testing/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,12 @@ def build_watch_response(
between events. It matches the corresponding parameter in the
Kubernetes API. If `None`, watch forever.
field_selector
Which events to retrieve when performing a watch. If set, it must
be set to ``metadata.name=...`` to match a specific object name.
Limit the returned events to the objects matching the selector.
If set, it must be set to ``metadata.name=...`` to match a
specific object name.
label_selector
Limit the returned events to the objects matching this label
selector.
Returns
-------
Expand Down Expand Up @@ -294,11 +298,12 @@ def _build_watcher(
between events. It matches the corresponding parameter in the
Kubernetes API. If `None`, watch forever.
field_selector
Which events to retrieve when performing a watch. If set, it must
be set to ``metadata.name=...`` to match a specific object name.
Limit the returned events to the objects matching the selector.
If set, it must be set to ``metadata.name=...`` to match a
specific object name.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Limit the returned events to the objects matching this label
selector.
Returns
-------
Expand Down Expand Up @@ -359,10 +364,10 @@ class MockKubernetesApi:
This mock does not enforce namespace creation before creating objects in a
namespace. Creating an object in a namespace will implicitly create that
namespace if it doesn't exist. However, it will not store a
``V1Namespace`` object, so to verify that a namespace was properly created
(although not the order of creation), retrieve all the objects in the
namespace with `get_namespace_objects_for_test` and one of them will be
the ``V1Namespace`` object.
``V1Namespace`` object or emit events for `list_namespace` watches. To
verify that a namespace was properly created (although not the order of
creation), check for the namespace object explicitly with `read_namespace`
or `list_namespace`.
Objects stored with ``create_*`` or ``replace_*`` methods are **NOT**
copied. The object provided will be stored, so changing that object will
Expand Down Expand Up @@ -404,7 +409,8 @@ def __init__(self) -> None:
self._nodes = V1NodeList(items=[])
self._objects: dict[str, dict[str, dict[str, Any]]] = {}
self._events: defaultdict[str, list[CoreV1Event]] = defaultdict(list)
self._event_streams: defaultdict[str, dict[str, _EventStream]]
self._namespace_stream = _EventStream()
self._event_streams: defaultdict[str, defaultdict[str, _EventStream]]
self._event_streams = defaultdict(lambda: defaultdict(_EventStream))

def get_all_objects_for_test(self, kind: str) -> list[Any]:
Expand Down Expand Up @@ -669,8 +675,7 @@ async def list_namespaced_custom_object(
Only ``metadata.name=...`` is supported. It is parsed to find the
ingress name and only ingresss matching that name will be returned.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -1098,8 +1103,7 @@ async def list_namespaced_ingress(
Only ``metadata.name=...`` is supported. It is parsed to find the
ingress name and only ingresss matching that name will be returned.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -1349,8 +1353,7 @@ async def list_namespaced_job(
Only ``metadata.name=...`` is supported. It is parsed to find the
job name and only jobs matching that name will be returned.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -1451,6 +1454,7 @@ async def create_namespace(self, body: V1Namespace) -> None:
msg = f"Namespace {name} already exists"
raise ApiException(status=409, reason=msg)
self._store_object(name, "Namespace", name, body)
self._namespace_stream.add_event("ADDED", body)

async def delete_namespace(
self,
Expand Down Expand Up @@ -1480,11 +1484,20 @@ async def delete_namespace(
self._maybe_error("delete_namespace")
if name not in self._objects:
raise ApiException(status=404, reason=f"{name} not found")
try:
body = await self.read_namespace(name)
self._namespace_stream.add_event("DELETED", body)
except ApiException:
pass
del self._objects[name]

async def read_namespace(self, name: str) -> V1Namespace:
"""Return the namespace object for a namespace.
This will only return a namespace object if one was created via
`create_namespace`, not if one was implicitly added by creating some
other object.
Parameters
----------
name
Expand All @@ -1493,38 +1506,112 @@ async def read_namespace(self, name: str) -> V1Namespace:
Returns
-------
kubernetes_asyncio.client.V1Namespace
Corresponding namespace object. If `create_namespace` has
been called, will return the stored object. Otherwise, returns a
synthesized ``V1Namespace`` object if the namespace has been
implicitly created.
Corresponding namespace object. If `create_namespace` has not been
called, but the namespace was added implicitly, an exception will
be raised.
Raises
------
kubernetes_asyncio.client.ApiException
Raised with 404 status if the namespace does not exist.
"""
self._maybe_error("read_namespace", name)
if name not in self._objects:
msg = f"Namespace {name} not found"
raise ApiException(status=404, reason=msg)
try:
return self._get_object(name, "Namespace", name)
except ApiException:
return V1Namespace(metadata=V1ObjectMeta(name=name))
return self._get_object(name, "Namespace", name)

async def list_namespace(self) -> V1NamespaceList:
"""List known namespaces.
async def list_namespace(
self,
*,
field_selector: str | None = None,
label_selector: str | None = None,
resource_version: str | None = None,
timeout_seconds: int | None = None,
watch: bool = False,
_preload_content: bool = True,
_request_timeout: int | None = None,
) -> V1NamespaceList | Mock:
"""List namespaces.
This does support watches. Only namespaces that are explicitly
created with `create_namespace` will be shown, not ones that were
implicitly created by creating some other object.
Parameters
----------
namespace
Namespace of jobs to list.
field_selector
Only ``metadata.name=...`` is supported. It is parsed to find the
namespace name and only the namespace matching that name will be
returned.
label_selector
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
timeout_seconds
How long to return events for before exiting when performing a
watch.
watch
Whether to act as a watch.
_preload_content
Verified to be `False` when performing a watch.
_request_timeout
Ignored, accepted for compatibility with the watch API.
Returns
-------
kubernetes_asyncio.client.V1NamespaceList
All namespaces, whether implicitly created or not. These will be
the actual ``V1Namespace`` objects if one was stored, otherwise
synthesized namespace objects.
kubernetes_asyncio.client.V1JobList or unittest.mock.Mock
List of namespaces, when not called as a watch. If called as a
watch, returns a mock ``aiohttp.Response`` with a ``readline``
metehod that yields the events.
Raises
------
AssertionError
Some other ``field_selector`` was provided.
kubernetes_asyncio.client.ApiException
Raised with 404 status if the namespace does not exist.
"""
self._maybe_error("list_namespace")
namespaces = [await self.read_namespace(n) for n in self._objects]
return V1NamespaceList(items=namespaces)

# We annoyingly have to duplicate a bunch of logic from _list_objects
# because namespaces aren't stored under a single namespace.
if not watch:
if field_selector:
match = re.match(r"metadata\.name=(.*)$", field_selector)
if not match or not match.group(1):
msg = f"Field selector {field_selector} not supported"
raise ValueError(msg)
try:
name = match.group(1)
obj = self._get_object(name, "Namespace", name)
if _check_labels(obj.metadata.labels, label_selector):
return [obj]
else:
return []
except ApiException:
return []
namespaces = []
for name in self._objects:
try:
namespace = await self.read_namespace(name)
except ApiException:
continue
if _check_labels(namespace.metadata.labels, label_selector):
namespaces.append(namespace)
return V1NamespaceList(items=namespaces)

# All watches must not preload content since we're returning raw JSON.
# This is done by the Kubernetes API Watch object.
assert not _preload_content

# Return the mock response expected by the Kubernetes API.
return self._namespace_stream.build_watch_response(
resource_version,
timeout_seconds,
field_selector=field_selector,
label_selector=label_selector,
)

# NETWORKPOLICY API

Expand Down Expand Up @@ -1749,7 +1836,7 @@ async def list_namespaced_pod(
Only ``metadata.name=...`` is supported. It is parsed to find the
pod name and only pods matching that name will be returned.
label_selector
Only pods with labels matching all of these will be returned.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -2136,8 +2223,7 @@ async def list_namespaced_service(
Only ``metadata.name=...`` is supported. It is parsed to find the
service name and only services matching that name will be returned.
label_selector
Which matching objects to retrieve by label. All labels must
match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down

0 comments on commit 5408afb

Please sign in to comment.