diff --git a/changelog.d/20231114_171851_rra_DM_41708a.md b/changelog.d/20231114_171851_rra_DM_41708a.md new file mode 100644 index 00000000..0d38dbcc --- /dev/null +++ b/changelog.d/20231114_171851_rra_DM_41708a.md @@ -0,0 +1,3 @@ +### New features + +- Add delete, list, and watch support for persistent volume claims to the Kubernetes mock. diff --git a/src/safir/testing/kubernetes.py b/src/safir/testing/kubernetes.py index 9b4e902a..1765996e 100644 --- a/src/safir/testing/kubernetes.py +++ b/src/safir/testing/kubernetes.py @@ -34,6 +34,7 @@ V1ObjectMeta, V1ObjectReference, V1PersistentVolumeClaim, + V1PersistentVolumeClaimList, V1Pod, V1PodList, V1PodStatus, @@ -689,12 +690,13 @@ async def list_namespaced_custom_object( version API version for this custom object. namespace - Namespace of ingresss to list. + Namespace of custom object to list. plural API plural for this custom object. field_selector Only ``metadata.name=...`` is supported. It is parsed to find the - ingress name and only ingresss matching that name will be returned. + custom object name and only custom objects matching that name will + be returned. label_selector Which objects to retrieve. All labels must match. resource_version @@ -1167,10 +1169,11 @@ async def list_namespaced_ingress( Parameters ---------- namespace - Namespace of ingresss to list. + Namespace of ingresses to list. field_selector Only ``metadata.name=...`` is supported. It is parsed to find the - ingress name and only ingresss matching that name will be returned. + ingress name and only ingresses matching that name will be + returned. label_selector Which objects to retrieve. All labels must match. resource_version @@ -1189,7 +1192,7 @@ async def list_namespaced_ingress( Returns ------- kubernetes_asyncio.client.V1IngressList or unittest.mock.Mock - List of ingresss in that namespace, when not called as a watch. If + List of ingresses in that namespace, when not called as a watch. If called as a watch, returns a mock ``aiohttp.Response`` with a ``readline`` metehod that yields the events. @@ -1842,9 +1845,145 @@ async def create_namespaced_persistent_volume_claim( "create_namespaced_persistent_volume_claim", namespace, body ) self._update_metadata(body, "v1", "PersistentVolumeClaim", namespace) + stream = self._event_streams[namespace]["PersistentVolumeClaim"] + body.metadata.resource_version = stream.next_resource_version self._store_object( namespace, "PersistentVolumeClaim", body.metadata.name, body ) + stream.add_event("ADDED", body) + + async def delete_namespaced_persistent_volume_claim( + self, + name: str, + namespace: str, + *, + grace_period_seconds: int | None = None, + propagation_policy: str = "Foreground", + body: V1DeleteOptions | None = None, + _request_timeout: float | None = None, + ) -> V1Status: + """Delete a persistent volume claim object. + + Parameters + ---------- + name + Name of persistent volume claim to delete. + namespace + Namespace of persistent volume claim to delete. + grace_period_seconds + Grace period for object deletion (currently ignored). + propagation_policy + Propagation policy for deletion. Has no effect on the mock. + body + Delete options (currently ignored). + _request_timeout + Ignored, accepted for compatibility with the Kubernetes API. + + Returns + ------- + kubernetes_asyncio.client.V1Status + Success status. + + Raises + ------ + kubernetes_asyncio.client.ApiException + Raised with 404 status if the ingress was not found. + """ + self._maybe_error( + "delete_namespaced_persistent_volume_claim", name, namespace + ) + pvc = self._get_object(namespace, "PersistentVolumeClaim", name) + stream = self._event_streams[namespace]["PersistentVolumeClaim"] + stream.add_event("DELETED", pvc) + return self._delete_object( + namespace, "PersistentVolumeClaim", name, propagation_policy + ) + + async def list_namespaced_persistent_volume_claim( + self, + namespace: str, + *, + field_selector: str | None = None, + label_selector: str | None = None, + resource_version: str | None = None, + timeout_seconds: int | None = None, + watch: bool = False, + _preload_content: bool = True, + _request_timeout: float | None = None, + ) -> V1IngressList | Mock: + """List persistent volume claim objects in a namespace. + + This does support watches. + + Parameters + ---------- + namespace + Namespace of persistent volume claim to list. + field_selector + Only ``metadata.name=...`` is supported. It is parsed to find the + persistent volume claim name and only persistent volume claims + matching that name will be returned. + label_selector + Which objects to retrieve. All labels must match. + resource_version + Where to start in the event stream when performing a watch. If + `None`, starts with the next change. + timeout_seconds + How long to return events for before exiting when performing a + watch. + watch + Whether to act as a watch. + _preload_content + Verified to be `False` when performing a watch. + _request_timeout + Ignored, accepted for compatibility with the Kubernetes API. + + Returns + ------- + kubernetes_asyncio.client.V1PersistentVolumeClaimList + List of persistent volume claims in that namespace, when not + called as a watch. If called as a watch, returns a mock + ``aiohttp.Response`` with a ``readline`` metehod that yields the + events. + + Raises + ------ + AssertionError + Some other ``field_selector`` was provided. + kubernetes_asyncio.client.ApiException + Raised with 404 status if the namespace does not exist. + """ + self._maybe_error( + "list_namespaced_persistent_volume_claim", + namespace, + field_selector, + ) + if namespace not in self._objects: + msg = f"Namespace {namespace} not found" + raise ApiException(status=404, reason=msg) + if not watch: + pvcs = self._list_objects( + namespace, + "PersistentVolumeClaim", + field_selector, + label_selector, + ) + return V1PersistentVolumeClaimList( + kind="PersistentVolumeClaim", items=pvcs + ) + + # All watches must not preload content since we're returning raw JSON. + # This is done by the Kubernetes API Watch object. + assert not _preload_content + + # Return the mock response expected by the Kubernetes API. + stream = self._event_streams[namespace]["PersistentVolumeClaim"] + return stream.build_watch_response( + resource_version, + timeout_seconds, + field_selector=field_selector, + label_selector=label_selector, + ) async def read_namespaced_persistent_volume_claim( self,