Skip to content

Commit

Permalink
Merge pull request #200 from lsst-sqre/tickets/DM-40691
Browse files Browse the repository at this point in the history
DM-40691: Add list_namespaced_custom_object to mock
  • Loading branch information
rra authored Sep 8, 2023
2 parents 0846030 + 355a3ab commit f2b88fe
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 6 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20230908_143247_rra_DM_40691.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Add `list_namespaced_custom_object` with watch support to the Kubernetes mock.
135 changes: 129 additions & 6 deletions src/safir/testing/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,27 @@ def next_resource_version(self) -> str:
"""
return str(self._queue.qsize() + 1)

def add_custom_event(self, action: str, obj: dict[str, Any]) -> None:
"""Add a new event for a custom object and notify all watchers.
Parameters
----------
action
Action of the event.
obj
Custom object triggering the event.
"""
self._queue.put({"type": action, "object": obj})

def add_event(self, action: str, obj: _KubernetesModel) -> None:
"""Add a new event and notify all watchers.
Parameters
----------
event
New event.
action
Action of the event.
obj
Object triggering the event.
"""
event = {"type": action, "object": obj.to_dict(serialize=True)}
self._queue.put(event)
Expand Down Expand Up @@ -358,9 +372,9 @@ class MockKubernetesApi:
desired behavior, sometimes it isn't; we had to pick one and this is the
approach we picked.)
Most APIs do not support watches. The current exceptions are
`list_namespaced_event`, `list_namespaced_ingress`, `list_namespaced_job`,
`list_namespaced_pod`, and `list_namespaced_service`.
Not all methods are implemented for each kind, and not all ``list_*`` APIs
support watches. In general, APIs are only implemented when some other
package needs them.
Attributes
----------
Expand Down Expand Up @@ -496,8 +510,14 @@ async def create_namespaced_custom_object(
assert key == self._custom_kinds[body["kind"]]
else:
self._custom_kinds[body["kind"]] = key
assert namespace == body["metadata"]["namespace"]
if body["metadata"].get("namespace"):
assert namespace == body["metadata"]["namespace"]
else:
body["metadata"]["namespace"] = namespace
stream = self._event_streams[namespace][key]
body["metadata"]["resourceVersion"] = stream.next_resource_version
self._store_object(namespace, key, body["metadata"]["name"], body)
stream.add_custom_event("ADDED", body)

async def delete_namespaced_custom_object(
self,
Expand Down Expand Up @@ -541,6 +561,9 @@ async def delete_namespaced_custom_object(
"""
self._maybe_error("delete_namespaced_custom_object", name, namespace)
key = f"{group}/{version}/{plural}"
obj = self._get_object(namespace, key, name)
stream = self._event_streams[namespace][key]
stream.add_custom_event("DELETED", obj)
return self._delete_object(namespace, key, name)

async def get_namespaced_custom_object(
Expand Down Expand Up @@ -613,6 +636,100 @@ async def list_cluster_custom_object(
results.extend(self._objects[namespace].get(key, {}).values())
return {"items": results}

async def list_namespaced_custom_object(
self,
group: str,
version: str,
namespace: str,
plural: str,
*,
field_selector: str | None = None,
label_selector: str | None = None,
resource_version: str | None = None,
timeout_seconds: int | None = None,
watch: bool = False,
_preload_content: bool = True,
_request_timeout: int | None = None,
) -> V1IngressList | Mock:
"""List custom objects in a namespace.
This does support watches.
Parameters
----------
group
API group for this custom object.
version
API version for this custom object.
namespace
Namespace of ingresss to list.
plural
API plural for this custom object.
field_selector
Only ``metadata.name=...`` is supported. It is parsed to find the
ingress name and only ingresss matching that name will be returned.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
timeout_seconds
How long to return events for before exiting when performing a
watch.
watch
Whether to act as a watch.
_preload_content
Verified to be `False` when performing a watch.
_request_timeout
Ignored, accepted for compatibility with the watch API.
Returns
-------
dict or unittest.mock.Mock
List of custom objects in that namespace as a dict with one key,
``items``, when not called as a watch. If called as a watch,
returns a mock ``aiohttp.Response`` with a ``readline`` metehod
that yields the events.
Raises
------
AssertionError
Some other ``field_selector`` was provided.
kubernetes_asyncio.client.ApiException
Raised with 404 status if the namespace does not exist.
"""
self._maybe_error(
"list_namespaced_custom_object",
group,
version,
namespace,
plural,
field_selector,
)
key = f"{group}/{version}/{plural}"
if namespace not in self._objects:
msg = f"Namespace {namespace} not found"
raise ApiException(status=404, reason=msg)
if not watch:
objs = self._list_objects(
namespace, key, field_selector, label_selector
)
return {"items": objs}

# All watches must not preload content since we're returning raw JSON.
# This is done by the Kubernetes API Watch object.
assert not _preload_content

# Return the mock response expected by the Kubernetes API.
stream = self._event_streams[namespace][key]
return stream.build_watch_response(
resource_version,
timeout_seconds,
field_selector=field_selector,
label_selector=label_selector,
)

async def patch_namespaced_custom_object_status(
self,
group: str,
Expand Down Expand Up @@ -666,7 +783,10 @@ async def patch_namespaced_custom_object_status(
assert change["op"] == "replace"
assert change["path"] == "/status"
obj["status"] = change["value"]
stream = self._event_streams[namespace][key]
obj["metadata"]["resourceVersion"] = stream.next_resource_version
self._store_object(namespace, key, name, obj, replace=True)
stream.add_custom_event("MODIFIED", obj)
return obj

async def replace_namespaced_custom_object(
Expand Down Expand Up @@ -712,7 +832,10 @@ async def replace_namespaced_custom_object(
key = f"{group}/{version}/{plural}"
assert key == self._custom_kinds[body["kind"]]
assert namespace == body["metadata"]["namespace"]
stream = self._event_streams[namespace][key]
body["metadata"]["resourceVersion"] = stream.next_resource_version
self._store_object(namespace, key, name, body, replace=True)
stream.add_custom_event("MODIFIED", body)

# CONFIGMAP API

Expand Down

0 comments on commit f2b88fe

Please sign in to comment.