Skip to content

Commit

Permalink
Add ignore feature (#99)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkupferer authored Feb 5, 2024
1 parent 0776cfc commit 2e90c1f
Show file tree
Hide file tree
Showing 6 changed files with 602 additions and 16 deletions.
78 changes: 63 additions & 15 deletions operator/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,18 @@ async def cleanup(logger: kopf.ObjectLogger, **_):
await Poolboy.on_cleanup()


@kopf.on.create(Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims', id='resource_claim_create')
@kopf.on.resume(Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims', id='resource_claim_resume')
@kopf.on.update(Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims', id='resource_claim_update')
@kopf.on.create(
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
id='resource_claim_create', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@kopf.on.resume(
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
id='resource_claim_resume', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@kopf.on.update(
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
id='resource_claim_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_claim_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand All @@ -81,7 +90,10 @@ async def resource_claim_event(
await resource_claim.manage(logger=logger)


@kopf.on.delete(Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims')
@kopf.on.delete(
Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_claim_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down Expand Up @@ -111,6 +123,7 @@ async def resource_claim_delete(
@kopf.daemon(Poolboy.operator_domain, Poolboy.operator_version, 'resourceclaims',
cancellation_timeout = 1,
initial_delay = Poolboy.manage_handles_interval,
labels = {Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_claim_daemon(
annotations: kopf.Annotations,
Expand All @@ -137,15 +150,29 @@ async def resource_claim_daemon(
)
try:
while not stopped:
await resource_claim.manage(logger=logger)
resource_claim = await resource_claim.refetch()
if not resource_claim:
logger.info(f"{resource_claim} found deleted in daemon")
return
if not resource_claim.ignore:
await resource_claim.manage(logger=logger)
await asyncio.sleep(Poolboy.manage_claims_interval)
except asyncio.CancelledError:
pass


@kopf.on.create(Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles', id='resource_handle_create')
@kopf.on.resume(Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles', id='resource_handle_resume')
@kopf.on.update(Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles', id='resource_handle_update')
@kopf.on.create(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
id='resource_handle_create', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@kopf.on.resume(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
id='resource_handle_resume', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@kopf.on.update(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
id='resource_handle_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_handle_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand All @@ -171,7 +198,10 @@ async def resource_handle_event(
await resource_handle.manage(logger=logger)


@kopf.on.delete(Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles')
@kopf.on.delete(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_handle_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand All @@ -184,7 +214,7 @@ async def resource_handle_delete(
uid: str,
**_
):
await ResourceHandle.unregister(name, logger=logger)
await ResourceHandle.unregister(name)
resource_handle = ResourceHandle(
annotations = annotations,
labels = labels,
Expand All @@ -201,6 +231,7 @@ async def resource_handle_delete(
@kopf.daemon(Poolboy.operator_domain, Poolboy.operator_version, 'resourcehandles',
cancellation_timeout = 1,
initial_delay = Poolboy.manage_handles_interval,
labels = {Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_handle_daemon(
annotations: kopf.Annotations,
Expand All @@ -227,15 +258,29 @@ async def resource_handle_daemon(
)
try:
while not stopped:
await resource_handle.manage(logger=logger)
resource_handle = await resource_handle.refetch()
if not resource_handle:
logger.info(f"{resource_handle} found deleted in daemon")
return
if not resource_handle.ignore:
await resource_handle.manage(logger=logger)
await asyncio.sleep(Poolboy.manage_handles_interval)
except asyncio.CancelledError:
pass


@kopf.on.create(Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', id='resource_pool_create')
@kopf.on.resume(Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', id='resource_pool_resume')
@kopf.on.update(Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools', id='resource_pool_update')
@kopf.on.create(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
id='resource_pool_create', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@kopf.on.resume(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
id='resource_pool_resume', labels={Poolboy.ignore_label: kopf.ABSENT},
)
@kopf.on.update(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
id='resource_pool_update', labels={Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_pool_event(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand All @@ -261,7 +306,10 @@ async def resource_pool_event(
await resource_pool.manage(logger=logger)


@kopf.on.delete(Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools')
@kopf.on.delete(
Poolboy.operator_domain, Poolboy.operator_version, 'resourcepools',
labels={Poolboy.ignore_label: kopf.ABSENT},
)
async def resource_pool_delete(
annotations: kopf.Annotations,
labels: kopf.Labels,
Expand Down
1 change: 1 addition & 0 deletions operator/poolboy.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Poolboy():
operator_version = os.environ.get('OPERATOR_VERSION', 'v1')
operator_api_version = f"{operator_domain}/{operator_version}"
resource_refresh_interval = int(os.environ.get('RESOURCE_REFRESH_INTERVAL', 600))
ignore_label = f"{operator_domain}/ignore"

@classmethod
async def on_cleanup(cls):
Expand Down
18 changes: 18 additions & 0 deletions operator/resourceclaim.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ def have_resource_providers(self) -> bool:
return False
return True

@property
def ignore(self) -> bool:
return Poolboy.ignore_label in self.labels

@property
def is_approved(self) -> bool:
"""Return whether this ResourceClaim has been approved.
Expand Down Expand Up @@ -796,6 +800,20 @@ async def merge_patch_status(self, patch: Mapping) -> None:
)
self.refresh_from_definition(definition)

async def refetch(self) -> Optional[ResourceClaimT]:
try:
definition = await Poolboy.custom_objects_api.get_namespaced_custom_object(
Poolboy.operator_domain, Poolboy.operator_version, self.namespace, 'resourceclaims', self.name
)
self.refresh_from_definition(definition)
return self
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status == 404:
ResourceClaim.unregister(name=self.name, namespace=self.namespace)
return None
else:
raise

async def validate(self,
logger: kopf.ObjectLogger,
resource_handle: Optional[ResourceHandleT]
Expand Down
20 changes: 19 additions & 1 deletion operator/resourcehandle.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ async def register_definition(definition: Mapping) -> ResourceHandleT:
return ResourceHandle.__register_definition(definition)

@staticmethod
async def unregister(name: str, logger: kopf.ObjectLogger) -> Optional[ResourceHandleT]:
async def unregister(name: str) -> Optional[ResourceHandleT]:
async with ResourceHandle.lock:
resource_handle = ResourceHandle.all_instances.pop(name, None)
if resource_handle:
Expand Down Expand Up @@ -549,6 +549,10 @@ def guid(self) -> str:
def has_lifespan_end(self) -> bool:
'end' in self.spec.get('lifespan', {})

@property
def ignore(self) -> bool:
return Poolboy.ignore_label in self.labels

@property
def is_bound(self) -> bool:
return 'resourceClaim' in self.spec
Expand Down Expand Up @@ -1011,3 +1015,17 @@ async def manage(self, logger: kopf.ObjectLogger) -> None:
changes = await poolboy_k8s.create_object(resource_definition)
if changes:
logger.info(f"Created {resource_description} for ResourceHandle {self.name}")

async def refetch(self) -> Optional[ResourceHandleT]:
try:
definition = await Poolboy.custom_objects_api.get_namespaced_custom_object(
Poolboy.operator_domain, Poolboy.operator_version, Poolboy.namespace, 'resourcehandles', self.name
)
self.refresh_from_definition(definition)
return self
except kubernetes_asyncio.client.exceptions.ApiException as e:
if e.status == 404:
ResourceHandle.unregister(name=self.name)
return None
else:
raise
Loading

0 comments on commit 2e90c1f

Please sign in to comment.