From be262f3dc0f460da6bbb882a9968e82aa2cb09f6 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Fri, 29 Nov 2024 14:47:34 +0400 Subject: [PATCH 01/17] feat: apply snap patch --- lib/charms/operator_libs_linux/v1/snap.py | 11 +++++++++-- src/workload.py | 14 ++++++++++---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/lib/charms/operator_libs_linux/v1/snap.py b/lib/charms/operator_libs_linux/v1/snap.py index 71cdee39..946e5193 100644 --- a/lib/charms/operator_libs_linux/v1/snap.py +++ b/lib/charms/operator_libs_linux/v1/snap.py @@ -103,6 +103,13 @@ def inner(*args, **kwargs): JSONType = Union[Dict[str, Any], List[Any], str, int, float] +def try_int(x): + try: + return int(x) + except ValueError: + return 777 + + class SnapService: """Data wrapper for snap services.""" @@ -828,7 +835,7 @@ def _load_installed_snaps(self) -> None: name=i["name"], state=SnapState.Latest, channel=i["channel"], - revision=int(i["revision"]), + revision=try_int(i["revision"]), confinement=i["confinement"], apps=i.get("apps", None), ) @@ -846,7 +853,7 @@ def _load_info(self, name) -> Snap: name=info["name"], state=SnapState.Available, channel=info["channel"], - revision=int(info["revision"]), + revision=try_int(info["revision"]), confinement=info["confinement"], apps=None, ) diff --git a/src/workload.py b/src/workload.py index 6622602e..26bf286c 100644 --- a/src/workload.py +++ b/src/workload.py @@ -18,13 +18,15 @@ from literals import ( BALANCER, BROKER, - CHARMED_KAFKA_SNAP_REVISION, GROUP, SNAP_NAME, USER, ) logger = logging.getLogger(__name__) +PATCHED_SNAP = ( + "https://custom-built-snaps.s3.eu-north-1.amazonaws.com/charmed-kafka_3.9.0_amd64.snap" +) class Workload(WorkloadBase): @@ -127,9 +129,13 @@ def install(self) -> bool: True if successfully installed. False otherwise. 
""" try: - self.kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) - self.kafka.connect(plug="removable-media") - self.kafka.hold() + import os + + os.system(f"wget {PATCHED_SNAP}") + os.system("sudo snap install --dangerous charmed-kafka_3.6.1_amd64.snap") + # self.kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) + # self.kafka.connect(plug="removable-media") + # self.kafka.hold() return True except snap.SnapError as e: From 98afa3b55f78ed155b8640bc52ab8f628318be6a Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Mon, 2 Dec 2024 13:54:04 +0400 Subject: [PATCH 02/17] skip rack awareness integration test --- tests/integration/test_charm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 924abb9b..8fc31545 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -217,6 +217,7 @@ async def test_logs_write_to_storage(ops_test: OpsTest): ) +@pytest.mark.skip async def test_rack_awareness_integration(ops_test: OpsTest): machine_ids = await ops_test.model.get_machines() await ops_test.model.deploy( From dc11ffb691d74e1dda48fbf884f65655bc4a7c76 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Mon, 2 Dec 2024 15:30:28 +0400 Subject: [PATCH 03/17] fix: snap file name --- src/workload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workload.py b/src/workload.py index 26bf286c..364683e8 100644 --- a/src/workload.py +++ b/src/workload.py @@ -132,7 +132,7 @@ def install(self) -> bool: import os os.system(f"wget {PATCHED_SNAP}") - os.system("sudo snap install --dangerous charmed-kafka_3.6.1_amd64.snap") + os.system("sudo snap install --dangerous charmed-kafka_3.9.0_amd64.snap") # self.kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) # self.kafka.connect(plug="removable-media") # self.kafka.hold() From cc5c90ae5740482bf2bfc72d53aadd3d285efa78 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Mon, 2 Dec 2024 16:52:36 +0400 Subject: [PATCH 04/17] fix: bump kafka dependency version --- src/literals.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/literals.py b/src/literals.py index 120127d6..a36b406b 100644 --- a/src/literals.py +++ b/src/literals.py @@ -292,6 +292,6 @@ class Status(Enum): "dependencies": {"zookeeper": ">3.6"}, "name": "kafka", "upgrade_supported": "^3", # zk support removed in 4.0 - "version": "3.6.1", + "version": "3.9.0", }, } From f8540f17c2a2622b20ba8cc4b84fa4bf479d1ea7 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Mon, 2 Dec 2024 17:33:35 +0400 Subject: [PATCH 05/17] fix: test_inter_broker_protocol_version logic --- tests/unit/test_config.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 321330dd..9a252ac0 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -679,7 +679,12 @@ def test_inter_broker_protocol_version(ctx: Context, base_state: State, zk_data) charm = cast(KafkaCharm, manager.charm) # Then - assert "inter.broker.protocol.version=3.6" in charm.broker.config_manager.server_properties + kafka_version: str = DEPENDENCIES.get("kafka_service", {}).get("version", "0.0.0") + major_minor = ".".join(kafka_version.split(".")[:2]) + assert ( + f"inter.broker.protocol.version={major_minor}" + in charm.broker.config_manager.server_properties + ) assert len(DEPENDENCIES["kafka_service"]["version"].split(".")) == 3 From 76b4d003d3bb32aff9fb640b7ba5f3b38b594c20 Mon 
Sep 17 00:00:00 2001 From: Iman Enami Date: Mon, 2 Dec 2024 20:41:05 +0400 Subject: [PATCH 06/17] fix: use internal address instead of 0.0.0.0 for controller listener --- src/managers/config.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/managers/config.py b/src/managers/config.py index ceca9c25..e145421f 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -705,8 +705,13 @@ def server_properties(self) -> list[str]: advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] if self.state.kraft_mode: +<<<<<<< HEAD controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT" controller_listener = f"{CONTROLLER_LISTENER_NAME}://0.0.0.0:{CONTROLLER_PORT}" +======= + controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" + controller_listener = f"{CONTROLLER_LISTENER_NAME}://{self.state.unit_broker.internal_address}:{CONTROLLER_PORT}" +>>>>>>> 40bf7bf (fix: use internal address instead of 0.0.0.0 for controller listener) # NOTE: Case where the controller is running standalone. Early return with a # smaller subset of config options From 96317969fd4d32a4d770bb488145e7c78787608e Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Thu, 12 Dec 2024 10:35:39 +0400 Subject: [PATCH 07/17] [DPE-5232] feat: scaling operation in KRaft mode --- src/core/cluster.py | 20 ++++++ src/core/models.py | 96 +++++++++++++++++++++++++++ src/events/broker.py | 114 ++++++++++++++++++++++++++++++-- src/events/peer_cluster.py | 3 + src/literals.py | 7 ++ src/managers/config.py | 10 +-- src/workload.py | 41 +++++++++--- tests/integration/helpers.py | 37 ++++++++++- tests/integration/test_kraft.py | 82 ++++++++++++++++++++++- tests/unit/test_kraft.py | 80 ++++++++++++++++++++-- 10 files changed, 461 insertions(+), 29 deletions(-) diff --git a/src/core/cluster.py b/src/core/cluster.py index b132f00e..7c4c07ca 100644 --- a/src/core/cluster.py +++ b/src/core/cluster.py @@ -161,6 +161,9 @@ def peer_cluster_orchestrator(self) -> PeerCluster: { "controller_quorum_uris": self.cluster.controller_quorum_uris, "controller_password": self.cluster.controller_password, + "bootstrap_controller": self.cluster.bootstrap_controller, + "bootstrap_unit_id": self.cluster.bootstrap_unit_id, + "bootstrap_replica_id": self.cluster.bootstrap_replica_id, } ) @@ -183,6 +186,9 @@ def peer_cluster(self) -> PeerCluster: "balancer_uris": self.cluster.balancer_uris, "controller_quorum_uris": self.cluster.controller_quorum_uris, "controller_password": self.cluster.controller_password, + "bootstrap_controller": self.cluster.bootstrap_controller, + "bootstrap_unit_id": self.cluster.bootstrap_unit_id, + "bootstrap_replica_id": self.cluster.bootstrap_replica_id, } ) @@ -226,6 +232,13 @@ def unit_broker(self) -> KafkaBroker: substrate=self.substrate, ) + @property + def kraft_unit_id(self) -> int: + """Returns current unit ID in KRaft Quorum Manager.""" + if self.runs_broker and self.runs_controller: + return KRAFT_NODE_ID_OFFSET + self.unit_broker.unit_id + return self.unit_broker.unit_id + @cached_property def peer_units_data_interfaces(self) -> dict[Unit, DataPeerOtherUnitData]: """The cluster peer relation.""" @@ -455,6 +468,13 @@ def controller_quorum_uris(self) -> str: ) return "" + @property + def bootstrap_controller(self) -> str: + """Returns the controller listener in the format HOST:PORT.""" + if self.runs_controller: + return f"{self.unit_broker.internal_address}:{CONTROLLER_PORT}" + return "" + @property def log_dirs(self) -> str: """Builds the necessary log.dirs based 
on mounted storage volumes. diff --git a/src/core/models.py b/src/core/models.py index fc4cbd22..6235429f 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -11,6 +11,7 @@ import requests from charms.data_platform_libs.v0.data_interfaces import ( + PROV_SECRET_PREFIX, Data, DataPeerData, DataPeerUnitData, @@ -99,6 +100,9 @@ def __init__( broker_uris: str = "", cluster_uuid: str = "", controller_quorum_uris: str = "", + bootstrap_controller: str = "", + bootstrap_unit_id: str = "", + bootstrap_replica_id: str = "", racks: int = 0, broker_capacities: BrokerCapacities = {}, zk_username: str = "", @@ -115,6 +119,9 @@ def __init__( self._broker_uris = broker_uris self._cluster_uuid = cluster_uuid self._controller_quorum_uris = controller_quorum_uris + self._bootstrap_controller = bootstrap_controller + self._bootstrap_unit_id = bootstrap_unit_id + self._bootstrap_replica_id = bootstrap_replica_id self._racks = racks self._broker_capacities = broker_capacities self._zk_username = zk_username @@ -125,6 +132,18 @@ def __init__( self._balancer_uris = balancer_uris self._controller_password = controller_password + def _fetch_from_secrets(self, group, field): + if not self.relation: + return "" + + if secrets_uri := self.relation.data[self.relation.app].get( + f"{PROV_SECRET_PREFIX}{group}" + ): + if secret := self.data_interface._model.get_secret(id=secrets_uri): + return secret.get_content().get(field, "") + + return "" + @property def roles(self) -> str: """All the roles pass from the related application.""" @@ -145,6 +164,8 @@ def broker_username(self) -> str: if not self.relation or not self.relation.app: return "" + return self._fetch_from_secrets("broker", "broker-username") + return self.data_interface._fetch_relation_data_with_secrets( component=self.relation.app, req_secret_fields=BALANCER.requested_secrets, @@ -161,6 +182,8 @@ def broker_password(self) -> str: if not self.relation or not self.relation.app: return "" + return self._fetch_from_secrets("broker", "broker-password") + return self.data_interface._fetch_relation_data_with_secrets( component=self.relation.app, req_secret_fields=BALANCER.requested_secrets, @@ -232,6 +255,54 @@ def cluster_uuid(self) -> str: or "" ) + @property + def bootstrap_controller(self) -> str: + """Bootstrap controller in KRaft mode.""" + if self._bootstrap_controller: + return self._bootstrap_controller + + if not self.relation or not self.relation.app: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="bootstrap-controller" + ) + or "" + ) + + @property + def bootstrap_unit_id(self) -> str: + """Bootstrap unit ID in KRaft mode.""" + if self._bootstrap_unit_id: + return self._bootstrap_unit_id + + if not self.relation or not self.relation.app: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="bootstrap-unit-id" + ) + or "" + ) + + @property + def bootstrap_replica_id(self) -> str: + """Directory ID of the bootstrap node in KRaft mode.""" + if self._bootstrap_replica_id: + return self._bootstrap_replica_id + + if not self.relation or not self.relation.app: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="bootstrap-replica-id" + ) + or "" + ) + @property def racks(self) -> int: """The number of racks for the brokers.""" @@ -489,6 +560,21 @@ def cluster_uuid(self) -> str: """Cluster uuid used for initializing storages.""" return self.relation_data.get("cluster-uuid", "") + 
@property + def bootstrap_replica_id(self) -> str: + """Directory ID of the bootstrap controller.""" + return self.relation_data.get("bootstrap-replica-id", "") + + @property + def bootstrap_controller(self) -> str: + """HOST:PORT address of the bootstrap controller.""" + return self.relation_data.get("bootstrap-controller", "") + + @property + def bootstrap_unit_id(self) -> str: + """Unit ID of the bootstrap controller.""" + return self.relation_data.get("bootstrap-unit-id", "") + class KafkaBroker(RelationState): """State collection metadata for a unit.""" @@ -640,6 +726,16 @@ def node_ip(self) -> str: """ return self.k8s.get_node_ip(self.pod_name) + @property + def directory_id(self) -> str: + """Directory ID of the node as saved in `meta.properties`.""" + return self.relation_data.get("directory-id", "") + + @property + def added_to_quorum(self) -> bool: + """Whether or not this node is added to dynamic quorum in KRaft mode.""" + return bool(self.relation_data.get("added-to-quorum", False)) + class ZooKeeper(RelationState): """State collection metadata for a the Zookeeper relation.""" diff --git a/src/events/broker.py b/src/events/broker.py index dcbbdea1..b78aa8bf 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -13,6 +13,7 @@ from ops import ( EventBase, InstallEvent, + LeaderElectedEvent, Object, PebbleReadyEvent, SecretChangedEvent, @@ -22,6 +23,7 @@ StorageEvent, UpdateStatusEvent, ) +from tenacity import retry, stop_after_attempt, wait_fixed from events.actions import ActionEvents from events.oauth import OAuthHandler @@ -116,6 +118,8 @@ def __init__(self, charm) -> None: self.framework.observe(getattr(self.charm.on, "install"), self._on_install) self.framework.observe(getattr(self.charm.on, "start"), self._on_start) + self.framework.observe(getattr(self.charm.on, "leader_elected"), self._leader_elected) + self.framework.observe(getattr(self.charm.on, "remove"), self._on_remove) if self.charm.substrate == "k8s": self.framework.observe(getattr(self.charm.on, "kafka_pebble_ready"), self._on_start) @@ -203,6 +207,7 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 # start kafka service self.workload.start() + self._add_controller() logger.info("Kafka service started") # TODO: Update users. Not sure if this is the best place, as cluster might be still @@ -337,6 +342,7 @@ def _on_update_status(self, _: UpdateStatusEvent) -> None: # NOTE for situations like IP change and late integration with rack-awareness charm. # If properties have changed, the broker will restart. 
self.charm.on.config_changed.emit() + self._add_controller() try: if self.health and not self.health.machine_configured(): @@ -445,15 +451,12 @@ def _init_kraft_mode(self) -> None: # cluster-uuid is only created on the broker (`cluster-manager` in large deployments) if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker: - uuid = self.workload.run_bin_command( - bin_keyword="storage", bin_args=["random-uuid"] - ).strip() - + uuid = self.workload.generate_uuid() self.charm.state.cluster.update({"cluster-uuid": uuid}) self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) - # Controller is tasked with populating quorum uris and the `controller` user password - if self.charm.state.runs_controller: + # Controller is tasked with populating quorum bootstrap config + if self.charm.state.runs_controller and not self.charm.state.cluster.bootstrap_controller: quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} self.charm.state.cluster.update(quorum_uris) @@ -469,6 +472,13 @@ def _init_kraft_mode(self) -> None: elif not self.charm.state.peer_cluster.controller_password: # single mode, controller & leader self.charm.state.cluster.update({"controller-password": generated_password}) + + bootstrap_data = { + "bootstrap-controller": self.charm.state.bootstrap_controller, + "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), + "bootstrap-replica-id": self.workload.generate_uuid(), + } + self.charm.state.cluster.update(bootstrap_data) def _format_storages(self) -> None: """Format storages provided relevant keys exist.""" @@ -482,8 +492,96 @@ def _format_storages(self) -> None: self.workload.format_storages( uuid=self.charm.state.peer_cluster.cluster_uuid, internal_user_credentials=credentials, + initial_controllers=f"{self.charm.state.peer_cluster.bootstrap_unit_id}@{self.charm.state.peer_cluster.bootstrap_controller}:{self.charm.state.peer_cluster.bootstrap_replica_id}", + ) + + def _leader_elected(self, event: LeaderElectedEvent) -> None: + if self.charm.state.runs_controller and self.charm.state.cluster.bootstrap_controller: + # remove previous leader from dynamic quorum, if the unit is still available, it would eventually re-join during update_status + prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id + prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id + self._remove_controller( + int(prev_leader_id), + prev_replica_id, + bootstrap_node=self.charm.state.bootstrap_controller, + ) + + updated_bootstrap_data = { + "bootstrap-controller": self.charm.state.bootstrap_controller, + "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), + "bootstrap-replica-id": self.charm.state.unit_broker.directory_id, + } + self.charm.state.cluster.update(updated_bootstrap_data) + + if self.charm.state.peer_cluster_orchestrator: + self.charm.state.peer_cluster_orchestrator.update(updated_bootstrap_data) + + # A rolling restart is required to apply the new leader config + self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit() + + @retry( + wait=wait_fixed(15), + stop=stop_after_attempt(4), + reraise=True, + ) + def _add_controller(self) -> None: + """Adds current unit to the dynamic quorum in KRaft mode if this is a follower unit.""" + if ( + self.charm.state.runs_controller + and not self.charm.unit.is_leader() + and not self.charm.state.unit_broker.added_to_quorum + ): + result = self.workload.run_bin_command( + bin_keyword="metadata-quorum", + bin_args=[ + "--bootstrap-controller", + 
self.charm.state.cluster.bootstrap_controller, + "--command-config", + self.workload.paths.server_properties, + "add-controller", + ], + ) + logger.debug(result) + directory_id = self.workload.get_directory_id(self.charm.state.log_dirs) + self.charm.state.unit_broker.update( + {"directory-id": directory_id, "added-to-quorum": "true"} + ) + + @retry( + wait=wait_fixed(15), + stop=stop_after_attempt(4), + reraise=True, + ) + def _remove_controller( + self, controller_id: int, controller_directory_id: str, bootstrap_node: str | None = None + ): + if not bootstrap_node: + bootstrap_node = self.charm.state.cluster.bootstrap_controller + + self.workload.run_bin_command( + bin_keyword="metadata-quorum", + bin_args=[ + "--bootstrap-controller", + bootstrap_node, + "remove-controller", + "--controller-id", + str(controller_id), + "--controller-directory-id", + controller_directory_id, + ], ) + def remove_follower_unit(self) -> None: + """Removes current unit from the dynamic quorum in KRaft mode if this is a follower unit.""" + if ( + self.charm.state.runs_controller + and not self.charm.unit.is_leader() + and self.charm.state.unit_broker.added_to_quorum + ): + self._remove_controller( + self.charm.state.kraft_unit_id, self.charm.state.unit_broker.directory_id + ) + def update_external_services(self) -> None: """Attempts to update any external Kubernetes services.""" if not self.charm.substrate == "k8s": @@ -545,3 +643,7 @@ def update_peer_cluster_data(self) -> None: ) # self.charm.on.config_changed.emit() # ensure both broker+balancer get a changed event + + def _on_remove(self, _) -> None: + """Handler for stop.""" + self.remove_follower_unit() diff --git a/src/events/peer_cluster.py b/src/events/peer_cluster.py index 5be64bdd..6059f05d 100644 --- a/src/events/peer_cluster.py +++ b/src/events/peer_cluster.py @@ -115,6 +115,9 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None: "balancer-uris": self.charm.state.peer_cluster.balancer_uris, "controller-quorum-uris": self.charm.state.peer_cluster.controller_quorum_uris, "controller-password": self.charm.state.peer_cluster.controller_password, + "bootstrap-controller": self.charm.state.peer_cluster.bootstrap_controller, + "bootstrap-unit-id": self.charm.state.peer_cluster.bootstrap_unit_id, + "bootstrap-replica-id": self.charm.state.peer_cluster.bootstrap_replica_id, } ) diff --git a/src/literals.py b/src/literals.py index a36b406b..e4f11e8a 100644 --- a/src/literals.py +++ b/src/literals.py @@ -42,6 +42,7 @@ "__KafkaCruiseControlBrokerMetricSamples", ] MIN_REPLICAS = 3 +KRAFT_VERSION = 1 INTER_BROKER_USER = "sync" @@ -287,6 +288,12 @@ class Status(Enum): ) +class KRaftUnitStatus(Enum): + LEADER = "Leader" + FOLLOWER = "Follower" + OBSERVER = "Observer" + + DEPENDENCIES = { "kafka_service": { "dependencies": {"zookeeper": ">3.6"}, diff --git a/src/managers/config.py b/src/managers/config.py index e145421f..2493fb5c 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -113,7 +113,7 @@ def __init__( self.node_port = node_port @property - def scope(self) -> Scope: + def scope(self) -> str: """Internal scope validator.""" return self._scope @@ -681,7 +681,8 @@ def controller_properties(self) -> list[str]: properties = [ f"process.roles={','.join(roles)}", f"node.id={node_id}", - f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}", + # f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}", + 
f"controller.quorum.bootstrap.servers={self.state.peer_cluster.bootstrap_controller}", f"controller.listener.names={CONTROLLER_LISTENER_NAME}", *self.controller_scram_properties, ] @@ -705,13 +706,8 @@ def server_properties(self) -> list[str]: advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] if self.state.kraft_mode: -<<<<<<< HEAD - controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT" - controller_listener = f"{CONTROLLER_LISTENER_NAME}://0.0.0.0:{CONTROLLER_PORT}" -======= controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" controller_listener = f"{CONTROLLER_LISTENER_NAME}://{self.state.unit_broker.internal_address}:{CONTROLLER_PORT}" ->>>>>>> 40bf7bf (fix: use internal address instead of 0.0.0.0 for controller listener) # NOTE: Case where the controller is running standalone. Early return with a # smaller subset of config options diff --git a/src/workload.py b/src/workload.py index 364683e8..dbb5f780 100644 --- a/src/workload.py +++ b/src/workload.py @@ -15,13 +15,7 @@ from typing_extensions import override from core.workload import CharmedKafkaPaths, WorkloadBase -from literals import ( - BALANCER, - BROKER, - GROUP, - SNAP_NAME, - USER, -) +from literals import BALANCER, BROKER, GROUP, KRAFT_VERSION, SNAP_NAME, USER logger = logging.getLogger(__name__) PATCHED_SNAP = ( @@ -132,7 +126,7 @@ def install(self) -> bool: import os os.system(f"wget {PATCHED_SNAP}") - os.system("sudo snap install --dangerous charmed-kafka_3.9.0_amd64.snap") + os.system("sudo snap install --dangerous ./charmed-kafka_3.9.0_amd64.snap") # self.kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) # self.kafka.connect(plug="removable-media") # self.kafka.hold() @@ -191,7 +185,11 @@ def run_bin_command( return self.exec(command) def format_storages( - self, uuid: str, internal_user_credentials: dict[str, str] | None = None + self, + uuid: str, + internal_user_credentials: dict[str, str] | None = None, + kraft_version: int = KRAFT_VERSION, + initial_controllers: str | None = None, ) -> None: """Use a passed uuid to format storages.""" # NOTE data dirs have changed permissions by storage_attached hook. 
For some reason @@ -207,6 +205,17 @@ def format_storages( "-c", self.paths.server_properties, ] + + if kraft_version > 0: + command.append("--feature") + command.append(f"kraft.version={kraft_version}") + + if initial_controllers: + command.append("--initial-controllers") + command.append(initial_controllers) + else: + command.append("--standalone") + if internal_user_credentials: for user, password in internal_user_credentials.items(): command += ["--add-scram", f"'SCRAM-SHA-512=[name={user},password={password}]'"] @@ -216,6 +225,20 @@ def format_storages( self.exec(["chmod", "-R", "750", f"{self.paths.data_path}"]) self.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.paths.data_path}"]) + def generate_uuid(self) -> str: + """Generate UUID using `kafka-storage.sh` utility.""" + uuid = self.run_bin_command(bin_keyword="storage", bin_args=["random-uuid"]).strip() + return uuid + + def get_directory_id(self, log_dirs: str) -> str: + """Read directory.id from meta.properties file in the logs dir.""" + raw = self.read(os.path.join(log_dirs, "meta.properties")) + for line in raw: + if line.startswith("directory.id"): + return line.strip().replace("directory.id=", "") + + return "" + class KafkaWorkload(Workload): """Broker specific wrapper.""" diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index be28d641..e8993d47 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -23,7 +23,14 @@ from tenacity.wait import wait_fixed from core.models import JSON -from literals import BALANCER_WEBSERVER_USER, JMX_CC_PORT, PATHS, PEER, SECURITY_PROTOCOL_PORTS +from literals import ( + BALANCER_WEBSERVER_USER, + JMX_CC_PORT, + PATHS, + PEER, + SECURITY_PROTOCOL_PORTS, + KRaftUnitStatus, +) from managers.auth import Acl, AuthManager METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) @@ -605,3 +612,31 @@ def get_kafka_broker_state(ops_test: OpsTest, app_name: str) -> JSON: def get_replica_count_by_broker_id(ops_test: OpsTest, app_name: str) -> dict[str, Any]: broker_state_json = get_kafka_broker_state(ops_test, app_name) return broker_state_json.get("ReplicaCountByBrokerId", {}) + + +@retry( + wait=wait_fixed(20), + stop=stop_after_attempt(6), + reraise=True, +) +def kraft_quorum_status( + ops_test: OpsTest, unit_name: str, bootstrap_controller: str +) -> dict[int, KRaftUnitStatus]: + """Returns a dict mapping of unit ID to KRaft unit status based on `kafka-metadata-quorum.sh` utility's output.""" + result = check_output( + f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.metadata-quorum --bootstrap-controller {bootstrap_controller} describe --replication'", + stderr=PIPE, + shell=True, + universal_newlines=True, + ) + # parse `kafka-metadata-quorum.sh` output + # NodeId DirectoryId LogEndOffset Lag LastFetchTimestamp LastCaughtUpTimestamp Status + unit_status: dict[int, str] = {} + for line in result.split("\n"): + fields = [c.strip() for c in line.split("\t")] + try: + unit_status[int(fields[0])] = KRaftUnitStatus(fields[6]) + except (ValueError, IndexError): + continue + + return unit_status diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index ac7438bc..3ca2d1c6 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -11,12 +11,14 @@ from literals import ( CONTROLLER_PORT, + KRAFT_NODE_ID_OFFSET, PEER_CLUSTER_ORCHESTRATOR_RELATION, PEER_CLUSTER_RELATION, SECURITY_PROTOCOL_PORTS, + KRaftUnitStatus, ) -from .helpers import APP_NAME, check_socket, 
create_test_topic, get_address +from .helpers import APP_NAME, check_socket, get_address, kraft_quorum_status, create_test_topic logger = logging.getLogger(__name__) @@ -73,7 +75,7 @@ async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm): num_units=1, series="jammy", config={ - "roles": self.controller_app, + "roles": "controller", "profile": "testing", }, trust=True, @@ -134,3 +136,79 @@ async def test_authorizer(self, ops_test: OpsTest): port = SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal await create_test_topic(ops_test, f"{address}:{port}") + + @pytest.mark.abort_on_fail + async def test_scaling(self, ops_test: OpsTest): + await ops_test.model.applications[self.controller_app].add_units(count=2) + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), + status="active", + timeout=1200, + idle_period=20, + ) + + address = await get_address(ops_test=ops_test, app_name=self.controller_app) + bootstrap_controller = f"{address}:{CONTROLLER_PORT}" + + unit_status = kraft_quorum_status( + ops_test, f"{self.controller_app}/0", bootstrap_controller + ) + print(unit_status) + + offset = KRAFT_NODE_ID_OFFSET if self.controller_app == APP_NAME else 0 + + for unit_id, status in unit_status.items(): + if unit_id == offset + 0: + assert status == KRaftUnitStatus.LEADER + elif unit_id < offset + 100: + assert status == KRaftUnitStatus.FOLLOWER + else: + assert status == KRaftUnitStatus.OBSERVER + + @pytest.mark.abort_on_fail + async def test_leader_change(self, ops_test: OpsTest): + await ops_test.model.applications[self.controller_app].destroy_units( + f"{self.controller_app}/0" + ) + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), + status="active", + timeout=600, + idle_period=20, + ) + + # ensure proper cleanup + async with ops_test.fast_forward(fast_interval="20s"): + await asyncio.sleep(120) + + address = await get_address(ops_test=ops_test, app_name=self.controller_app, unit_num=1) + bootstrap_controller = f"{address}:{CONTROLLER_PORT}" + offset = KRAFT_NODE_ID_OFFSET if self.controller_app == APP_NAME else 0 + + unit_status = kraft_quorum_status( + ops_test, f"{self.controller_app}/1", bootstrap_controller + ) + + # assert previous leader is removed + assert (offset + 0) not in unit_status + # assert new leader is elected + assert KRaftUnitStatus.LEADER in unit_status.values() + + # test cluster stability by adding a new controller + await ops_test.model.applications[self.controller_app].add_units(count=1) + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), + status="active", + timeout=1200, + idle_period=20, + ) + + # ensure unit is added to dynamic quorum + async with ops_test.fast_forward(fast_interval="20s"): + await asyncio.sleep(60) + + unit_status = kraft_quorum_status( + ops_test, f"{self.controller_app}/1", bootstrap_controller + ) + assert (offset + 3) in unit_status + assert unit_status[offset + 3] == KRaftUnitStatus.FOLLOWER diff --git a/tests/unit/test_kraft.py b/tests/unit/test_kraft.py index 7c6b6911..24424e5e 100644 --- a/tests/unit/test_kraft.py +++ b/tests/unit/test_kraft.py @@ -82,7 +82,8 @@ def test_ready_to_start_no_peer_cluster(charm_configuration, base_state: State): state_in = dataclasses.replace(base_state, relations=[cluster_peer]) # When - state_out = ctx.run(ctx.on.start(), state_in) + with patch("workload.KafkaWorkload.run_bin_command", return_value="cluster-uuid-number"): + state_out = ctx.run(ctx.on.start(), state_in) # Then assert 
state_out.unit_status == Status.NO_PEER_CLUSTER_RELATION.value.status @@ -102,7 +103,8 @@ def test_ready_to_start_missing_data_as_controller(charm_configuration, base_sta state_in = dataclasses.replace(base_state, relations=[cluster_peer, peer_cluster]) # When - state_out = ctx.run(ctx.on.start(), state_in) + with patch("workload.KafkaWorkload.run_bin_command", return_value="cluster-uuid-number"): + state_out = ctx.run(ctx.on.start(), state_in) # Then assert state_out.unit_status == Status.NO_BROKER_DATA.value.status @@ -156,11 +158,81 @@ def test_ready_to_start(charm_configuration, base_state: State): state_out = ctx.run(ctx.on.start(), state_in) # Then - # Second call of format will have to pass "cluster-uuid-number" as set above - assert "cluster-uuid-number" in patched_run_bin_command.call_args_list[1][1]["bin_args"] + # Third call of format will have to pass "cluster-uuid-number" as set above + assert patched_run_bin_command.call_count == 3 + assert "cluster-uuid-number" in patched_run_bin_command.call_args_list[2][1]["bin_args"] assert "cluster-uuid" in state_out.get_relations(PEER)[0].local_app_data assert "controller-quorum-uris" in state_out.get_relations(PEER)[0].local_app_data + assert "bootstrap-controller" in state_out.get_relations(PEER)[0].local_app_data + assert "bootstrap-unit-id" in state_out.get_relations(PEER)[0].local_app_data + assert "bootstrap-replica-id" in state_out.get_relations(PEER)[0].local_app_data # Only the internal users should be created. assert "admin-password" in next(iter(state_out.secrets)).latest_content assert "sync-password" in next(iter(state_out.secrets)).latest_content assert state_out.unit_status == ActiveStatus() + + +def test_remove_controller(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "controller" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation( + PEER, PEER, local_unit_data={"added-to-quorum": "true", "directory-id": "random-uuid"} + ) + state_in = dataclasses.replace(base_state, relations=[cluster_peer], leader=False) + + # When + with ( + patch("workload.KafkaWorkload.run_bin_command") as patched_run_bin_command, + patch("charms.operator_libs_linux.v0.sysctl.Config.remove"), + ): + _ = ctx.run(ctx.on.remove(), state_in) + + # Then + patched_run_bin_command.assert_called_once() + assert "random-uuid" in patched_run_bin_command.call_args_list[0][1]["bin_args"] + + +def test_leader_change(charm_configuration, base_state: State): + previous_controller = "10.10.10.10:9097" + charm_configuration["options"]["roles"]["default"] = "controller" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation( + PEER, + PEER, + local_unit_data={"added-to-quorum": "true", "directory-id": "new-uuid"}, + local_app_data={ + "bootstrap-controller": previous_controller, + "bootstrap-replica-id": "old-uuid", + "bootstrap-unit-id": "1", + }, + ) + restart_peer = PeerRelation("restart", "rolling_op") + + state_in = dataclasses.replace(base_state, relations=[cluster_peer, restart_peer]) + + # When + with ( + patch( + "charms.rolling_ops.v0.rollingops.RollingOpsManager._on_run_with_lock", autospec=True + ) + ): + state_out = ctx.run(ctx.on.leader_elected(), state_in) + + # Then + assert state_out.get_relations(PEER)[0].local_app_data["bootstrap-replica-id"] == "new-uuid" + assert state_out.get_relations(PEER)[0].local_app_data["bootstrap-unit-id"] == 
"0" + assert ( + state_out.get_relations(PEER)[0].local_app_data["bootstrap-controller"] + != previous_controller + ) From cf274b74d04af1fb5ab7c9f02c26412e4b303447 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Fri, 13 Dec 2024 16:08:20 +0400 Subject: [PATCH 08/17] rebase with SCRAM auth changes --- src/events/broker.py | 37 ++++++++++--------- src/managers/config.py | 18 +++++++++- tests/integration/helpers.py | 2 +- tests/integration/test_kraft.py | 64 +++++++++++++++++++++++---------- 4 files changed, 84 insertions(+), 37 deletions(-) diff --git a/src/events/broker.py b/src/events/broker.py index b78aa8bf..f6623595 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -472,7 +472,7 @@ def _init_kraft_mode(self) -> None: elif not self.charm.state.peer_cluster.controller_password: # single mode, controller & leader self.charm.state.cluster.update({"controller-password": generated_password}) - + bootstrap_data = { "bootstrap-controller": self.charm.state.bootstrap_controller, "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), @@ -498,13 +498,13 @@ def _format_storages(self) -> None: def _leader_elected(self, event: LeaderElectedEvent) -> None: if self.charm.state.runs_controller and self.charm.state.cluster.bootstrap_controller: # remove previous leader from dynamic quorum, if the unit is still available, it would eventually re-join during update_status - prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id - prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id - self._remove_controller( - int(prev_leader_id), - prev_replica_id, - bootstrap_node=self.charm.state.bootstrap_controller, - ) + # prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id + # prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id + # self._remove_controller( + # int(prev_leader_id), + # prev_replica_id, + # bootstrap_node=self.charm.state.bootstrap_controller, + # ) updated_bootstrap_data = { "bootstrap-controller": self.charm.state.bootstrap_controller, @@ -563,6 +563,8 @@ def _remove_controller( bin_args=[ "--bootstrap-controller", bootstrap_node, + "--command-config", + self.workload.paths.server_properties, "remove-controller", "--controller-id", str(controller_id), @@ -571,16 +573,17 @@ def _remove_controller( ], ) - def remove_follower_unit(self) -> None: - """Removes current unit from the dynamic quorum in KRaft mode if this is a follower unit.""" - if ( - self.charm.state.runs_controller - and not self.charm.unit.is_leader() - and self.charm.state.unit_broker.added_to_quorum + def remove_from_quorum(self) -> None: + """Removes current unit from the dynamic quorum in KRaft mode.""" + if self.charm.state.runs_controller and ( + self.charm.state.unit_broker.added_to_quorum or self.charm.unit.is_leader() ): - self._remove_controller( - self.charm.state.kraft_unit_id, self.charm.state.unit_broker.directory_id + directory_id = ( + self.charm.state.unit_broker.directory_id + if not self.charm.unit.is_leader() + else self.charm.state.cluster.bootstrap_replica_id ) + self._remove_controller(self.charm.state.kraft_unit_id, directory_id) def update_external_services(self) -> None: """Attempts to update any external Kubernetes services.""" @@ -646,4 +649,4 @@ def update_peer_cluster_data(self) -> None: def _on_remove(self, _) -> None: """Handler for stop.""" - self.remove_follower_unit() + self.remove_from_quorum() diff --git a/src/managers/config.py b/src/managers/config.py index 2493fb5c..2c9b3ba2 
100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -431,6 +431,21 @@ def controller_scram_properties(self) -> list[str]: f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}", ] + @property + def controller_kraft_client_properties(self) -> list[str]: + """Builds the SCRAM properties for controller' KRaft client to be able to communicate with quorum manager. + + Returns: + list of KRaft client properties to be set + """ + password = self.state.peer_cluster.controller_password + + return [ + f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{CONTROLLER_USER}" password="{password}";', + f"sasl.mechanism={self.internal_listener.mechanism}", + "security.protocol=SASL_PLAINTEXT", + ] + @property def oauth_properties(self) -> list[str]: """Builds the properties for the oauth listener. @@ -685,6 +700,7 @@ def controller_properties(self) -> list[str]: f"controller.quorum.bootstrap.servers={self.state.peer_cluster.bootstrap_controller}", f"controller.listener.names={CONTROLLER_LISTENER_NAME}", *self.controller_scram_properties, + *self.controller_kraft_client_properties, ] return properties @@ -706,7 +722,7 @@ def server_properties(self) -> list[str]: advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] if self.state.kraft_mode: - controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" + controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT" controller_listener = f"{CONTROLLER_LISTENER_NAME}://{self.state.unit_broker.internal_address}:{CONTROLLER_PORT}" # NOTE: Case where the controller is running standalone. Early return with a diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index e8993d47..549216f1 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -624,7 +624,7 @@ def kraft_quorum_status( ) -> dict[int, KRaftUnitStatus]: """Returns a dict mapping of unit ID to KRaft unit status based on `kafka-metadata-quorum.sh` utility's output.""" result = check_output( - f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.metadata-quorum --bootstrap-controller {bootstrap_controller} describe --replication'", + f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.metadata-quorum --command-config {PATHS['kafka']['CONF']}/server.properties --bootstrap-controller {bootstrap_controller} describe --replication'", stderr=PIPE, shell=True, universal_newlines=True, diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index 3ca2d1c6..ca4f8259 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -18,7 +18,7 @@ KRaftUnitStatus, ) -from .helpers import APP_NAME, check_socket, get_address, kraft_quorum_status, create_test_topic +from .helpers import APP_NAME, check_socket, create_test_topic, get_address, kraft_quorum_status logger = logging.getLogger(__name__) @@ -33,6 +33,25 @@ class TestKRaft: deployment_strat: str = os.environ.get("DEPLOYMENT", "multi") controller_app: str = {"single": APP_NAME, "multi": CONTROLLER_APP}[deployment_strat] + async def _assert_listeners_accessible(self, ops_test: OpsTest, unit_num=0): + address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=0) + assert check_socket( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal + ) # Internal listener + + # Client listener should not be enabled if there is no relations + 
assert not check_socket( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client + ) + + # Check controller socket + if self.controller_app != APP_NAME: + address = await get_address( + ops_test=ops_test, app_name=self.controller_app, unit_num=unit_num + ) + + assert check_socket(address, CONTROLLER_PORT) + @pytest.mark.abort_on_fail async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm): await ops_test.model.add_machine(series="jammy") @@ -113,21 +132,7 @@ async def test_integrate(self, ops_test: OpsTest): @pytest.mark.abort_on_fail async def test_listeners(self, ops_test: OpsTest): - address = await get_address(ops_test=ops_test) - assert check_socket( - address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal - ) # Internal listener - - # Client listener should not be enabled if there is no relations - assert not check_socket( - address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client - ) - - # Check controller socket - if self.controller_app != APP_NAME: - address = await get_address(ops_test=ops_test, app_name=self.controller_app) - - assert check_socket(address, CONTROLLER_PORT) + await self._assert_listeners_accessible(ops_test, unit_num=0) @pytest.mark.abort_on_fail async def test_authorizer(self, ops_test: OpsTest): @@ -136,9 +141,9 @@ async def test_authorizer(self, ops_test: OpsTest): port = SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal await create_test_topic(ops_test, f"{address}:{port}") - + @pytest.mark.abort_on_fail - async def test_scaling(self, ops_test: OpsTest): + async def test_scale_out(self, ops_test: OpsTest): await ops_test.model.applications[self.controller_app].add_units(count=2) await ops_test.model.wait_for_idle( apps=list({APP_NAME, self.controller_app}), @@ -212,3 +217,26 @@ async def test_leader_change(self, ops_test: OpsTest): ) assert (offset + 3) in unit_status assert unit_status[offset + 3] == KRaftUnitStatus.FOLLOWER + + @pytest.mark.abort_on_fail + async def test_scale_in(self, ops_test: OpsTest): + await ops_test.model.applications[self.controller_app].destroy_units( + *(f"{self.controller_app}/{unit_id}" for unit_id in (1, 2)) + ) + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), + status="active", + timeout=600, + idle_period=20, + ) + + address = await get_address(ops_test=ops_test, app_name=self.controller_app, unit_num=3) + bootstrap_controller = f"{address}:{CONTROLLER_PORT}" + offset = KRAFT_NODE_ID_OFFSET if self.controller_app == APP_NAME else 0 + + unit_status = kraft_quorum_status( + ops_test, f"{self.controller_app}/3", bootstrap_controller + ) + + assert unit_status[offset + 3] == KRaftUnitStatus.LEADER + await self._assert_listeners_accessible(ops_test, unit_num=3) From 5f8778a0f70a6f18b798305d64d2a75713cbeeb5 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Fri, 13 Dec 2024 16:33:45 +0400 Subject: [PATCH 09/17] add wait to test_scale_in & some tweaks --- src/events/broker.py | 14 +++++--------- tests/integration/helpers.py | 5 ++++- tests/integration/test_kraft.py | 4 +++- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/events/broker.py b/src/events/broker.py index f6623595..c2fbf11a 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -497,14 +497,6 @@ def _format_storages(self) -> None: def _leader_elected(self, event: LeaderElectedEvent) -> None: if self.charm.state.runs_controller and self.charm.state.cluster.bootstrap_controller: - # remove previous leader from dynamic 
quorum, if the unit is still available, it would eventually re-join during update_status - # prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id - # prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id - # self._remove_controller( - # int(prev_leader_id), - # prev_replica_id, - # bootstrap_node=self.charm.state.bootstrap_controller, - # ) updated_bootstrap_data = { "bootstrap-controller": self.charm.state.bootstrap_controller, @@ -583,7 +575,11 @@ def remove_from_quorum(self) -> None: if not self.charm.unit.is_leader() else self.charm.state.cluster.bootstrap_replica_id ) - self._remove_controller(self.charm.state.kraft_unit_id, directory_id) + self._remove_controller( + self.charm.state.kraft_unit_id, + directory_id, + bootstrap_node=self.charm.state.bootstrap_controller, + ) def update_external_services(self) -> None: """Attempts to update any external Kubernetes services.""" diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 549216f1..bb5ac530 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -620,7 +620,7 @@ def get_replica_count_by_broker_id(ops_test: OpsTest, app_name: str) -> dict[str reraise=True, ) def kraft_quorum_status( - ops_test: OpsTest, unit_name: str, bootstrap_controller: str + ops_test: OpsTest, unit_name: str, bootstrap_controller: str, verbose: bool = True ) -> dict[int, KRaftUnitStatus]: """Returns a dict mapping of unit ID to KRaft unit status based on `kafka-metadata-quorum.sh` utility's output.""" result = check_output( @@ -639,4 +639,7 @@ def kraft_quorum_status( except (ValueError, IndexError): continue + if verbose: + print(unit_status) + return unit_status diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index ca4f8259..e3ad166f 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -158,7 +158,6 @@ async def test_scale_out(self, ops_test: OpsTest): unit_status = kraft_quorum_status( ops_test, f"{self.controller_app}/0", bootstrap_controller ) - print(unit_status) offset = KRAFT_NODE_ID_OFFSET if self.controller_app == APP_NAME else 0 @@ -230,6 +229,9 @@ async def test_scale_in(self, ops_test: OpsTest): idle_period=20, ) + async with ops_test.fast_forward(fast_interval="20s"): + await asyncio.sleep(60) + address = await get_address(ops_test=ops_test, app_name=self.controller_app, unit_num=3) bootstrap_controller = f"{address}:{CONTROLLER_PORT}" offset = KRAFT_NODE_ID_OFFSET if self.controller_app == APP_NAME else 0 From 275828b2ffe787cc3e5261e8d33a8c831346f6d3 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Fri, 13 Dec 2024 19:47:13 +0400 Subject: [PATCH 10/17] fix: race condition issue --- src/events/broker.py | 49 ++++++++++++++++++++++++---------------- tests/unit/test_kraft.py | 12 +++++----- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/src/events/broker.py b/src/events/broker.py index c2fbf11a..47e4adbd 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -16,6 +16,7 @@ LeaderElectedEvent, Object, PebbleReadyEvent, + RelationDepartedEvent, SecretChangedEvent, StartEvent, StorageAttachedEvent, @@ -119,7 +120,6 @@ def __init__(self, charm) -> None: self.framework.observe(getattr(self.charm.on, "install"), self._on_install) self.framework.observe(getattr(self.charm.on, "start"), self._on_start) self.framework.observe(getattr(self.charm.on, "leader_elected"), self._leader_elected) - self.framework.observe(getattr(self.charm.on, "remove"), self._on_remove) if 
self.charm.substrate == "k8s": self.framework.observe(getattr(self.charm.on, "kafka_pebble_ready"), self._on_start) @@ -129,6 +129,9 @@ def __init__(self, charm) -> None: self.framework.observe(getattr(self.charm.on, "secret_changed"), self._on_secret_changed) self.framework.observe(self.charm.on[PEER].relation_changed, self._on_config_changed) + self.framework.observe( + self.charm.on[PEER].relation_departed, self._on_peer_relation_departed + ) self.framework.observe( getattr(self.charm.on, "data_storage_attached"), self._on_storage_attached @@ -540,8 +543,8 @@ def _add_controller(self) -> None: ) @retry( - wait=wait_fixed(15), - stop=stop_after_attempt(4), + wait=wait_fixed(10), + stop=stop_after_attempt(3), reraise=True, ) def _remove_controller( @@ -550,20 +553,26 @@ def _remove_controller( if not bootstrap_node: bootstrap_node = self.charm.state.cluster.bootstrap_controller - self.workload.run_bin_command( - bin_keyword="metadata-quorum", - bin_args=[ - "--bootstrap-controller", - bootstrap_node, - "--command-config", - self.workload.paths.server_properties, - "remove-controller", - "--controller-id", - str(controller_id), - "--controller-directory-id", - controller_directory_id, - ], - ) + try: + self.workload.run_bin_command( + bin_keyword="metadata-quorum", + bin_args=[ + "--bootstrap-controller", + bootstrap_node, + "--command-config", + self.workload.paths.server_properties, + "remove-controller", + "--controller-id", + str(controller_id), + "--controller-directory-id", + controller_directory_id, + ], + ) + except Exception as e: + if "VoterNotFoundException" in getattr(e, "stderr"): + # successful + return + raise e def remove_from_quorum(self) -> None: """Removes current unit from the dynamic quorum in KRaft mode.""" @@ -643,6 +652,6 @@ def update_peer_cluster_data(self) -> None: # self.charm.on.config_changed.emit() # ensure both broker+balancer get a changed event - def _on_remove(self, _) -> None: - """Handler for stop.""" - self.remove_from_quorum() + def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: + if event.departing_unit == self.charm.unit: + self.remove_from_quorum() diff --git a/tests/unit/test_kraft.py b/tests/unit/test_kraft.py index 24424e5e..04407167 100644 --- a/tests/unit/test_kraft.py +++ b/tests/unit/test_kraft.py @@ -182,16 +182,16 @@ def test_remove_controller(charm_configuration, base_state: State): actions=ACTIONS, ) cluster_peer = PeerRelation( - PEER, PEER, local_unit_data={"added-to-quorum": "true", "directory-id": "random-uuid"} + PEER, + PEER, + local_unit_data={"added-to-quorum": "true", "directory-id": "random-uuid"}, + peers_data={1: {"added-to-quorum": "true", "directory-id": "other-uuid"}}, ) state_in = dataclasses.replace(base_state, relations=[cluster_peer], leader=False) # When - with ( - patch("workload.KafkaWorkload.run_bin_command") as patched_run_bin_command, - patch("charms.operator_libs_linux.v0.sysctl.Config.remove"), - ): - _ = ctx.run(ctx.on.remove(), state_in) + with (patch("workload.KafkaWorkload.run_bin_command") as patched_run_bin_command,): + _ = ctx.run(ctx.on.relation_departed(cluster_peer, remote_unit=0), state_in) # Then patched_run_bin_command.assert_called_once() From 0f35c71b7ee7f7682facc7d0843c2a572a812c18 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Fri, 13 Dec 2024 20:30:05 +0400 Subject: [PATCH 11/17] improve test_kraft --- src/events/broker.py | 3 ++- tests/integration/test_kraft.py | 27 +++++++++++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git 
a/src/events/broker.py b/src/events/broker.py index 47e4adbd..6c64580d 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -569,7 +569,8 @@ def _remove_controller( ], ) except Exception as e: - if "VoterNotFoundException" in getattr(e, "stderr"): + error_details = getattr(e, "stderr") + if "VoterNotFoundException" in error_details or "TimeoutException" in error_details: # successful return raise e diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index e3ad166f..0631b217 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -33,8 +33,10 @@ class TestKRaft: deployment_strat: str = os.environ.get("DEPLOYMENT", "multi") controller_app: str = {"single": APP_NAME, "multi": CONTROLLER_APP}[deployment_strat] - async def _assert_listeners_accessible(self, ops_test: OpsTest, unit_num=0): - address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=0) + async def _assert_listeners_accessible( + self, ops_test: OpsTest, broker_unit_num=0, controller_unit_num=0 + ): + address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=broker_unit_num) assert check_socket( address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal ) # Internal listener @@ -47,7 +49,7 @@ async def _assert_listeners_accessible(self, ops_test: OpsTest, unit_num=0): # Check controller socket if self.controller_app != APP_NAME: address = await get_address( - ops_test=ops_test, app_name=self.controller_app, unit_num=unit_num + ops_test=ops_test, app_name=self.controller_app, unit_num=controller_unit_num ) assert check_socket(address, CONTROLLER_PORT) @@ -132,7 +134,7 @@ async def test_integrate(self, ops_test: OpsTest): @pytest.mark.abort_on_fail async def test_listeners(self, ops_test: OpsTest): - await self._assert_listeners_accessible(ops_test, unit_num=0) + await self._assert_listeners_accessible(ops_test) @pytest.mark.abort_on_fail async def test_authorizer(self, ops_test: OpsTest): @@ -193,10 +195,11 @@ async def test_leader_change(self, ops_test: OpsTest): ops_test, f"{self.controller_app}/1", bootstrap_controller ) - # assert previous leader is removed - assert (offset + 0) not in unit_status # assert new leader is elected - assert KRaftUnitStatus.LEADER in unit_status.values() + assert ( + unit_status[offset + 1] == KRaftUnitStatus.LEADER + or unit_status[offset + 2] == KRaftUnitStatus.LEADER + ) # test cluster stability by adding a new controller await ops_test.model.applications[self.controller_app].add_units(count=1) @@ -223,14 +226,15 @@ async def test_scale_in(self, ops_test: OpsTest): *(f"{self.controller_app}/{unit_id}" for unit_id in (1, 2)) ) await ops_test.model.wait_for_idle( - apps=list({APP_NAME, self.controller_app}), + apps=[self.controller_app], status="active", timeout=600, idle_period=20, + wait_for_exact_units=1, ) async with ops_test.fast_forward(fast_interval="20s"): - await asyncio.sleep(60) + await asyncio.sleep(120) address = await get_address(ops_test=ops_test, app_name=self.controller_app, unit_num=3) bootstrap_controller = f"{address}:{CONTROLLER_PORT}" @@ -241,4 +245,7 @@ async def test_scale_in(self, ops_test: OpsTest): ) assert unit_status[offset + 3] == KRaftUnitStatus.LEADER - await self._assert_listeners_accessible(ops_test, unit_num=3) + broker_unit_num = 3 if self.controller_app == APP_NAME else 0 + await self._assert_listeners_accessible( + ops_test, broker_unit_num=broker_unit_num, controller_unit_num=3 + ) From d7b8f38260da1cac4f7a9d4248820d409548ceae Mon Sep 17 
00:00:00 2001 From: Iman Enami Date: Tue, 17 Dec 2024 13:20:45 +0400 Subject: [PATCH 12/17] revert hackish changes & update snap revision to 48 --- lib/charms/operator_libs_linux/v1/snap.py | 11 ++--------- src/literals.py | 2 +- src/workload.py | 23 ++++++++++++----------- tests/integration/test_charm.py | 1 - 4 files changed, 15 insertions(+), 22 deletions(-) diff --git a/lib/charms/operator_libs_linux/v1/snap.py b/lib/charms/operator_libs_linux/v1/snap.py index 946e5193..71cdee39 100644 --- a/lib/charms/operator_libs_linux/v1/snap.py +++ b/lib/charms/operator_libs_linux/v1/snap.py @@ -103,13 +103,6 @@ def inner(*args, **kwargs): JSONType = Union[Dict[str, Any], List[Any], str, int, float] -def try_int(x): - try: - return int(x) - except ValueError: - return 777 - - class SnapService: """Data wrapper for snap services.""" @@ -835,7 +828,7 @@ def _load_installed_snaps(self) -> None: name=i["name"], state=SnapState.Latest, channel=i["channel"], - revision=try_int(i["revision"]), + revision=int(i["revision"]), confinement=i["confinement"], apps=i.get("apps", None), ) @@ -853,7 +846,7 @@ def _load_info(self, name) -> Snap: name=info["name"], state=SnapState.Available, channel=info["channel"], - revision=try_int(info["revision"]), + revision=int(info["revision"]), confinement=info["confinement"], apps=None, ) diff --git a/src/literals.py b/src/literals.py index e4f11e8a..4232cd87 100644 --- a/src/literals.py +++ b/src/literals.py @@ -12,7 +12,7 @@ CHARM_KEY = "kafka" SNAP_NAME = "charmed-kafka" -CHARMED_KAFKA_SNAP_REVISION = 45 +CHARMED_KAFKA_SNAP_REVISION = 48 CONTAINER = "kafka" SUBSTRATE = "vm" STORAGE = "data" diff --git a/src/workload.py b/src/workload.py index dbb5f780..c076c086 100644 --- a/src/workload.py +++ b/src/workload.py @@ -15,12 +15,17 @@ from typing_extensions import override from core.workload import CharmedKafkaPaths, WorkloadBase -from literals import BALANCER, BROKER, GROUP, KRAFT_VERSION, SNAP_NAME, USER +from literals import ( + BALANCER, + BROKER, + CHARMED_KAFKA_SNAP_REVISION, + GROUP, + KRAFT_VERSION, + SNAP_NAME, + USER, +) logger = logging.getLogger(__name__) -PATCHED_SNAP = ( - "https://custom-built-snaps.s3.eu-north-1.amazonaws.com/charmed-kafka_3.9.0_amd64.snap" -) class Workload(WorkloadBase): @@ -123,13 +128,9 @@ def install(self) -> bool: True if successfully installed. False otherwise. 
""" try: - import os - - os.system(f"wget {PATCHED_SNAP}") - os.system("sudo snap install --dangerous ./charmed-kafka_3.9.0_amd64.snap") - # self.kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) - # self.kafka.connect(plug="removable-media") - # self.kafka.hold() + self.kafka.ensure(snap.SnapState.Present, revision=CHARMED_KAFKA_SNAP_REVISION) + self.kafka.connect(plug="removable-media") + self.kafka.hold() return True except snap.SnapError as e: diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 8fc31545..924abb9b 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -217,7 +217,6 @@ async def test_logs_write_to_storage(ops_test: OpsTest): ) -@pytest.mark.skip async def test_rack_awareness_integration(ops_test: OpsTest): machine_ids = await ops_test.model.get_machines() await ops_test.model.deploy( From c492942bcddf3eba929b37045d39ff8482b64624 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Wed, 18 Dec 2024 13:17:47 +0400 Subject: [PATCH 13/17] add controller manager and event handler --- src/events/broker.py | 176 +----------------------------- src/events/controller.py | 215 +++++++++++++++++++++++++++++++++++++ src/managers/controller.py | 142 ++++++++++++++++++++++++ src/workload.py | 56 ---------- 4 files changed, 359 insertions(+), 230 deletions(-) create mode 100644 src/events/controller.py create mode 100644 src/managers/controller.py diff --git a/src/events/broker.py b/src/events/broker.py index 6c64580d..28f01cba 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -13,10 +13,8 @@ from ops import ( EventBase, InstallEvent, - LeaderElectedEvent, Object, PebbleReadyEvent, - RelationDepartedEvent, SecretChangedEvent, StartEvent, StorageAttachedEvent, @@ -24,9 +22,9 @@ StorageEvent, UpdateStatusEvent, ) -from tenacity import retry, stop_after_attempt, wait_fixed from events.actions import ActionEvents +from events.controller import KRaftHandler from events.oauth import OAuthHandler from events.provider import KafkaProvider from events.upgrade import KafkaDependencyModel, KafkaUpgrade @@ -38,7 +36,6 @@ CONTROLLER, DEPENDENCIES, GROUP, - INTERNAL_USERS, PEER, PROFILE_TESTING, REL_NAME, @@ -97,6 +94,7 @@ def __init__(self, charm) -> None: self.provider = KafkaProvider(self) self.oauth = OAuthHandler(self) + self.kraft = KRaftHandler(self) # MANAGERS @@ -119,7 +117,6 @@ def __init__(self, charm) -> None: self.framework.observe(getattr(self.charm.on, "install"), self._on_install) self.framework.observe(getattr(self.charm.on, "start"), self._on_start) - self.framework.observe(getattr(self.charm.on, "leader_elected"), self._leader_elected) if self.charm.substrate == "k8s": self.framework.observe(getattr(self.charm.on, "kafka_pebble_ready"), self._on_start) @@ -129,9 +126,6 @@ def __init__(self, charm) -> None: self.framework.observe(getattr(self.charm.on, "secret_changed"), self._on_secret_changed) self.framework.observe(self.charm.on[PEER].relation_changed, self._on_config_changed) - self.framework.observe( - self.charm.on[PEER].relation_departed, self._on_peer_relation_departed - ) self.framework.observe( getattr(self.charm.on, "data_storage_attached"), self._on_storage_attached @@ -174,9 +168,6 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 if not self.upgrade.idle: return - if self.charm.state.kraft_mode: - self._init_kraft_mode() - # FIXME ready to start probably needs to account for credentials being created beforehand current_status = 
self.charm.state.ready_to_start if current_status is not Status.ACTIVE: @@ -184,10 +175,6 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 event.defer() return - if self.charm.state.kraft_mode: - self.config_manager.set_server_properties() - self._format_storages() - self.update_external_services() # required settings given zookeeper connection config has been created @@ -210,7 +197,6 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 # start kafka service self.workload.start() - self._add_controller() logger.info("Kafka service started") # TODO: Update users. Not sure if this is the best place, as cluster might be still @@ -345,7 +331,6 @@ def _on_update_status(self, _: UpdateStatusEvent) -> None: # NOTE for situations like IP change and late integration with rack-awareness charm. # If properties have changed, the broker will restart. self.charm.on.config_changed.emit() - self._add_controller() try: if self.health and not self.health.machine_configured(): @@ -438,159 +423,6 @@ def healthy(self) -> bool: return True - def _init_kraft_mode(self) -> None: - """Initialize the server when running controller mode.""" - # NOTE: checks for `runs_broker` in this method should be `is_cluster_manager` in - # the large deployment feature. - if not self.model.unit.is_leader(): - return - - if not self.charm.state.cluster.internal_user_credentials and self.charm.state.runs_broker: - credentials = [ - (username, self.charm.workload.generate_password()) for username in INTERNAL_USERS - ] - for username, password in credentials: - self.charm.state.cluster.update({f"{username}-password": password}) - - # cluster-uuid is only created on the broker (`cluster-manager` in large deployments) - if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker: - uuid = self.workload.generate_uuid() - self.charm.state.cluster.update({"cluster-uuid": uuid}) - self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) - - # Controller is tasked with populating quorum bootstrap config - if self.charm.state.runs_controller and not self.charm.state.cluster.bootstrap_controller: - quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} - self.charm.state.cluster.update(quorum_uris) - - generated_password = self.charm.workload.generate_password() - - if self.charm.state.peer_cluster_orchestrator: - self.charm.state.peer_cluster_orchestrator.update(quorum_uris) - - if not self.charm.state.peer_cluster_orchestrator.controller_password: - self.charm.state.peer_cluster_orchestrator.update( - {"controller-password": generated_password} - ) - elif not self.charm.state.peer_cluster.controller_password: - # single mode, controller & leader - self.charm.state.cluster.update({"controller-password": generated_password}) - - bootstrap_data = { - "bootstrap-controller": self.charm.state.bootstrap_controller, - "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), - "bootstrap-replica-id": self.workload.generate_uuid(), - } - self.charm.state.cluster.update(bootstrap_data) - - def _format_storages(self) -> None: - """Format storages provided relevant keys exist.""" - if self.charm.state.runs_broker: - credentials = self.charm.state.cluster.internal_user_credentials - elif self.charm.state.runs_controller: - credentials = { - self.charm.state.peer_cluster.broker_username: self.charm.state.peer_cluster.broker_password - } - - self.workload.format_storages( - uuid=self.charm.state.peer_cluster.cluster_uuid, - 
internal_user_credentials=credentials, - initial_controllers=f"{self.charm.state.peer_cluster.bootstrap_unit_id}@{self.charm.state.peer_cluster.bootstrap_controller}:{self.charm.state.peer_cluster.bootstrap_replica_id}", - ) - - def _leader_elected(self, event: LeaderElectedEvent) -> None: - if self.charm.state.runs_controller and self.charm.state.cluster.bootstrap_controller: - - updated_bootstrap_data = { - "bootstrap-controller": self.charm.state.bootstrap_controller, - "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), - "bootstrap-replica-id": self.charm.state.unit_broker.directory_id, - } - self.charm.state.cluster.update(updated_bootstrap_data) - - if self.charm.state.peer_cluster_orchestrator: - self.charm.state.peer_cluster_orchestrator.update(updated_bootstrap_data) - - # A rolling restart is required to apply the new leader config - self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit() - - @retry( - wait=wait_fixed(15), - stop=stop_after_attempt(4), - reraise=True, - ) - def _add_controller(self) -> None: - """Adds current unit to the dynamic quorum in KRaft mode if this is a follower unit.""" - if ( - self.charm.state.runs_controller - and not self.charm.unit.is_leader() - and not self.charm.state.unit_broker.added_to_quorum - ): - result = self.workload.run_bin_command( - bin_keyword="metadata-quorum", - bin_args=[ - "--bootstrap-controller", - self.charm.state.cluster.bootstrap_controller, - "--command-config", - self.workload.paths.server_properties, - "add-controller", - ], - ) - logger.debug(result) - directory_id = self.workload.get_directory_id(self.charm.state.log_dirs) - self.charm.state.unit_broker.update( - {"directory-id": directory_id, "added-to-quorum": "true"} - ) - - @retry( - wait=wait_fixed(10), - stop=stop_after_attempt(3), - reraise=True, - ) - def _remove_controller( - self, controller_id: int, controller_directory_id: str, bootstrap_node: str | None = None - ): - if not bootstrap_node: - bootstrap_node = self.charm.state.cluster.bootstrap_controller - - try: - self.workload.run_bin_command( - bin_keyword="metadata-quorum", - bin_args=[ - "--bootstrap-controller", - bootstrap_node, - "--command-config", - self.workload.paths.server_properties, - "remove-controller", - "--controller-id", - str(controller_id), - "--controller-directory-id", - controller_directory_id, - ], - ) - except Exception as e: - error_details = getattr(e, "stderr") - if "VoterNotFoundException" in error_details or "TimeoutException" in error_details: - # successful - return - raise e - - def remove_from_quorum(self) -> None: - """Removes current unit from the dynamic quorum in KRaft mode.""" - if self.charm.state.runs_controller and ( - self.charm.state.unit_broker.added_to_quorum or self.charm.unit.is_leader() - ): - directory_id = ( - self.charm.state.unit_broker.directory_id - if not self.charm.unit.is_leader() - else self.charm.state.cluster.bootstrap_replica_id - ) - self._remove_controller( - self.charm.state.kraft_unit_id, - directory_id, - bootstrap_node=self.charm.state.bootstrap_controller, - ) - def update_external_services(self) -> None: """Attempts to update any external Kubernetes services.""" if not self.charm.substrate == "k8s": @@ -652,7 +484,3 @@ def update_peer_cluster_data(self) -> None: ) # self.charm.on.config_changed.emit() # ensure both broker+balancer get a changed event - - def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: - if event.departing_unit == self.charm.unit: - self.remove_from_quorum() diff --git 
a/src/events/controller.py b/src/events/controller.py new file mode 100644 index 00000000..d842b5c7 --- /dev/null +++ b/src/events/controller.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Handler for controller specific logic.""" + +import logging +from typing import TYPE_CHECKING + +from ops import ( + LeaderElectedEvent, + Object, + PebbleReadyEvent, + RelationDepartedEvent, + StartEvent, + UpdateStatusEvent, +) + +from literals import ( + CONTAINER, + CONTROLLER, + INTERNAL_USERS, + PEER, + Status, +) +from managers.controller import ControllerManager +from workload import KafkaWorkload + +if TYPE_CHECKING: + from charm import KafkaCharm + from events.broker import BrokerOperator + +logger = logging.getLogger(__name__) + + +class KRaftHandler(Object): + """Handler for KRaft specific events.""" + + def __init__(self, broker: "BrokerOperator") -> None: + super().__init__(broker, CONTROLLER.value) + self.charm: "KafkaCharm" = broker.charm + self.broker: "BrokerOperator" = broker + + self.workload = KafkaWorkload( + container=( + self.charm.unit.get_container(CONTAINER) if self.charm.substrate == "k8s" else None + ) + ) + + self.controller_manager = ControllerManager(self.charm.state, self.workload) + + self.upgrade = self.broker.upgrade + + self.framework.observe(getattr(self.charm.on, "start"), self._on_start) + self.framework.observe(getattr(self.charm.on, "leader_elected"), self._leader_elected) + + if self.charm.substrate == "k8s": + self.framework.observe(getattr(self.charm.on, "kafka_pebble_ready"), self._on_start) + + self.framework.observe(getattr(self.charm.on, "update_status"), self._on_update_status) + + self.framework.observe( + self.charm.on[PEER].relation_departed, self._on_peer_relation_departed + ) + + def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 + """Handler for `start` or `pebble-ready` events.""" + if not self.workload.container_can_connect: + event.defer() + return + + # don't want to run default start/pebble-ready events during upgrades + if not self.upgrade.idle: + return + + self._init_kraft_mode() + + current_status = self.charm.state.ready_to_start + if current_status is not Status.ACTIVE: + event.defer() + return + + self.broker.config_manager.set_server_properties() + self._format_storages() + + # update status to add controller + self.charm.on.update_status.emit() + + def _on_update_status(self, _: UpdateStatusEvent) -> None: + """Handler for `update-status` events.""" + if not self.upgrade.idle or not self.broker.healthy: + return + + self._add_controller() + + def _init_kraft_mode(self) -> None: + """Initialize the server when running controller mode.""" + # NOTE: checks for `runs_broker` in this method should be `is_cluster_manager` in + # the large deployment feature. 
+ if not self.model.unit.is_leader(): + return + + if not self.charm.state.cluster.internal_user_credentials and self.charm.state.runs_broker: + credentials = [ + (username, self.charm.workload.generate_password()) for username in INTERNAL_USERS + ] + for username, password in credentials: + self.charm.state.cluster.update({f"{username}-password": password}) + + # cluster-uuid is only created on the broker (`cluster-manager` in large deployments) + if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker: + uuid = self.controller_manager.generate_uuid() + self.charm.state.cluster.update({"cluster-uuid": uuid}) + self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) + + # Controller is tasked with populating quorum bootstrap config + if self.charm.state.runs_controller and not self.charm.state.cluster.bootstrap_controller: + quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} + self.charm.state.cluster.update(quorum_uris) + + generated_password = self.charm.workload.generate_password() + + if self.charm.state.peer_cluster_orchestrator: + self.charm.state.peer_cluster_orchestrator.update(quorum_uris) + + if not self.charm.state.peer_cluster_orchestrator.controller_password: + self.charm.state.peer_cluster_orchestrator.update( + {"controller-password": generated_password} + ) + elif not self.charm.state.peer_cluster.controller_password: + # single mode, controller & leader + self.charm.state.cluster.update({"controller-password": generated_password}) + + bootstrap_data = { + "bootstrap-controller": self.charm.state.bootstrap_controller, + "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), + "bootstrap-replica-id": self.controller_manager.generate_uuid(), + } + self.charm.state.cluster.update(bootstrap_data) + + def _format_storages(self) -> None: + """Format storages provided relevant keys exist.""" + if not self.charm.state.kraft_mode: + return + + if self.charm.state.runs_broker: + credentials = self.charm.state.cluster.internal_user_credentials + elif self.charm.state.runs_controller: + credentials = { + self.charm.state.peer_cluster.broker_username: self.charm.state.peer_cluster.broker_password + } + + self.controller_manager.format_storages( + uuid=self.charm.state.peer_cluster.cluster_uuid, + internal_user_credentials=credentials, + initial_controllers=f"{self.charm.state.peer_cluster.bootstrap_unit_id}@{self.charm.state.peer_cluster.bootstrap_controller}:{self.charm.state.peer_cluster.bootstrap_replica_id}", + ) + + def _leader_elected(self, event: LeaderElectedEvent) -> None: + if ( + not self.charm.state.cluster.bootstrap_controller + or not self.charm.state.runs_controller + ): + return + + updated_bootstrap_data = { + "bootstrap-controller": self.charm.state.bootstrap_controller, + "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), + "bootstrap-replica-id": self.charm.state.unit_broker.directory_id, + } + self.charm.state.cluster.update(updated_bootstrap_data) + + if self.charm.state.peer_cluster_orchestrator: + self.charm.state.peer_cluster_orchestrator.update(updated_bootstrap_data) + + # A rolling restart is required to apply the new leader config + self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit() + + def _add_controller(self) -> None: + """Adds current unit to the dynamic quorum in KRaft mode if this is a follower unit.""" + if ( + self.charm.unit.is_leader() + or self.charm.state.unit_broker.added_to_quorum + or not self.charm.state.runs_controller + ): + return + + directory_id = 
self.controller_manager.add_controller( + self.charm.state.cluster.bootstrap_controller + ) + + self.charm.state.unit_broker.update( + {"directory-id": directory_id, "added-to-quorum": "true"} + ) + + def remove_from_quorum(self) -> None: + """Removes current unit from the dynamic quorum in KRaft mode.""" + if not self.charm.state.runs_controller: + return + + if self.charm.state.unit_broker.added_to_quorum or self.charm.unit.is_leader(): + directory_id = ( + self.charm.state.unit_broker.directory_id + if not self.charm.unit.is_leader() + else self.charm.state.cluster.bootstrap_replica_id + ) + self.controller_manager.remove_controller( + self.charm.state.kraft_unit_id, + directory_id, + bootstrap_node=self.charm.state.bootstrap_controller, + ) + + def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: + if event.departing_unit == self.charm.unit: + self.remove_from_quorum() diff --git a/src/managers/controller.py b/src/managers/controller.py new file mode 100644 index 00000000..f36c8617 --- /dev/null +++ b/src/managers/controller.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Manager for handling KRaft Controller.""" + +import logging +import os +from typing import TYPE_CHECKING + +from tenacity import retry, stop_after_attempt, wait_fixed + +from core.cluster import ClusterState +from core.workload import WorkloadBase +from literals import GROUP, KRAFT_VERSION, USER + +if TYPE_CHECKING: + pass + + +logger = logging.getLogger(__name__) + + +class ControllerManager: + """Manager for handling KRaft controller functions.""" + + def __init__(self, state: ClusterState, workload: WorkloadBase): + self.state = state + self.workload = workload + + def format_storages( + self, + uuid: str, + internal_user_credentials: dict[str, str] | None = None, + kraft_version: int = KRAFT_VERSION, + initial_controllers: str | None = None, + ) -> None: + """Use a passed uuid to format storages.""" + # NOTE data dirs have changed permissions by storage_attached hook. For some reason + # storage command bin needs these locations to be root owned. Momentarily raise permissions + # during the format phase. 
+ self.workload.exec(["chown", "-R", "root:root", f"{self.workload.paths.data_path}"]) + + command = [ + "format", + "--ignore-formatted", + "--cluster-id", + uuid, + "-c", + self.workload.paths.server_properties, + ] + + if kraft_version > 0: + command.append("--feature") + command.append(f"kraft.version={kraft_version}") + + if initial_controllers: + command.append("--initial-controllers") + command.append(initial_controllers) + else: + command.append("--standalone") + + if internal_user_credentials: + for user, password in internal_user_credentials.items(): + command += ["--add-scram", f"'SCRAM-SHA-512=[name={user},password={password}]'"] + self.workload.run_bin_command(bin_keyword="storage", bin_args=command) + + # Drop permissions again for the main process + self.workload.exec(["chmod", "-R", "750", f"{self.workload.paths.data_path}"]) + self.workload.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"]) + + def generate_uuid(self) -> str: + """Generate UUID using `kafka-storage.sh` utility.""" + uuid = self.workload.run_bin_command( + bin_keyword="storage", bin_args=["random-uuid"] + ).strip() + return uuid + + def get_directory_id(self, log_dirs: str) -> str: + """Read directory.id from meta.properties file in the logs dir.""" + raw = self.workload.read(os.path.join(log_dirs, "meta.properties")) + for line in raw: + if line.startswith("directory.id"): + return line.strip().replace("directory.id=", "") + + return "" + + @retry( + wait=wait_fixed(15), + stop=stop_after_attempt(4), + reraise=True, + ) + def add_controller(self, bootstrap_node: str) -> str: + """Adds current unit to the dynamic quorum in KRaft mode, returns the added unit's directory_id if successful.""" + result = self.workload.run_bin_command( + bin_keyword="metadata-quorum", + bin_args=[ + "--bootstrap-controller", + bootstrap_node, + "--command-config", + self.workload.paths.server_properties, + "add-controller", + ], + ) + logger.debug(result) + + directory_id = self.get_directory_id(self.state.log_dirs) + return directory_id + + @retry( + wait=wait_fixed(10), + stop=stop_after_attempt(3), + reraise=True, + ) + def remove_controller( + self, controller_id: int, controller_directory_id: str, bootstrap_node: str | None = None + ): + """Removes a controller with specified controller_id and directory_id from KRaft dynamic quorum.""" + if not bootstrap_node: + bootstrap_node = self.state.cluster.bootstrap_controller + + try: + self.workload.run_bin_command( + bin_keyword="metadata-quorum", + bin_args=[ + "--bootstrap-controller", + bootstrap_node, + "--command-config", + self.workload.paths.server_properties, + "remove-controller", + "--controller-id", + str(controller_id), + "--controller-directory-id", + controller_directory_id, + ], + ) + except Exception as e: + error_details = getattr(e, "stderr") + if "VoterNotFoundException" in error_details or "TimeoutException" in error_details: + # successful + return + raise e diff --git a/src/workload.py b/src/workload.py index c076c086..53864f39 100644 --- a/src/workload.py +++ b/src/workload.py @@ -20,7 +20,6 @@ BROKER, CHARMED_KAFKA_SNAP_REVISION, GROUP, - KRAFT_VERSION, SNAP_NAME, USER, ) @@ -185,61 +184,6 @@ def run_bin_command( command = f"{opts_str} {SNAP_NAME}.{bin_keyword} {bin_str}" return self.exec(command) - def format_storages( - self, - uuid: str, - internal_user_credentials: dict[str, str] | None = None, - kraft_version: int = KRAFT_VERSION, - initial_controllers: str | None = None, - ) -> None: - """Use a passed uuid to format 
storages.""" - # NOTE data dirs have changed permissions by storage_attached hook. For some reason - # storage command bin needs these locations to be root owned. Momentarily raise permissions - # during the format phase. - self.exec(["chown", "-R", "root:root", f"{self.paths.data_path}"]) - - command = [ - "format", - "--ignore-formatted", - "--cluster-id", - uuid, - "-c", - self.paths.server_properties, - ] - - if kraft_version > 0: - command.append("--feature") - command.append(f"kraft.version={kraft_version}") - - if initial_controllers: - command.append("--initial-controllers") - command.append(initial_controllers) - else: - command.append("--standalone") - - if internal_user_credentials: - for user, password in internal_user_credentials.items(): - command += ["--add-scram", f"'SCRAM-SHA-512=[name={user},password={password}]'"] - self.run_bin_command(bin_keyword="storage", bin_args=command) - - # Drop permissions again for the main process - self.exec(["chmod", "-R", "750", f"{self.paths.data_path}"]) - self.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.paths.data_path}"]) - - def generate_uuid(self) -> str: - """Generate UUID using `kafka-storage.sh` utility.""" - uuid = self.run_bin_command(bin_keyword="storage", bin_args=["random-uuid"]).strip() - return uuid - - def get_directory_id(self, log_dirs: str) -> str: - """Read directory.id from meta.properties file in the logs dir.""" - raw = self.read(os.path.join(log_dirs, "meta.properties")) - for line in raw: - if line.startswith("directory.id"): - return line.strip().replace("directory.id=", "") - - return "" - class KafkaWorkload(Workload): """Broker specific wrapper.""" From 46282881e148d2f6cdf8d0e61e399a4ca187aa52 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Wed, 18 Dec 2024 13:57:07 +0400 Subject: [PATCH 14/17] some fixes --- src/core/models.py | 16 +--------------- src/events/controller.py | 4 ++-- src/managers/config.py | 1 - src/managers/controller.py | 5 +++-- tests/unit/test_charm.py | 2 +- 5 files changed, 7 insertions(+), 21 deletions(-) diff --git a/src/core/models.py b/src/core/models.py index 6235429f..9ace20d9 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -132,7 +132,7 @@ def __init__( self._balancer_uris = balancer_uris self._controller_password = controller_password - def _fetch_from_secrets(self, group, field): + def _fetch_from_secrets(self, group, field) -> str: if not self.relation: return "" @@ -166,13 +166,6 @@ def broker_username(self) -> str: return self._fetch_from_secrets("broker", "broker-username") - return self.data_interface._fetch_relation_data_with_secrets( - component=self.relation.app, - req_secret_fields=BALANCER.requested_secrets, - relation=self.relation, - fields=BALANCER.requested_secrets, - ).get("broker-username", "") - @property def broker_password(self) -> str: """The provided password for the broker application.""" @@ -184,13 +177,6 @@ def broker_password(self) -> str: return self._fetch_from_secrets("broker", "broker-password") - return self.data_interface._fetch_relation_data_with_secrets( - component=self.relation.app, - req_secret_fields=BALANCER.requested_secrets, - relation=self.relation, - fields=BALANCER.requested_secrets, - ).get("broker-password", "") - @property def broker_uris(self) -> str: """The provided uris for the balancer application to connect to the broker application.""" diff --git a/src/events/controller.py b/src/events/controller.py index d842b5c7..b002267d 100644 --- a/src/events/controller.py +++ b/src/events/controller.py @@ -80,7 +80,6 @@ def 
_on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 event.defer() return - self.broker.config_manager.set_server_properties() self._format_storages() # update status to add controller @@ -97,7 +96,7 @@ def _init_kraft_mode(self) -> None: """Initialize the server when running controller mode.""" # NOTE: checks for `runs_broker` in this method should be `is_cluster_manager` in # the large deployment feature. - if not self.model.unit.is_leader(): + if not self.model.unit.is_leader() or not self.charm.state.kraft_mode: return if not self.charm.state.cluster.internal_user_credentials and self.charm.state.runs_broker: @@ -143,6 +142,7 @@ def _format_storages(self) -> None: if not self.charm.state.kraft_mode: return + self.broker.config_manager.set_server_properties() if self.charm.state.runs_broker: credentials = self.charm.state.cluster.internal_user_credentials elif self.charm.state.runs_controller: diff --git a/src/managers/config.py b/src/managers/config.py index 2c9b3ba2..e2db5370 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -696,7 +696,6 @@ def controller_properties(self) -> list[str]: properties = [ f"process.roles={','.join(roles)}", f"node.id={node_id}", - # f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}", f"controller.quorum.bootstrap.servers={self.state.peer_cluster.bootstrap_controller}", f"controller.listener.names={CONTROLLER_LISTENER_NAME}", *self.controller_scram_properties, diff --git a/src/managers/controller.py b/src/managers/controller.py index f36c8617..d68465f7 100644 --- a/src/managers/controller.py +++ b/src/managers/controller.py @@ -6,6 +6,7 @@ import logging import os +from subprocess import CalledProcessError from typing import TYPE_CHECKING from tenacity import retry, stop_after_attempt, wait_fixed @@ -134,8 +135,8 @@ def remove_controller( controller_directory_id, ], ) - except Exception as e: - error_details = getattr(e, "stderr") + except CalledProcessError as e: + error_details = e.stderr if "VoterNotFoundException" in error_details or "TimeoutException" in error_details: # successful return diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index ce8adbae..8e4cff8d 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -265,7 +265,7 @@ def test_start_defers_without_zookeeper(ctx: Context, base_state: State) -> None state_out = ctx.run(ctx.on.start(), state_in) # Then - assert len(state_out.deferred) == 1 + assert len(state_out.deferred) == 2 assert state_out.deferred[0].name == "start" From 434c948bf8ebe9fe629f192be00541c3e2cc61ef Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Wed, 18 Dec 2024 17:25:42 +0400 Subject: [PATCH 15/17] don't emit restart on leader_elected --- src/events/controller.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/events/controller.py b/src/events/controller.py index b002267d..893c3a12 100644 --- a/src/events/controller.py +++ b/src/events/controller.py @@ -173,8 +173,8 @@ def _leader_elected(self, event: LeaderElectedEvent) -> None: if self.charm.state.peer_cluster_orchestrator: self.charm.state.peer_cluster_orchestrator.update(updated_bootstrap_data) - # A rolling restart is required to apply the new leader config - self.charm.on[f"{self.charm.restart.name}"].acquire_lock.emit() + # change bootstrap controller config on followers and brokers + self.charm.on.config_changed.emit() def _add_controller(self) -> None: """Adds current unit to the dynamic quorum in KRaft mode if this is a follower unit.""" 
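PATCH 14 above narrows remove_controller's error handling from a bare Exception to
CalledProcessError and reads e.stderr directly. A minimal standalone sketch of that
pattern, using plain subprocess.run as a stand-in for the charm's
workload.run_bin_command (the command below simulates a failure; it is not the real
kafka-metadata-quorum invocation):

    # Treat specific stderr contents as "already removed"; re-raise anything else.
    import subprocess

    def remove_node(cmd: list[str]) -> None:
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            stderr = e.stderr or ""
            if "VoterNotFoundException" in stderr or "TimeoutException" in stderr:
                return  # the voter is already gone, which is the desired end state
            raise

    # Simulated transient failure: exits 1 with VoterNotFoundException on stderr.
    remove_node(
        ["python3", "-c", "import sys; sys.stderr.write('VoterNotFoundException'); sys.exit(1)"]
    )
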
From b3b7ac84ff2c382943a159826d12deca6de6df2b Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Wed, 18 Dec 2024 17:26:33 +0400 Subject: [PATCH 16/17] cast scope --- src/managers/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/managers/config.py b/src/managers/config.py index e2db5370..45ba060a 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -11,7 +11,7 @@ import re import textwrap from abc import abstractmethod -from typing import Iterable +from typing import Iterable, cast from lightkube.core.exceptions import ApiError from typing_extensions import override @@ -113,9 +113,9 @@ def __init__( self.node_port = node_port @property - def scope(self) -> str: + def scope(self) -> Scope: """Internal scope validator.""" - return self._scope + return cast(Scope, self._scope) @scope.setter def scope(self, value): From ce50ad8e59bc20c8618c50c8a26972a1cb9eca9e Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Thu, 19 Dec 2024 10:42:21 +0400 Subject: [PATCH 17/17] apply fixes from Enrico's review --- src/literals.py | 6 ------ src/managers/controller.py | 5 ----- tests/integration/helpers.py | 17 +++++++++-------- tests/integration/test_kraft.py | 10 ++++++++-- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/src/literals.py b/src/literals.py index 4232cd87..52f814b9 100644 --- a/src/literals.py +++ b/src/literals.py @@ -288,12 +288,6 @@ class Status(Enum): ) -class KRaftUnitStatus(Enum): - LEADER = "Leader" - FOLLOWER = "Follower" - OBSERVER = "Observer" - - DEPENDENCIES = { "kafka_service": { "dependencies": {"zookeeper": ">3.6"}, diff --git a/src/managers/controller.py b/src/managers/controller.py index d68465f7..935311fa 100644 --- a/src/managers/controller.py +++ b/src/managers/controller.py @@ -7,7 +7,6 @@ import logging import os from subprocess import CalledProcessError -from typing import TYPE_CHECKING from tenacity import retry, stop_after_attempt, wait_fixed @@ -15,10 +14,6 @@ from core.workload import WorkloadBase from literals import GROUP, KRAFT_VERSION, USER -if TYPE_CHECKING: - pass - - logger = logging.getLogger(__name__) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index bb5ac530..cea263dc 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -6,6 +6,7 @@ import socket import subprocess from contextlib import closing +from enum import Enum from json.decoder import JSONDecodeError from pathlib import Path from subprocess import PIPE, CalledProcessError, check_output @@ -23,14 +24,7 @@ from tenacity.wait import wait_fixed from core.models import JSON -from literals import ( - BALANCER_WEBSERVER_USER, - JMX_CC_PORT, - PATHS, - PEER, - SECURITY_PROTOCOL_PORTS, - KRaftUnitStatus, -) +from literals import BALANCER_WEBSERVER_USER, JMX_CC_PORT, PATHS, PEER, SECURITY_PROTOCOL_PORTS from managers.auth import Acl, AuthManager METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) @@ -40,6 +34,13 @@ REL_NAME_ADMIN = "kafka-client-admin" TEST_DEFAULT_MESSAGES = 15 + +class KRaftUnitStatus(Enum): + LEADER = "Leader" + FOLLOWER = "Follower" + OBSERVER = "Observer" + + logger = logging.getLogger(__name__) diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index 0631b217..0c193d85 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -15,10 +15,16 @@ PEER_CLUSTER_ORCHESTRATOR_RELATION, PEER_CLUSTER_RELATION, SECURITY_PROTOCOL_PORTS, - KRaftUnitStatus, ) -from .helpers import APP_NAME, check_socket, 
create_test_topic, get_address, kraft_quorum_status +from .helpers import ( + APP_NAME, + KRaftUnitStatus, + check_socket, + create_test_topic, + get_address, + kraft_quorum_status, +) logger = logging.getLogger(__name__)
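PATCH 17 above relocates the KRaftUnitStatus enum next to its only consumer,
tests/integration/helpers.py, where kraft_quorum_status (not shown in this series)
maps controller node ids to Leader/Follower/Observer. A rough, hypothetical sketch of
that mapping, assuming `kafka-metadata-quorum ... describe --replication` prints a
table with NodeId in the first column and Status in the last (the real helper may
parse the output differently):

    from enum import Enum

    class KRaftUnitStatus(Enum):
        LEADER = "Leader"
        FOLLOWER = "Follower"
        OBSERVER = "Observer"

    def parse_quorum_status(raw: str) -> dict[int, KRaftUnitStatus]:
        # Skip the header row, then keep rows whose last field is a known status.
        status: dict[int, KRaftUnitStatus] = {}
        for line in raw.splitlines()[1:]:
            fields = line.split()
            if len(fields) >= 2 and fields[0].isdigit():
                try:
                    status[int(fields[0])] = KRaftUnitStatus(fields[-1])
                except ValueError:
                    continue
        return status

    sample = (
        "NodeId  DirectoryId             LogEndOffset  Lag  Status\n"
        "100     AAAAAAAAAAAAAAAAAAAAAA  42            0    Leader\n"
        "101     BBBBBBBBBBBBBBBBBBBBBB  42            0    Follower\n"
        "0       CCCCCCCCCCCCCCCCCCCCCC  42            0    Observer\n"
    )
    print(parse_quorum_status(sample))  # {100: LEADER, 101: FOLLOWER, 0: OBSERVER}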