Skip to content

Commit

Permalink
[DPE-5591] refactor: Rework status handling (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
Batalex authored Nov 13, 2024
1 parent 808c702 commit fe4b61c
Show file tree
Hide file tree
Showing 22 changed files with 2,222 additions and 1,787 deletions.
133 changes: 68 additions & 65 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ package-mode = false

[tool.poetry.dependencies]
python = ">=3.8,<4.0"
ops = ">=2.4.1"
ops = ">=2.17.0"
kazoo = ">=2.8.0"

# The cosl dep could be removed from here once PYDEPS is released:
Expand Down Expand Up @@ -88,7 +88,7 @@ optional = true
pytest = ">=7.2"
coverage = { extras = ["toml"], version = ">7.0" }
pytest-mock = "^3.11.1"
ops-scenario = "^7.0.0"
ops = { version = ">=2.17.0", extras = ["testing"] }

[tool.poetry.group.integration]
optional = true
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ certifi==2024.8.30 ; python_version >= "3.8" and python_version < "4.0"
cffi==1.17.1 ; python_version >= "3.8" and python_version < "4.0" and platform_python_implementation != "PyPy"
charset-normalizer==3.4.0 ; python_version >= "3.8" and python_version < "4.0"
cosl==0.0.24 ; python_version >= "3.8" and python_version < "4.0"
cryptography==43.0.1 ; python_version >= "3.8" and python_version < "4.0"
cryptography==43.0.3 ; python_version >= "3.8" and python_version < "4.0"
exceptiongroup==1.2.2 ; python_version >= "3.8" and python_version < "3.11"
h11==0.14.0 ; python_version >= "3.8" and python_version < "4.0"
httpcore==1.0.6 ; python_version >= "3.8" and python_version < "4.0"
Expand Down
23 changes: 5 additions & 18 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from charms.operator_libs_linux.v0 import sysctl
from charms.rolling_ops.v0.rollingops import RollingOpsManager, RunWithLock
from ops import (
ActiveStatus,
CollectStatusEvent,
EventBase,
StatusBase,
Expand Down Expand Up @@ -51,6 +50,7 @@ def __init__(self, *args):
super().__init__(*args)
self.name = CHARM_KEY
self.substrate: Substrates = SUBSTRATE
self.pending_inactive_statuses: list[Status] = []

# Common attrs init
self.state = ClusterState(self, substrate=self.substrate)
Expand All @@ -75,6 +75,7 @@ def __init__(self, *args):
self.framework.observe(getattr(self.on, "install"), self._on_install)
self.framework.observe(getattr(self.on, "remove"), self._on_remove)
self.framework.observe(getattr(self.on, "config_changed"), self._on_roles_changed)
self.framework.observe(self.on.collect_unit_status, self._on_collect_status)
self.framework.observe(self.on.collect_app_status, self._on_collect_status)

# peer-cluster events are shared between all roles, so it is necessary to init them here to avoid instantiating them multiple times
Expand Down Expand Up @@ -112,7 +113,7 @@ def _set_status(self, key: Status) -> None:
log_level: DebugLevel = key.value.log_level

getattr(logger, log_level.lower())(status.message)
self.unit.status = status
self.pending_inactive_statuses.append(key)

def _on_roles_changed(self, _):
"""Handler for `config_changed` events.
Expand Down Expand Up @@ -169,22 +170,8 @@ def _disable_enable_restart_broker(self, event: RunWithLock) -> None:
return

def _on_collect_status(self, event: CollectStatusEvent):
ready_to_start = self.state.ready_to_start.value.status
event.add_status(ready_to_start)

if not isinstance(ready_to_start, ActiveStatus):
return

if not self.state.runs_broker:
# early return, the next checks only concern the broker
return

if not self.broker.workload.active():
event.add_status(Status.BROKER_NOT_RUNNING.value.status)

if not self.state.kraft_mode:
if not self.state.zookeeper.broker_active():
event.add_status(Status.ZK_NOT_CONNECTED.value.status)
for status in self.pending_inactive_statuses + [Status.ACTIVE]:
event.add_status(status.value.status)


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,23 +603,23 @@ def pod(self) -> Pod:
K8s-only.
"""
return self.k8s.get_pod(pod_name=self.pod_name)
return self.k8s.get_pod(self.pod_name)

@cached_property
def node(self) -> Node:
"""The Node the unit is scheduled on.
K8s-only.
"""
return self.k8s.get_node(pod=self.pod)
return self.k8s.get_node(self.pod_name)

@cached_property
def node_ip(self) -> str:
"""The IPv4/IPv6 address of the Node the unit is on.
K8s-only.
"""
return self.k8s.get_node_ip(node=self.node)
return self.k8s.get_node_ip(self.pod_name)


class ZooKeeper(RelationState):
Expand Down
35 changes: 19 additions & 16 deletions src/events/balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from ops import (
ActionEvent,
ActiveStatus,
EventBase,
InstallEvent,
Object,
Expand Down Expand Up @@ -61,6 +60,9 @@ def __init__(self, charm) -> None:
config=self.charm.config,
)

# Observe the action before the fast exit below; otherwise it would be silently ignored on units that return early
self.framework.observe(getattr(self.charm.on, "rebalance_action"), self.rebalance)

# Fast exit after workload instantiation, but before any event observer
if BALANCER.value not in self.charm.config.roles or not self.charm.unit.is_leader():
return
Expand All @@ -82,8 +84,6 @@ def __init__(self, charm) -> None:
self.framework.observe(self.charm.on.update_status, self._on_config_changed)
self.framework.observe(self.charm.on.config_changed, self._on_config_changed)

self.framework.observe(getattr(self.charm.on, "rebalance_action"), self.rebalance)

def _on_install(self, event: InstallEvent) -> None:
"""Handler for `install` event."""
if not self.workload.container_can_connect:
Expand All @@ -101,8 +101,9 @@ def _on_install(self, event: InstallEvent) -> None:

def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None:
"""Handler for `start` or `pebble-ready` events."""
self.charm._set_status(self.charm.state.ready_to_start)
if not isinstance(self.charm.unit.status, ActiveStatus):
current_status = self.charm.state.ready_to_start
if current_status is not Status.ACTIVE:
self.charm._set_status(current_status)
event.defer()
return

Expand Down Expand Up @@ -207,33 +208,36 @@ def rebalance(self, event: ActionEvent) -> None:
available_brokers = [int(broker.split("/")[1]) for broker in brokers]

failure_conditions = [
(not self.charm.unit.is_leader(), "Action must be ran on the application leader"),
(
not self.balancer_manager.cruise_control.monitoring,
lambda: not self.charm.unit.is_leader(),
"Action must be ran on the application leader",
),
(
lambda: not self.balancer_manager.cruise_control.monitoring,
"CruiseControl balancer service is not yet ready",
),
(
self.balancer_manager.cruise_control.executing,
lambda: self.balancer_manager.cruise_control.executing,
"CruiseControl balancer service is currently executing a task, please try again later",
),
(
not self.balancer_manager.cruise_control.ready,
lambda: not self.balancer_manager.cruise_control.ready,
"CruiseControl balancer service has not yet collected enough data to provide a partition reallocation proposal",
),
(
event.params["mode"] in (MODE_ADD, MODE_REMOVE)
lambda: event.params["mode"] in (MODE_ADD, MODE_REMOVE)
and event.params.get("brokerid", None) is None,
"'add' and 'remove' rebalance action require passing the 'brokerid' parameter",
),
(
event.params["mode"] in (MODE_ADD, MODE_REMOVE)
lambda: event.params["mode"] in (MODE_ADD, MODE_REMOVE)
and event.params.get("brokerid") not in available_brokers,
"invalid brokerid",
),
]

for check, msg in failure_conditions:
if check:
if check():
logging.error(msg)
event.set_results({"error": msg})
event.fail(msg)
Expand Down Expand Up @@ -261,8 +265,6 @@ def rebalance(self, event: ActionEvent) -> None:

event.set_results(sanitised_response)

self.charm._set_status(Status.ACTIVE)

@property
def healthy(self) -> bool:
"""Checks and updates various charm lifecycle states.
Expand All @@ -274,8 +276,9 @@ def healthy(self) -> bool:
if not self.charm.state.runs_balancer:
return True

self.charm._set_status(self.charm.state.ready_to_start)
if not isinstance(self.charm.unit.status, ActiveStatus):
current_status = self.charm.state.ready_to_start
if current_status is not Status.ACTIVE:
self.charm._set_status(current_status)
return False

if not self.workload.active() and self.charm.unit.is_leader():
Expand Down
15 changes: 7 additions & 8 deletions src/events/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from charms.operator_libs_linux.v1.snap import SnapError
from ops import (
ActiveStatus,
EventBase,
InstallEvent,
Object,
Expand Down Expand Up @@ -172,8 +171,9 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901
self._init_kraft_mode()

# FIXME ready to start probably needs to account for credentials being created beforehand
self.charm._set_status(self.charm.state.ready_to_start)
if not isinstance(self.charm.unit.status, ActiveStatus):
current_status = self.charm.state.ready_to_start
if current_status is not Status.ACTIVE:
self.charm._set_status(current_status)
event.defer()
return

Expand Down Expand Up @@ -223,7 +223,7 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901
self.charm.on.update_status.emit()

# only log once on successful 'on-start' run
if isinstance(self.charm.unit.status, ActiveStatus):
if not self.charm.pending_inactive_statuses:
logger.info(f'Broker {self.charm.unit.name.split("/")[1]} connected')

def _on_config_changed(self, event: EventBase) -> None:
Expand Down Expand Up @@ -339,8 +339,6 @@ def _on_update_status(self, _: UpdateStatusEvent) -> None:
self.charm._set_status(Status.BROKER_NOT_RUNNING)
return

self.charm._set_status(Status.ACTIVE)

def _on_secret_changed(self, event: SecretChangedEvent) -> None:
"""Handler for `secret_changed` events."""
if not event.secret.label or not self.charm.state.cluster.relation:
Expand Down Expand Up @@ -408,8 +406,9 @@ def healthy(self) -> bool:
Returns:
True if service is alive and active. Otherwise False
"""
self.charm._set_status(self.charm.state.ready_to_start)
if not isinstance(self.charm.unit.status, ActiveStatus):
current_status = self.charm.state.ready_to_start
if current_status is not Status.ACTIVE:
self.charm._set_status(current_status)
return False

if not self.workload.active():
Expand Down
2 changes: 0 additions & 2 deletions src/events/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
RelationJoinedEvent,
)
from ops.framework import Object
from ops.model import ActiveStatus

from literals import TLS_RELATION, TRUSTED_CA_RELATION, TRUSTED_CERTIFICATE_RELATION, Status

Expand Down Expand Up @@ -139,7 +138,6 @@ def _trusted_relation_created(self, event: EventBase) -> None:

# Create a "mtls" flag so a new listener (CLIENT_SSL) is created
self.charm.state.cluster.update({"mtls": "enabled"})
self.charm.app.status = ActiveStatus()

def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None:
"""Generate a CSR so the tls-certificates operator works as expected."""
Expand Down
9 changes: 7 additions & 2 deletions src/events/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ def _on_zookeeper_broken(self, _: RelationEvent) -> None:
# Kafka keeps a meta.properties in every log.dir with a unique ClusterID
# this ID is provided by ZK, and removing it on relation-broken allows
# re-joining to another ZK cluster.
for storage in self.charm.model.storages["data"]:
self.charm.workload.exec(["rm", f"{storage.location}/meta.properties"])
self.charm.workload.exec(
[
"bash",
"-c",
f"""find {self.charm.workload.paths.data_path} -type f -name meta.properties -delete || true""",
]
)

if not self.charm.unit.is_leader():
return
Expand Down
Loading

0 comments on commit fe4b61c

Please sign in to comment.