DM-40638: Fix serialization of objects in watch events #197

Merged (3 commits) on Sep 7, 2023
4 changes: 4 additions & 0 deletions changelog.d/20230907_110606_rra_DM_40638b.md
@@ -0,0 +1,4 @@
+### Bug fixes
+
+- Kubernetes objects included in events are now serialized properly using the Kubernetes camel-case field names instead of the Python snake-case names. In addition to matching Kubernetes behavior more closely, this allows a watch configured with the Kubernetes model type to deserialize the object in the `object` key of the event dictionary. The type must be passed explicitly to the `Watch` constructor, since kubernetes_asyncio's type autodetection does not work with Safir's mock.
+- `safir.testing.kubernetes.patch_kubernetes` no longer mocks the entire `ApiClient` class since it is required for deserialization of objects in Kubernetes events. It instead mocks the `request` method of that class for safety, to prevent any network requests to Kubernetes clusters when Kubernetes is mocked.
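
The second changelog entry can be illustrated with a small, self-contained sketch. `FakeApiClient` and `demo` are hypothetical names invented for this illustration and are not the real kubernetes_asyncio `ApiClient`. The sketch only shows why patching a single method blocks network access while leaving the rest of the class (here, a toy `deserialize`) usable.

```python
import json
from unittest.mock import patch


class FakeApiClient:
    """Hypothetical stand-in for an API client (not the real ApiClient)."""

    def request(self, method: str, url: str) -> str:
        # A real client would open a network connection here.
        raise RuntimeError("network access attempted")

    def deserialize(self, data: str) -> dict:
        # Purely local logic that tests still need to work.
        return json.loads(data)


def demo() -> tuple[bool, dict]:
    # Patch only the method that would touch the network.
    with patch.object(FakeApiClient, "request") as mock_request:
        client = FakeApiClient()
        # The patched method is a mock, so this call raises nothing.
        client.request("GET", "https://example.invalid/api")
        called = mock_request.called
        # The unpatched method keeps its real implementation.
        parsed = client.deserialize('{"kind": "Pod"}')
    return called, parsed
```

Mocking the whole class instead would also replace `deserialize` with a mock, which is the situation the changelog entry describes as breaking deserialization of event objects.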
1 change: 1 addition & 0 deletions docs/_rst_epilog.rst
@@ -7,6 +7,7 @@
 .. _Gafaelfawr: https://gafaelfawr.lsst.io/
 .. _Gidgethub: https://gidgethub.readthedocs.io/en/latest/
 .. _HTTPX: https://www.python-httpx.org/
+.. _kubernetes_asyncio: https://github.com/tomplus/kubernetes_asyncio
 .. _mypy: https://www.mypy-lang.org
 .. _Phalanx: https://phalanx.lsst.io
 .. _pre-commit: https://pre-commit.com
20 changes: 11 additions & 9 deletions docs/user-guide/kubernetes.rst
@@ -75,15 +75,16 @@ Limitations of the mock
 -----------------------

 Only a limited subset of the API is supported, and only the most commonly-used parameters of those APIs are supported.
-Expect to need to add additional APIs and parameters, either by subclassing this mock or by contributing them back to Safir, when testing a new application.
+Expect to need to add additional APIs and parameters, when testing a new application.
+Contributions of those additional APIs and parameters will be gratefully reviewed and normally merged.

 Namespaces are only partially modeled.
 A namespace can be explicitly created with `~safir.testing.kubernetes.MockKubernetesApi.create_namespace`, in which case the provided ``V1Namespace`` object will be stored and returned by a subsequent `~safir.testing.kubernetes.MockKubernetesApi.read_namespace` or similar call.
 However, namespace creation is optional.
 If an object is created in a namespace, that namespace will magically come into existence, and a subsequent `~safir.testing.kubernetes.MockKubernetesApi.list_namespace` or `~safir.testing.kubernetes.MockKubernetesApi.read_namespace` call will return a synthetic namespace object.

-Most mock APIs do not support watches.
-The only exception is `~safir.testing.kubernetes.MockKubernetesApi.list_namespaced_event` (see :ref:`kubernetes-testing-events`).
+When creating Kubernetes watches, the caller will have to pass the expected model type explicitly as the first argument to the constructor of the ``Watch`` object in order to ensure correct deserialization of the raw object when using the mock.
+Unfortunately, the type autodetection support in kubernetes_asyncio_ does not work with our mock since it relies on docstring inspection.

 .. warning::

@@ -140,11 +141,7 @@ If this is any value other than ``Running``, the pod startup event for the names
 Testing events
 --------------

-Currently, `~safir.testing.kubernetes.MockKubernetesApi.list_namespaced_event` is the only API that supports watches.
-Multiple watchers and timeouts are supported.
-The ``field_selector`` parameter is accepted, but is currently ignored.
-
-The only event that will be posted automatically by the mock is a pod started event when creating a pod with `~safir.testing.kubernetes.MockKubernetesApi.create_namespaced_pod`, provided that the ``initial_pod_phase`` attribute on the mock is set to its default value of ``Running``.
+The only event that will be posted automatically by the mock Kubernetes API is a pod started event when creating a pod with `~safir.testing.kubernetes.MockKubernetesApi.create_namespaced_pod`, provided that the ``initial_pod_phase`` attribute on the mock is set to its default value of ``Running``.
 All other events must be injected manually with `~safir.testing.kubernetes.MockKubernetesApi.create_namespaced_event`.

 Testing node state
@@ -181,6 +178,11 @@ Here is an example of how this function could be used in a test:
        pod = await mock_kubernetes.read_namespaced_pod("pod", "namespace")
        data_path = Path(__name__).parent / "data" / "pod.json"
        expected = json.loads(data_path.read_text())
-       assert strip_none(pod.to_dict()) == expected
+       assert strip_none(pod.to_dict(serialize=True)) == expected

 The data stored in :file:`tests/data/pod.json` can then contain only the interesting elements of the data model (the ones that are not `None`).
+
+.. note::
+
+   As in the above example, consider passing ``serialize=True`` whenever calling the ``to_dict`` method on a Kubernetes model.
+   This tells the Kubernetes library to use the correct Kubernetes camel-case attribute names rather than the Python snake-case attribute names.
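
The effect of ``serialize=True`` described in the note can be sketched without a cluster or the real library. `ToyEvent` below is a hypothetical class written for this illustration; it is loosely modeled on how generated Kubernetes model classes map Python attribute names to JSON field names, and it is not the library's implementation.

```python
from typing import Any


class ToyEvent:
    """Hypothetical stand-in for a generated Kubernetes model class."""

    # Maps Python attribute names to Kubernetes JSON field names.
    attribute_map = {
        "message": "message",
        "involved_object": "involvedObject",
    }

    def __init__(self, message: str, involved_object: dict[str, str]) -> None:
        self.message = message
        self.involved_object = involved_object

    def to_dict(self, serialize: bool = False) -> dict[str, Any]:
        result: dict[str, Any] = {}
        for attr, json_key in self.attribute_map.items():
            # Use the JSON field name only when serialization is requested.
            result[json_key if serialize else attr] = getattr(self, attr)
        return result


event = ToyEvent("Pod foo started", {"kind": "Pod", "name": "foo"})
snake = event.to_dict()  # keys use Python snake-case names
camel = event.to_dict(serialize=True)  # keys use Kubernetes camel-case names
```

In this sketch `snake` contains the key `involved_object` while `camel` contains `involvedObject`, which mirrors the change to the test assertions later in this pull request.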
39 changes: 26 additions & 13 deletions src/safir/testing/kubernetes.py
@@ -43,6 +43,7 @@
     V1ServiceList,
     V1Status,
 )
+from typing_extensions import Protocol

 from ..asyncio import AsyncMultiQueue

@@ -158,6 +159,20 @@ def strip_none(model: dict[str, Any]) -> dict[str, Any]:
     return result


+class _KubernetesModel(Protocol):
+    """Protocol describing common features of Kubernetes objects.
+
+    The kubernetes_asyncio_ library doesn't provide full typing of its methods
+    or objects, so use a protocol to describe the functionality that we rely
+    on when passing a generic object around.
+    """
+
+    def to_dict(
+        self, serialize: bool = False  # noqa: FBT001, FBT002
+    ) -> dict[str, Any]:
+        ...
+
+
 class _EventStream:
     """Holds the data for a stream of watchable events.

@@ -184,14 +199,15 @@ def next_resource_version(self) -> str:
         """
         return str(self._queue.qsize() + 1)

-    def add_event(self, event: dict[str, Any]) -> None:
+    def add_event(self, action: str, obj: _KubernetesModel) -> None:
         """Add a new event and notify all watchers.

         Parameters
         ----------
         event
             New event.
         """
+        event = {"type": action, "object": obj.to_dict(serialize=True)}
         self._queue.put(event)

     def build_watch_response(
@@ -801,7 +817,7 @@ async def create_namespaced_event(
         self._update_metadata(body, "v1", "Event", namespace)
         stream = self._event_streams[namespace]["Event"]
         body.metadata.resource_version = stream.next_resource_version
-        stream.add_event({"type": "ADDED", "object": body.to_dict()})
+        stream.add_event("ADDED", body)
         self._events[namespace].append(body)

     async def list_namespaced_event(
@@ -895,7 +911,7 @@ async def create_namespaced_ingress(
         )
         self._store_object(namespace, "Ingress", name, body)
         stream = self._event_streams[namespace]["Ingress"]
-        stream.add_event({"type": "ADDED", "object": body.to_dict()})
+        stream.add_event("ADDED", body)

     async def delete_namespaced_ingress(
         self,
@@ -931,7 +947,7 @@ async def delete_namespaced_ingress(
         self._maybe_error("delete_namespaced_ingress", name, namespace)
         ingress = self._get_object(namespace, "Ingress", name)
         stream = self._event_streams[namespace]["Ingress"]
-        stream.add_event({"type": "DELETED", "object": ingress.to_dict()})
+        stream.add_event("DELETED", ingress)
         return self._delete_object(namespace, "Ingress", name)

     async def read_namespaced_ingress(
@@ -1072,7 +1088,7 @@ async def patch_namespaced_ingress_status(
         stream = self._event_streams[namespace]["Ingress"]
         ingress.metadata.resource_version = stream.next_resource_version
         self._store_object(namespace, "Ingress", name, ingress, replace=True)
-        stream.add_event({"type": "MODIFIED", "object": ingress.to_dict()})
+        stream.add_event("MODIFIED", ingress)
         return ingress

     # JOB API
@@ -1181,7 +1197,7 @@ async def delete_namespaced_job(

         job = self._get_object(namespace, "Job", name)
         stream = self._event_streams[namespace]["Job"]
-        stream.add_event({"type": "DELETED", "object": job.to_dict()})
+        stream.add_event("DELETED", job)
         return self._delete_object(namespace, "Job", name)

     async def list_namespaced_job(
@@ -1481,7 +1497,7 @@ async def create_namespaced_pod(self, namespace: str, body: V1Pod) -> None:
         stream = self._event_streams[namespace]["Pod"]
         body.metadata.resource_version = stream.next_resource_version
         self._store_object(namespace, "Pod", body.metadata.name, body)
-        stream.add_event({"type": "ADDED", "object": body.to_dict()})
+        stream.add_event("ADDED", body)
         if self.initial_pod_phase == "Running":
             event = CoreV1Event(
                 metadata=V1ObjectMeta(
@@ -1529,7 +1545,7 @@ async def delete_namespaced_pod(
         pod = self._get_object(namespace, "Pod", name)
         result = self._delete_object(namespace, "Pod", name)
         stream = self._event_streams[namespace]["Pod"]
-        stream.add_event({"type": "DELETED", "object": pod.to_dict()})
+        stream.add_event("DELETED", pod)
         return result

     async def list_namespaced_pod(
@@ -1643,7 +1659,7 @@ async def patch_namespaced_pod_status(
         stream = self._event_streams[namespace]["Pod"]
         pod.metadata.resource_version = stream.next_resource_version
         self._store_object(namespace, "Pod", name, pod, replace=True)
-        stream.add_event({"type": "MODIFIED", "object": pod.to_dict()})
+        stream.add_event("MODIFIED", pod)
         return pod

     async def read_namespaced_pod(self, name: str, namespace: str) -> V1Pod:
@@ -2282,10 +2298,7 @@ def mock_kubernetes() -> Iterator[MockKubernetesApi]:
             mock_class = patcher.start()
             mock_class.return_value = mock_api
             patchers.append(patcher)
-        mock_api_client = Mock(spec=client.ApiClient)
-        mock_api_client.close = AsyncMock()
-        with patch.object(client, "ApiClient") as mock_client:
-            mock_client.return_value = mock_api_client
+        with patch.object(client.ApiClient, "request"):
             os.environ["KUBERNETES_PORT"] = "tcp://10.0.0.1:443"
             yield mock_api
             del os.environ["KUBERNETES_PORT"]
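
The pattern used by `_KubernetesModel` and the new `add_event` signature can be sketched in isolation. Everything below (`SupportsToDict`, `ToyPod`, `ToyEventStream`) is hypothetical and written only for illustration. It uses `typing.Protocol` from the standard library, whereas the change above imports `Protocol` from `typing_extensions`, and a plain list stands in for the queue used by the real `_EventStream`.

```python
from typing import Any, Protocol


class SupportsToDict(Protocol):
    """Structural type: any object with a compatible to_dict method."""

    def to_dict(self, serialize: bool = False) -> dict[str, Any]: ...


class ToyPod:
    """Hypothetical model; it does not inherit from SupportsToDict."""

    def __init__(self, name: str, host_ip: str) -> None:
        self.name = name
        self.host_ip = host_ip

    def to_dict(self, serialize: bool = False) -> dict[str, Any]:
        # Toy mapping of one snake-case attribute to its camel-case name.
        ip_key = "hostIP" if serialize else "host_ip"
        return {"name": self.name, ip_key: self.host_ip}


class ToyEventStream:
    """Hypothetical event stream that stores events in a plain list."""

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    def add_event(self, action: str, obj: SupportsToDict) -> None:
        # Serialization happens in one place, so callers cannot forget
        # to request the camel-case field names.
        self.events.append(
            {"type": action, "object": obj.to_dict(serialize=True)}
        )


stream = ToyEventStream()
stream.add_event("ADDED", ToyPod("foo", "10.0.0.5"))
```

Moving the dictionary construction into `add_event` means each call site passes only the action and the model object, which is the shape of every call-site change in this file.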
15 changes: 8 additions & 7 deletions tests/testing/kubernetes_test.py
@@ -178,14 +178,14 @@ async def test_mock_events(mock_kubernetes: MockKubernetesApi) -> None:
     assert results[0] == results[1]
     assert results[0] == results[2]
     result = results[0]
-    assert result[1] == some_event.to_dict()
-    assert result[2] == done_event.to_dict()
+    assert result[1] == some_event.to_dict(serialize=True)
+    assert result[2] == done_event.to_dict(serialize=True)

     # The first event should have been the pod running event.
     assert result[0]["message"] == "Pod foo started"
-    assert result[0]["involved_object"]["kind"] == "Pod"
-    assert result[0]["involved_object"]["name"] == "foo"
-    assert result[0]["involved_object"]["namespace"] == "stuff"
+    assert result[0]["involvedObject"]["kind"] == "Pod"
+    assert result[0]["involvedObject"]["name"] == "foo"
+    assert result[0]["involvedObject"]["namespace"] == "stuff"

     # Starting with resource version "1" should skip the first event, since
     # the semantics of a watch are to show any events *after* the provided
@@ -223,10 +223,11 @@ async def watch_pod_events(
         "namespace": namespace,
         "timeout_seconds": 10,  # Just in case, so tests don't hang
     }
-    async with Watch().stream(method, **watch_args) as stream:
+    async with Watch(V1Pod).stream(method, **watch_args) as stream:
         seen = []
         async for event in stream:
             seen.append(event)
+            assert isinstance(event["object"], V1Pod)
             if event["type"] == "DELETED":
                 return seen
         return seen
@@ -295,6 +296,6 @@ async def test_pod_status(mock_kubernetes: MockKubernetesApi) -> None:
     assert results[0] == results[2]
     result = results[0]
     assert result[0]["type"] == "MODIFIED"
-    assert result[0]["object"]["status"]["phase"] == "Running"
+    assert result[0]["object"].status.phase == "Running"
     assert result[1]["type"] == "DELETED"
     assert result[1]["object"] == result[0]["object"]