Skip to content

Commit

Permalink
Merge pull request #220 from lsst-sqre/tickets/DM-41708a
Browse files Browse the repository at this point in the history
DM-41708: Add more mocks for Kubernetes PVCs
  • Loading branch information
rra authored Nov 16, 2023
2 parents d8c5996 + 67f13dd commit b12fd22
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 5 deletions.
3 changes: 3 additions & 0 deletions changelog.d/20231114_171851_rra_DM_41708a.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Add delete, list, and watch support for persistent volume claims to the Kubernetes mock.
149 changes: 144 additions & 5 deletions src/safir/testing/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
V1ObjectMeta,
V1ObjectReference,
V1PersistentVolumeClaim,
V1PersistentVolumeClaimList,
V1Pod,
V1PodList,
V1PodStatus,
Expand Down Expand Up @@ -689,12 +690,13 @@ async def list_namespaced_custom_object(
version
API version for this custom object.
namespace
Namespace of ingresss to list.
Namespace of custom object to list.
plural
API plural for this custom object.
field_selector
Only ``metadata.name=...`` is supported. It is parsed to find the
ingress name and only ingresss matching that name will be returned.
custom object name and only custom objects matching that name will
be returned.
label_selector
Which objects to retrieve. All labels must match.
resource_version
Expand Down Expand Up @@ -1167,10 +1169,11 @@ async def list_namespaced_ingress(
Parameters
----------
namespace
Namespace of ingresss to list.
Namespace of ingresses to list.
field_selector
Only ``metadata.name=...`` is supported. It is parsed to find the
ingress name and only ingresss matching that name will be returned.
ingress name and only ingresses matching that name will be
returned.
label_selector
Which objects to retrieve. All labels must match.
resource_version
Expand All @@ -1189,7 +1192,7 @@ async def list_namespaced_ingress(
Returns
-------
kubernetes_asyncio.client.V1IngressList or unittest.mock.Mock
List of ingresss in that namespace, when not called as a watch. If
List of ingresses in that namespace, when not called as a watch. If
called as a watch, returns a mock ``aiohttp.Response`` with a
``readline`` metehod that yields the events.
Expand Down Expand Up @@ -1850,9 +1853,145 @@ async def create_namespaced_persistent_volume_claim(
"create_namespaced_persistent_volume_claim", namespace, body
)
self._update_metadata(body, "v1", "PersistentVolumeClaim", namespace)
stream = self._event_streams[namespace]["PersistentVolumeClaim"]
body.metadata.resource_version = stream.next_resource_version
self._store_object(
namespace, "PersistentVolumeClaim", body.metadata.name, body
)
stream.add_event("ADDED", body)

async def delete_namespaced_persistent_volume_claim(
self,
name: str,
namespace: str,
*,
grace_period_seconds: int | None = None,
propagation_policy: str = "Foreground",
body: V1DeleteOptions | None = None,
_request_timeout: float | None = None,
) -> V1Status:
"""Delete a persistent volume claim object.
Parameters
----------
name
Name of persistent volume claim to delete.
namespace
Namespace of persistent volume claim to delete.
grace_period_seconds
Grace period for object deletion (currently ignored).
propagation_policy
Propagation policy for deletion. Has no effect on the mock.
body
Delete options (currently ignored).
_request_timeout
Ignored, accepted for compatibility with the Kubernetes API.
Returns
-------
kubernetes_asyncio.client.V1Status
Success status.
Raises
------
kubernetes_asyncio.client.ApiException
Raised with 404 status if the ingress was not found.
"""
self._maybe_error(
"delete_namespaced_persistent_volume_claim", name, namespace
)
pvc = self._get_object(namespace, "PersistentVolumeClaim", name)
stream = self._event_streams[namespace]["PersistentVolumeClaim"]
stream.add_event("DELETED", pvc)
return self._delete_object(
namespace, "PersistentVolumeClaim", name, propagation_policy
)

async def list_namespaced_persistent_volume_claim(
self,
namespace: str,
*,
field_selector: str | None = None,
label_selector: str | None = None,
resource_version: str | None = None,
timeout_seconds: int | None = None,
watch: bool = False,
_preload_content: bool = True,
_request_timeout: float | None = None,
) -> V1IngressList | Mock:
"""List persistent volume claim objects in a namespace.
This does support watches.
Parameters
----------
namespace
Namespace of persistent volume claim to list.
field_selector
Only ``metadata.name=...`` is supported. It is parsed to find the
persistent volume claim name and only persistent volume claims
matching that name will be returned.
label_selector
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
timeout_seconds
How long to return events for before exiting when performing a
watch.
watch
Whether to act as a watch.
_preload_content
Verified to be `False` when performing a watch.
_request_timeout
Ignored, accepted for compatibility with the Kubernetes API.
Returns
-------
kubernetes_asyncio.client.V1PersistentVolumeClaimList
List of persistent volume claims in that namespace, when not
called as a watch. If called as a watch, returns a mock
``aiohttp.Response`` with a ``readline`` metehod that yields the
events.
Raises
------
AssertionError
Some other ``field_selector`` was provided.
kubernetes_asyncio.client.ApiException
Raised with 404 status if the namespace does not exist.
"""
self._maybe_error(
"list_namespaced_persistent_volume_claim",
namespace,
field_selector,
)
if namespace not in self._objects:
msg = f"Namespace {namespace} not found"
raise ApiException(status=404, reason=msg)
if not watch:
pvcs = self._list_objects(
namespace,
"PersistentVolumeClaim",
field_selector,
label_selector,
)
return V1PersistentVolumeClaimList(
kind="PersistentVolumeClaim", items=pvcs
)

# All watches must not preload content since we're returning raw JSON.
# This is done by the Kubernetes API Watch object.
assert not _preload_content

# Return the mock response expected by the Kubernetes API.
stream = self._event_streams[namespace]["PersistentVolumeClaim"]
return stream.build_watch_response(
resource_version,
timeout_seconds,
field_selector=field_selector,
label_selector=label_selector,
)

async def read_namespaced_persistent_volume_claim(
self,
Expand Down

0 comments on commit b12fd22

Please sign in to comment.