From 86f044581b40a3932c577eaf7f1f15d0a3921958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Zamora=20Mart=C3=ADnez?= <76525382+zmraul@users.noreply.github.com> Date: Wed, 23 Oct 2024 22:20:40 +0200 Subject: [PATCH] KRaft (#232) --- .github/workflows/ci.yaml | 2 + config.yaml | 2 +- pyproject.toml | 2 +- src/charm.py | 10 +- src/core/cluster.py | 160 ++++++++++++++++++++-------- src/core/models.py | 55 ++++++++++ src/core/structured_config.py | 4 +- src/events/balancer.py | 6 +- src/events/broker.py | 108 ++++++++++++++++--- src/events/peer_cluster.py | 93 ++++++++++++----- src/literals.py | 24 ++++- src/managers/auth.py | 13 ++- src/managers/balancer.py | 6 +- src/managers/config.py | 109 ++++++++++++++++--- src/workload.py | 26 +++++ tests/integration/test_kraft.py | 132 +++++++++++++++++++++++ tests/unit/scenario/test_kraft.py | 168 ++++++++++++++++++++++++++++++ tests/unit/test_charm.py | 4 +- tox.ini | 15 +++ 19 files changed, 816 insertions(+), 123 deletions(-) create mode 100644 tests/integration/test_kraft.py create mode 100644 tests/unit/scenario/test_kraft.py diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 87e937d5..db910608 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -71,6 +71,8 @@ jobs: - integration-upgrade - integration-balancer-single - integration-balancer-multi + - integration-kraft-single + - integration-kraft-multi name: ${{ matrix.tox-environments }} needs: - lint diff --git a/config.yaml b/config.yaml index a13fff71..881de916 100644 --- a/config.yaml +++ b/config.yaml @@ -5,7 +5,7 @@ options: roles: description: | Comma separated list of the roles assigned to the nodes of this cluster. - This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control). + This configuration accepts the following roles: 'broker' (standard functionality), 'balancer' (cruise control), 'controller' (KRaft mode). type: string default: broker compression_type: diff --git a/pyproject.toml b/pyproject.toml index 84b93116..d8694190 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ show_missing = true minversion = "6.0" log_cli_level = "INFO" asyncio_mode = "auto" -markers = ["unstable", "broker", "balancer"] +markers = ["unstable", "broker", "balancer", "kraft"] # Formatting tools configuration [tool.black] diff --git a/src/charm.py b/src/charm.py index a8849305..339126f8 100755 --- a/src/charm.py +++ b/src/charm.py @@ -120,7 +120,10 @@ def _on_roles_changed(self, _): This handler is in charge of stopping the workloads, since the sub-operators would not be instantiated if roles are changed. """ - if not self.state.runs_broker and self.broker.workload.active(): + if ( + not (self.state.runs_broker or self.state.runs_controller) + and self.broker.workload.active() + ): self.broker.workload.stop() if ( @@ -179,8 +182,9 @@ def _on_collect_status(self, event: CollectStatusEvent): if not self.broker.workload.active(): event.add_status(Status.BROKER_NOT_RUNNING.value.status) - if not self.state.zookeeper.broker_active(): - event.add_status(Status.ZK_NOT_CONNECTED.value.status) + if not self.state.kraft_mode: + if not self.state.zookeeper.broker_active(): + event.add_status(Status.ZK_NOT_CONNECTED.value.status) if __name__ == "__main__": diff --git a/src/core/cluster.py b/src/core/cluster.py index 4db5888e..c66b2202 100644 --- a/src/core/cluster.py +++ b/src/core/cluster.py @@ -37,7 +37,10 @@ ADMIN_USER, BALANCER, BROKER, + CONTROLLER, + CONTROLLER_PORT, INTERNAL_USERS, + KRAFT_NODE_ID_OFFSET, MIN_REPLICAS, OAUTH_REL_NAME, PEER, @@ -84,7 +87,7 @@ class PeerClusterData(ProviderData, RequirerData): """Broker provider data model.""" SECRET_LABEL_MAP = SECRET_LABEL_MAP - SECRET_FIELDS = BALANCER.requested_secrets + SECRET_FIELDS = list(set(BALANCER.requested_secrets) | set(CONTROLLER.requested_secrets)) class ClusterState(Object): @@ -136,46 +139,48 @@ def peer_cluster_relation(self) -> Relation | None: @property def peer_cluster_orchestrator(self) -> PeerCluster: """The state for the related `peer-cluster-orchestrator` application that this charm is requiring from.""" - balancer_kwargs: dict[str, Any] = ( - { - "balancer_username": self.cluster.balancer_username, - "balancer_password": self.cluster.balancer_password, - "balancer_uris": self.cluster.balancer_uris, - } - if self.runs_balancer - else {} - ) + extra_kwargs: dict[str, Any] = {} + + if self.runs_balancer: + extra_kwargs.update( + { + "balancer_username": self.cluster.balancer_username, + "balancer_password": self.cluster.balancer_password, + "balancer_uris": self.cluster.balancer_uris, + } + ) + + if self.runs_controller: + extra_kwargs.update( + { + "controller_quorum_uris": self.cluster.controller_quorum_uris, + } + ) return PeerCluster( relation=self.peer_cluster_relation, data_interface=PeerClusterData(self.model, PEER_CLUSTER_RELATION), - **balancer_kwargs, + **extra_kwargs, ) @property def peer_cluster(self) -> PeerCluster: - """The state for the related `peer-cluster` application that this charm is providing to.""" - return PeerCluster( - relation=self.peer_cluster_orchestrator_relation, - data_interface=PeerClusterOrchestratorData( - self.model, PEER_CLUSTER_ORCHESTRATOR_RELATION - ), - ) - - @property - def balancer(self) -> PeerCluster: """The state for the `peer-cluster-orchestrator` related balancer application.""" - balancer_kwargs: dict[str, Any] = ( - { - "balancer_username": self.cluster.balancer_username, - "balancer_password": self.cluster.balancer_password, - "balancer_uris": self.cluster.balancer_uris, - } - if self.runs_balancer - else {} - ) + extra_kwargs: dict[str, Any] = {} - if self.runs_broker: # must be providing, initialise with necessary broker data + if self.runs_controller or self.runs_balancer: + extra_kwargs.update( + { + "balancer_username": self.cluster.balancer_username, + "balancer_password": self.cluster.balancer_password, + "balancer_uris": self.cluster.balancer_uris, + "controller_quorum_uris": self.cluster.controller_quorum_uris, + } + ) + + # FIXME: `cluster_manager` check instead of running broker + # must be providing, initialise with necessary broker data + if self.runs_broker: return PeerCluster( relation=self.peer_cluster_orchestrator_relation, # if same app, this will be None and OK data_interface=PeerClusterOrchestratorData( @@ -184,12 +189,13 @@ def balancer(self) -> PeerCluster: broker_username=ADMIN_USER, broker_password=self.cluster.internal_user_credentials.get(ADMIN_USER, ""), broker_uris=self.bootstrap_server, + cluster_uuid=self.cluster.cluster_uuid, racks=self.racks, broker_capacities=self.broker_capacities, zk_username=self.zookeeper.username, zk_password=self.zookeeper.password, zk_uris=self.zookeeper.uris, - **balancer_kwargs, # in case of roles=broker,balancer on this app + **extra_kwargs, # in case of roles=broker,[balancer,controller] on this app ) else: # must be roles=balancer only then, only load with necessary balancer data @@ -345,7 +351,11 @@ def default_auth(self) -> AuthMap: def enabled_auth(self) -> list[AuthMap]: """The currently enabled auth.protocols and their auth.mechanisms, based on related applications.""" enabled_auth = [] - if self.client_relations or self.runs_balancer or self.peer_cluster_orchestrator_relation: + if ( + self.client_relations + or self.runs_balancer + or BALANCER.value in self.peer_cluster_orchestrator.roles + ): enabled_auth.append(self.default_auth) if self.oauth_relation: enabled_auth.append(AuthMap(self.default_auth.protocol, "OAUTHBEARER")) @@ -394,6 +404,21 @@ def bootstrap_server(self) -> str: ) ) + @property + def controller_quorum_uris(self) -> str: + """The current controller quorum uris when running KRaft mode.""" + # FIXME: when running broker node.id will be unit-id + 100. If unit is only running + # the controller node.id == unit-id. This way we can keep a human readable mapping of ids. + if self.runs_controller: + node_offset = KRAFT_NODE_ID_OFFSET if self.runs_broker else 0 + return ",".join( + [ + f"{broker.unit_id + node_offset}@{broker.host}:{CONTROLLER_PORT}" + for broker in self.brokers + ] + ) + return "" + @property def log_dirs(self) -> str: """Builds the necessary log.dirs based on mounted storage volumes. @@ -446,7 +471,7 @@ def ready_to_start(self) -> Status: # noqa: C901 if not self.peer_relation: return Status.NO_PEER_RELATION - for status in [self._broker_status, self._balancer_status]: + for status in [self._broker_status, self._balancer_status, self._controller_status]: if status != Status.ACTIVE: return status @@ -461,29 +486,40 @@ def _balancer_status(self) -> Status: if not self.peer_cluster_relation and not self.runs_broker: return Status.NO_PEER_CLUSTER_RELATION - if not self.balancer.broker_connected: + if not self.peer_cluster.broker_connected: return Status.NO_BROKER_DATA - if len(self.balancer.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS: + if len(self.peer_cluster.broker_capacities.get("brokerCapacities", [])) < MIN_REPLICAS: return Status.NOT_ENOUGH_BROKERS return Status.ACTIVE @property - def _broker_status(self) -> Status: + def _broker_status(self) -> Status: # noqa: C901 """Checks for role=broker specific readiness.""" if not self.runs_broker: return Status.ACTIVE - if not self.zookeeper: - return Status.ZK_NOT_RELATED + # Neither ZooKeeper or KRaft are active + if self.kraft_mode is None: + return Status.MISSING_MODE + + if self.kraft_mode: + if not self.peer_cluster.controller_quorum_uris: # FIXME: peer_cluster or cluster? + return Status.NO_QUORUM_URIS + if not self.cluster.cluster_uuid: + return Status.NO_CLUSTER_UUID - if not self.zookeeper.zookeeper_connected: - return Status.ZK_NO_DATA + if self.kraft_mode == False: # noqa: E712 + if not self.zookeeper: + return Status.ZK_NOT_RELATED - # TLS must be enabled for Kafka and ZK or disabled for both - if self.cluster.tls_enabled ^ self.zookeeper.tls: - return Status.ZK_TLS_MISMATCH + if not self.zookeeper.zookeeper_connected: + return Status.ZK_NO_DATA + + # TLS must be enabled for Kafka and ZK or disabled for both + if self.cluster.tls_enabled ^ self.zookeeper.tls: + return Status.ZK_TLS_MISMATCH if self.cluster.tls_enabled and not self.unit_broker.certificate: return Status.NO_CERT @@ -493,6 +529,37 @@ def _broker_status(self) -> Status: return Status.ACTIVE + @property + def _controller_status(self) -> Status: + """Checks for role=controller specific readiness.""" + if not self.runs_controller: + return Status.ACTIVE + + if not self.peer_cluster_relation and not self.runs_broker: + return Status.NO_PEER_CLUSTER_RELATION + + if not self.peer_cluster.broker_connected_kraft_mode: + return Status.NO_BROKER_DATA + + return Status.ACTIVE + + @property + def kraft_mode(self) -> bool | None: + """Is the deployment running in KRaft mode? + + Returns: + True if Kraft mode, False if ZooKeeper, None when undefined. + """ + # NOTE: self.roles when running colocated, peer_cluster.roles when multiapp + if CONTROLLER.value in (self.roles + self.peer_cluster.roles): + return True + if self.zookeeper_relation: + return False + + # FIXME raise instead of none. `not kraft_mode` is falsy + # NOTE: if previous checks are not met, we don't know yet how the charm is being deployed + return None + @property def runs_balancer(self) -> bool: """Is the charm enabling the balancer?""" @@ -502,3 +569,8 @@ def runs_balancer(self) -> bool: def runs_broker(self) -> bool: """Is the charm enabling the broker(s)?""" return BROKER.value in self.roles + + @property + def runs_controller(self) -> bool: + """Is the charm enabling the controller?""" + return CONTROLLER.value in self.roles diff --git a/src/core/models.py b/src/core/models.py index 33cbdea9..9a8f136a 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -93,6 +93,8 @@ def __init__( broker_username: str = "", broker_password: str = "", broker_uris: str = "", + cluster_uuid: str = "", + controller_quorum_uris: str = "", racks: int = 0, broker_capacities: BrokerCapacities = {}, zk_username: str = "", @@ -106,6 +108,8 @@ def __init__( self._broker_username = broker_username self._broker_password = broker_password self._broker_uris = broker_uris + self._cluster_uuid = cluster_uuid + self._controller_quorum_uris = controller_quorum_uris self._racks = racks self._broker_capacities = broker_capacities self._zk_username = zk_username @@ -174,6 +178,38 @@ def broker_uris(self) -> str: fields=BALANCER.requested_secrets, ).get("broker-uris", "") + @property + def controller_quorum_uris(self) -> str: + """The quorum voters in KRaft mode.""" + if self._controller_quorum_uris: + return self._controller_quorum_uris + + if not self.relation or not self.relation.app: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="controller-quorum-uris" + ) + or "" + ) + + @property + def cluster_uuid(self) -> str: + """The cluster uuid used to format storages in KRaft mode.""" + if self._cluster_uuid: + return self._cluster_uuid + + if not self.relation or not self.relation.app: + return "" + + return ( + self.data_interface.fetch_relation_field( + relation_id=self.relation.id, field="cluster-uuid" + ) + or "" + ) + @property def racks(self) -> int: """The number of racks for the brokers.""" @@ -303,6 +339,7 @@ def balancer_uris(self) -> str: @property def broker_connected(self) -> bool: """Checks if there is an active broker relation with all necessary data.""" + # FIXME rename to specify balancer-broker connection if not all( [ self.broker_username, @@ -319,6 +356,14 @@ def broker_connected(self) -> bool: return True + @property + def broker_connected_kraft_mode(self) -> bool: + """Checks for necessary data required by a controller.""" + if not all([self.broker_username, self.broker_password, self.cluster_uuid]): + return False + + return True + class KafkaCluster(RelationState): """State collection metadata for the peer relation.""" @@ -407,6 +452,16 @@ def balancer_uris(self) -> str: """Persisted balancer uris.""" return self.relation_data.get("balancer-uris", "") + @property + def controller_quorum_uris(self) -> str: + """Persisted controller quorum voters.""" + return self.relation_data.get("controller-quorum-uris", "") + + @property + def cluster_uuid(self) -> str: + """Cluster uuid used for initializing storages.""" + return self.relation_data.get("cluster-uuid", "") + class KafkaBroker(RelationState): """State collection metadata for a unit.""" diff --git a/src/core/structured_config.py b/src/core/structured_config.py index 9b102046..3a18ec50 100644 --- a/src/core/structured_config.py +++ b/src/core/structured_config.py @@ -10,7 +10,7 @@ from charms.data_platform_libs.v0.data_models import BaseConfigModel from pydantic import Field, validator -from literals import BALANCER, BROKER, SUBSTRATE +from literals import BALANCER, BROKER, CONTROLLER, SUBSTRATE logger = logging.getLogger(__name__) @@ -261,7 +261,7 @@ def roles_values(cls, value: str) -> str: """Check roles values.""" roles = set(map(str.strip, value.split(","))) - if unknown_roles := roles - {BROKER.value, BALANCER.value}: + if unknown_roles := roles - {BROKER.value, BALANCER.value, CONTROLLER.value}: raise ValueError("Unknown role(s):", unknown_roles) return ",".join(sorted(roles)) # this has to be a string as it goes in to properties diff --git a/src/events/balancer.py b/src/events/balancer.py index 1ec88b7e..1433828b 100644 --- a/src/events/balancer.py +++ b/src/events/balancer.py @@ -169,7 +169,7 @@ def _on_config_changed(self, _: EventBase) -> None: content_changed = True # On k8s, adding/removing a broker does not change the bootstrap server property if exposed by nodeport - broker_capacities = self.charm.state.balancer.broker_capacities + broker_capacities = self.charm.state.peer_cluster.broker_capacities if ( file_content := json.loads( "".join(self.workload.read(self.workload.paths.capacity_jbod_json)) @@ -200,8 +200,8 @@ def rebalance(self, event: ActionEvent) -> None: available_brokers = [broker.unit_id for broker in self.charm.state.brokers] else: brokers = ( - [broker.name for broker in self.charm.state.balancer.relation.units] - if self.charm.state.balancer.relation + [broker.name for broker in self.charm.state.peer_cluster.relation.units] + if self.charm.state.peer_cluster.relation else [] ) available_brokers = [int(broker.split("/")[1]) for broker in brokers] diff --git a/src/events/broker.py b/src/events/broker.py index 9a9ceec7..1e2c9b3f 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -33,8 +33,10 @@ from literals import ( BROKER, CONTAINER, + CONTROLLER, DEPENDENCIES, GROUP, + INTERNAL_USERS, PEER, PROFILE_TESTING, REL_NAME, @@ -75,7 +77,7 @@ def __init__(self, charm) -> None: ) # Fast exit after workload instantiation, but before any event observer - if BROKER.value not in self.charm.config.roles: + if not any(role in self.charm.config.roles for role in [BROKER.value, CONTROLLER.value]): return self.health = KafkaHealth(self) if self.charm.substrate == "vm" else None @@ -87,7 +89,9 @@ def __init__(self, charm) -> None: ), ) self.password_action_events = PasswordActionEvents(self) - self.zookeeper = ZooKeeperHandler(self) + if not self.charm.state.runs_controller: + self.zookeeper = ZooKeeperHandler(self) + self.provider = KafkaProvider(self) self.oauth = OAuthHandler(self) @@ -148,7 +152,7 @@ def _on_install(self, event: InstallEvent) -> None: f"{TESTING_OPTIONS}" ) - def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: + def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 """Handler for `start` or `pebble-ready` events.""" if not self.workload.container_can_connect: event.defer() @@ -163,11 +167,19 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: if not self.upgrade.idle: return + if self.charm.state.kraft_mode: + self._init_kraft_mode() + + # FIXME ready to start probably needs to account for credentials being created beforehand self.charm._set_status(self.charm.state.ready_to_start) if not isinstance(self.charm.unit.status, ActiveStatus): event.defer() return + if self.charm.state.kraft_mode: + self.config_manager.set_server_properties() + self._format_storages() + self.update_external_services() # required settings given zookeeper connection config has been created @@ -192,6 +204,20 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: self.workload.start() logger.info("Kafka service started") + # TODO: Update users. Not sure if this is the best place, as cluster might be still + # stabilizing. + # if self.charm.state.kraft_mode and self.charm.state.runs_broker: + # for username, password in self.charm.state.cluster.internal_user_credentials.items(): + # try: + # self.auth_manager.add_user( + # username=username, password=password, zk_auth=False, internal=True, + # ) + # except subprocess.CalledProcessError: + # logger.warning("Error adding users, cluster might not be ready yet") + # logger.error(f"\n\tOn start:\nAdding user {username} failed. Let the rest of the hook run\n") + # # event.defer() + # continue + # service_start might fail silently, confirm with ZK if kafka is actually connected self.charm.on.update_status.emit() @@ -286,7 +312,7 @@ def _on_config_changed(self, event: EventBase) -> None: if self.model.relations.get(REL_NAME, None) and self.charm.unit.is_leader(): self.update_client_data() - if self.charm.state.peer_cluster_relation and self.charm.unit.is_leader(): + if self.charm.state.peer_cluster_orchestrator_relation and self.charm.unit.is_leader(): self.update_peer_cluster_data() def _on_update_status(self, _: UpdateStatusEvent) -> None: @@ -294,9 +320,10 @@ def _on_update_status(self, _: UpdateStatusEvent) -> None: if not self.upgrade.idle or not self.healthy: return - if not self.charm.state.zookeeper.broker_active(): - self.charm._set_status(Status.ZK_NOT_CONNECTED) - return + if not self.charm.state.kraft_mode: + if not self.charm.state.zookeeper.broker_active(): + self.charm._set_status(Status.ZK_NOT_CONNECTED) + return # NOTE for situations like IP change and late integration with rack-awareness charm. # If properties have changed, the broker will restart. @@ -334,10 +361,12 @@ def _on_storage_attached(self, event: StorageAttachedEvent) -> None: self.charm.state.unit_broker.update({"storages": self.balancer_manager.storages}) - if self.charm.substrate == "vm": + # FIXME: if KRaft, don't execute + if self.charm.substrate == "vm" and not self.charm.state.kraft_mode: # new dirs won't be used until topic partitions are assigned to it # either automatically for new topics, or manually for existing # set status only for running services, not on startup + # FIXME re-add this self.workload.exec(["chmod", "-R", "750", f"{self.workload.paths.data_path}"]) self.workload.exec( ["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"] @@ -388,6 +417,50 @@ def healthy(self) -> bool: return True + def _init_kraft_mode(self) -> None: + """Initialize the server when running controller mode.""" + # NOTE: checks for `runs_broker` in this method should be `is_cluster_manager` in + # the large deployment feature. + if not self.model.unit.is_leader(): + return + + if not self.charm.state.cluster.internal_user_credentials and self.charm.state.runs_broker: + credentials = [ + (username, self.charm.workload.generate_password()) for username in INTERNAL_USERS + ] + for username, password in credentials: + self.charm.state.cluster.update({f"{username}-password": password}) + + # cluster-uuid is only created on the broker (`cluster-manager` in large deployments) + if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker: + uuid = self.workload.run_bin_command( + bin_keyword="storage", bin_args=["random-uuid", "2>", "/dev/null"] + ).strip() + self.charm.state.cluster.update({"cluster-uuid": uuid}) + self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) + + # Controller is tasked with populating quorum uris + if self.charm.state.runs_controller: + quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} + self.charm.state.cluster.update(quorum_uris) + + if self.charm.state.peer_cluster_orchestrator: + self.charm.state.peer_cluster_orchestrator.update(quorum_uris) + + def _format_storages(self) -> None: + """Format storages provided relevant keys exist.""" + if self.charm.state.runs_broker: + credentials = self.charm.state.cluster.internal_user_credentials + elif self.charm.state.runs_controller: + credentials = { + self.charm.state.peer_cluster.broker_username: self.charm.state.peer_cluster.broker_password + } + + self.workload.format_storages( + uuid=self.charm.state.peer_cluster.cluster_uuid, + internal_user_credentials=credentials, + ) + def update_external_services(self) -> None: """Attempts to update any external Kubernetes services.""" if not self.charm.substrate == "k8s": @@ -433,17 +506,18 @@ def update_peer_cluster_data(self) -> None: if not self.charm.unit.is_leader() or not self.healthy: return - self.charm.state.balancer.update( + self.charm.state.peer_cluster.update( { "roles": self.charm.state.roles, - "broker-username": self.charm.state.balancer.broker_username, - "broker-password": self.charm.state.balancer.broker_password, - "broker-uris": self.charm.state.balancer.broker_uris, - "racks": str(self.charm.state.balancer.racks), - "broker-capacities": json.dumps(self.charm.state.balancer.broker_capacities), - "zk-uris": self.charm.state.balancer.zk_uris, - "zk-username": self.charm.state.balancer.zk_username, - "zk-password": self.charm.state.balancer.zk_password, + "broker-username": self.charm.state.peer_cluster.broker_username, + "broker-password": self.charm.state.peer_cluster.broker_password, + "broker-uris": self.charm.state.peer_cluster.broker_uris, + "cluster-uuid": self.charm.state.peer_cluster.cluster_uuid, + "racks": str(self.charm.state.peer_cluster.racks), + "broker-capacities": json.dumps(self.charm.state.peer_cluster.broker_capacities), + "zk-uris": self.charm.state.peer_cluster.zk_uris, + "zk-username": self.charm.state.peer_cluster.zk_username, + "zk-password": self.charm.state.peer_cluster.zk_password, } ) diff --git a/src/events/peer_cluster.py b/src/events/peer_cluster.py index bae68875..1b9e066c 100644 --- a/src/events/peer_cluster.py +++ b/src/events/peer_cluster.py @@ -16,13 +16,20 @@ diff, set_encoded_field, ) -from ops.charm import RelationChangedEvent, RelationCreatedEvent, RelationEvent, SecretChangedEvent +from ops.charm import ( + RelationBrokenEvent, + RelationChangedEvent, + RelationCreatedEvent, + RelationEvent, + SecretChangedEvent, +) from ops.framework import Object from core.cluster import custom_secret_groups from literals import ( BALANCER, BROKER, + CONTROLLER, PEER_CLUSTER_ORCHESTRATOR_RELATION, PEER_CLUSTER_RELATION, ) @@ -72,18 +79,20 @@ def _on_peer_cluster_created(self, event: RelationCreatedEvent) -> None: if not self.charm.unit.is_leader() or not event.relation.app: return - requested_secrets = ( - BALANCER.requested_secrets - if self.charm.state.runs_balancer - else BROKER.requested_secrets - ) or [] + requested_secrets = set() + if self.charm.state.runs_balancer: + requested_secrets |= set(BALANCER.requested_secrets) + if self.charm.state.runs_controller: + requested_secrets |= set(CONTROLLER.requested_secrets) + if self.charm.state.runs_broker: + requested_secrets |= set(BROKER.requested_secrets) # request secrets for the relation set_encoded_field( event.relation, self.charm.state.cluster.app, REQ_SECRET_FIELDS, - requested_secrets, + list(requested_secrets), ) # explicitly update the roles early, as we can't determine which model to instantiate @@ -92,21 +101,19 @@ def _on_peer_cluster_created(self, event: RelationCreatedEvent) -> None: def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None: """Generic handler for peer-cluster `relation-changed` events.""" - if ( - not self.charm.unit.is_leader() - or not self.charm.state.runs_balancer # only balancer needs handle this event - or not self.charm.state.balancer.roles # ensures secrets have set-up before writing - ): + # ensures secrets have set-up before writing + if not self.charm.unit.is_leader() or not self.charm.state.peer_cluster.roles: return self._default_relation_changed(event) # will no-op if relation does not exist - self.charm.state.balancer.update( + self.charm.state.peer_cluster.update( { - "balancer-username": self.charm.state.balancer.balancer_username, - "balancer-password": self.charm.state.balancer.balancer_password, - "balancer-uris": self.charm.state.balancer.balancer_uris, + "balancer-username": self.charm.state.peer_cluster.balancer_username, + "balancer-password": self.charm.state.peer_cluster.balancer_password, + "balancer-uris": self.charm.state.peer_cluster.balancer_uris, + "controller-quorum-uris": self.charm.state.peer_cluster.controller_quorum_uris, } ) @@ -114,33 +121,65 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None: def _on_peer_cluster_orchestrator_changed(self, event: RelationChangedEvent) -> None: """Generic handler for peer-cluster-orchestrator `relation-changed` events.""" + # TODO: `cluster_manager` check instead of runs_broker if ( not self.charm.unit.is_leader() or not self.charm.state.runs_broker # only broker needs handle this event - or "balancer" - not in self.charm.state.balancer.roles # ensures secrets have set-up before writing, and only writing to balancers + or not any( + role in self.charm.state.peer_cluster.roles + for role in [BALANCER.value, CONTROLLER.value] + ) # ensures secrets have set-up before writing, and only writing to controller,balancers ): return self._default_relation_changed(event) # will no-op if relation does not exist - self.charm.state.balancer.update( + self.charm.state.peer_cluster.update( { "roles": self.charm.state.roles, - "broker-username": self.charm.state.balancer.broker_username, - "broker-password": self.charm.state.balancer.broker_password, - "broker-uris": self.charm.state.balancer.broker_uris, - "racks": str(self.charm.state.balancer.racks), - "broker-capacities": json.dumps(self.charm.state.balancer.broker_capacities), - "zk-uris": self.charm.state.balancer.zk_uris, - "zk-username": self.charm.state.balancer.zk_username, - "zk-password": self.charm.state.balancer.zk_password, + "broker-username": self.charm.state.peer_cluster.broker_username, + "broker-password": self.charm.state.peer_cluster.broker_password, + "broker-uris": self.charm.state.peer_cluster.broker_uris, + "cluster-uuid": self.charm.state.peer_cluster.cluster_uuid, + "racks": str(self.charm.state.peer_cluster.racks), + "broker-capacities": json.dumps(self.charm.state.peer_cluster.broker_capacities), + "zk-uris": self.charm.state.peer_cluster.zk_uris, + "zk-username": self.charm.state.peer_cluster.zk_username, + "zk-password": self.charm.state.peer_cluster.zk_password, } ) self.charm.on.config_changed.emit() # ensure both broker+balancer get a changed event + def _on_peer_cluster_broken(self, _: RelationBrokenEvent): + """Handle the required logic to remove.""" + if self.charm.state.kraft_mode is not None: + return + + self.charm.workload.stop() + logger.info(f'Service {self.model.unit.name.split("/")[1]} stopped') + + # FIXME: probably a mix between cluster_manager and broker + if self.charm.state.runs_broker: + # Kafka keeps a meta.properties in every log.dir with a unique ClusterID + # this ID is provided by ZK, and removing it on relation-broken allows + # re-joining to another ZK cluster. + for storage in self.charm.model.storages["data"]: + self.charm.workload.exec( + [ + "rm", + f"{storage.location}/meta.properties", + f"{storage.location}/__cluster_metadata-0/quorum-state", + ] + ) + + if self.charm.unit.is_leader(): + # other charm methods assume credentials == ACLs + # necessary to clean-up credentials once ZK relation is lost + for username in self.charm.state.cluster.internal_user_credentials: + self.charm.state.cluster.update({f"{username}-password": ""}) + def _default_relation_changed(self, event: RelationChangedEvent): """Implements required logic from multiple 'handled' events from the `data-interfaces` library.""" if not isinstance(event, RelationEvent) or not event.relation or not event.relation.app: diff --git a/src/literals.py b/src/literals.py index b3b84344..5a2768a0 100644 --- a/src/literals.py +++ b/src/literals.py @@ -12,7 +12,7 @@ CHARM_KEY = "kafka" SNAP_NAME = "charmed-kafka" -CHARMED_KAFKA_SNAP_REVISION = 39 +CHARMED_KAFKA_SNAP_REVISION = 42 CONTAINER = "kafka" SUBSTRATE = "vm" STORAGE = "data" @@ -86,6 +86,13 @@ class Ports: AuthMap("SASL_PLAINTEXT", "OAUTHBEARER"): Ports(9095, 19095, 29095), AuthMap("SASL_SSL", "OAUTHBEARER"): Ports(9096, 19096, 29096), } +# FIXME this port should exist on the previous abstraction +CONTROLLER_PORT = 9097 +CONTROLLER_LISTENER_NAME = "INTERNAL_CONTROLLER" + +# FIXME: when running broker node.id will be unit-id + 100. If unit is only running +# the controller node.id == unit-id. This way we can keep a human readable mapping of ids. +KRAFT_NODE_ID_OFFSET = 100 DebugLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"] DatabagScope = Literal["unit", "app"] @@ -123,7 +130,7 @@ class Role: service: str paths: dict[str, str] relation: str - requested_secrets: list[str] | None = None + requested_secrets: list[str] def __eq__(self, value: object, /) -> bool: """Provide an easy comparison to the configuration key.""" @@ -141,6 +148,16 @@ def __eq__(self, value: object, /) -> bool: "balancer-uris", ], ) +CONTROLLER = Role( + value="controller", + service="daemon", + paths=PATHS["kafka"], + relation=PEER_CLUSTER_RELATION, + requested_secrets=[ + "broker-username", + "broker-password", + ], +) BALANCER = Role( value="balancer", service="cruise-control", @@ -211,6 +228,9 @@ class Status(Enum): BROKER_NOT_RUNNING = StatusLevel(BlockedStatus("Broker not running"), "WARNING") NOT_ALL_RELATED = StatusLevel(MaintenanceStatus("not all units related"), "DEBUG") CC_NOT_RUNNING = StatusLevel(BlockedStatus("Cruise Control not running"), "WARNING") + MISSING_MODE = StatusLevel(BlockedStatus("Application needs ZooKeeper or KRaft mode"), "DEBUG") + NO_CLUSTER_UUID = StatusLevel(WaitingStatus("Waiting for cluster uuid"), "DEBUG") + NO_QUORUM_URIS = StatusLevel(WaitingStatus("Waiting for quorum uris"), "DEBUG") ZK_NOT_RELATED = StatusLevel(BlockedStatus("missing required zookeeper relation"), "DEBUG") ZK_NOT_CONNECTED = StatusLevel(BlockedStatus("unit not connected to zookeeper"), "ERROR") ZK_TLS_MISMATCH = StatusLevel( diff --git a/src/managers/auth.py b/src/managers/auth.py index e7bb896b..99890139 100644 --- a/src/managers/auth.py +++ b/src/managers/auth.py @@ -13,6 +13,7 @@ from core.cluster import ClusterState from core.workload import WorkloadBase +from literals import SECURITY_PROTOCOL_PORTS logger = logging.getLogger(__name__) @@ -136,7 +137,9 @@ def _generate_consumer_acls(topic: str, username: str, group: str | None = None) return consumer_acls - def add_user(self, username: str, password: str, zk_auth: bool = False) -> None: + def add_user( + self, username: str, password: str, zk_auth: bool = False, internal: bool = False + ) -> None: """Adds new user credentials to ZooKeeper. Args: @@ -144,6 +147,7 @@ def add_user(self, username: str, password: str, zk_auth: bool = False) -> None: password: the user password zk_auth: flag to specify adding users using ZooKeeper authorizer For use before cluster start + internal: flag to use internal ports or client ones Raises: `(subprocess.CalledProcessError | ops.pebble.ExecError)`: if the error returned a non-zero exit code @@ -164,8 +168,13 @@ def add_user(self, username: str, password: str, zk_auth: bool = False) -> None: ] opts = [self.kafka_opts] else: + bootstrap_server = ( + f"{self.state.unit_broker.internal_address}:{SECURITY_PROTOCOL_PORTS[self.state.default_auth].internal}" + if internal + else self.state.bootstrap_server + ) command = base_command + [ - f"--bootstrap-server={self.state.bootstrap_server}", + f"--bootstrap-server={bootstrap_server}", f"--command-config={self.workload.paths.client_properties}", ] opts = [] diff --git a/src/managers/balancer.py b/src/managers/balancer.py index 870e6e18..8104518b 100644 --- a/src/managers/balancer.py +++ b/src/managers/balancer.py @@ -137,8 +137,8 @@ def __init__(self, dependent: "BrokerOperator | BalancerOperator") -> None: def cruise_control(self) -> CruiseControlClient: """Client for the CruiseControl REST API.""" return CruiseControlClient( - username=self.charm.state.balancer.balancer_username, - password=self.charm.state.balancer.balancer_password, + username=self.charm.state.peer_cluster.balancer_username, + password=self.charm.state.peer_cluster.balancer_password, ) @property @@ -160,7 +160,7 @@ def storages(self) -> str: def create_internal_topics(self) -> None: """Create Cruise Control topics.""" - bootstrap_servers = self.charm.state.balancer.broker_uris + bootstrap_servers = self.charm.state.peer_cluster.broker_uris property_file = f'{BALANCER.paths["CONF"]}/cruisecontrol.properties' for topic in BALANCER_TOPICS: diff --git a/src/managers/config.py b/src/managers/config.py index 2031ca8b..fdd0316c 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -23,6 +23,8 @@ ADMIN_USER, BALANCER_GOALS_TESTING, BROKER, + CONTROLLER_LISTENER_NAME, + CONTROLLER_PORT, DEFAULT_BALANCER_GOALS, HARD_BALANCER_GOALS, INTER_BROKER_USER, @@ -30,6 +32,7 @@ JMX_EXPORTER_PORT, JVM_MEM_MAX_GB, JVM_MEM_MIN_GB, + KRAFT_NODE_ID_OFFSET, PROFILE_TESTING, SECURITY_PROTOCOL_PORTS, AuthMap, @@ -40,7 +43,6 @@ DEFAULT_CONFIG_OPTIONS = """ sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 -authorizer.class.name=kafka.security.authorizer.AclAuthorizer allow.everyone.if.no.acl.found=false auto.create.topics.enable=false metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter @@ -67,7 +69,13 @@ num.partition.metrics.windows=3 num.broker.metrics.windows=10 """ -SERVER_PROPERTIES_BLACKLIST = ["profile", "log_level", "certificate_extra_sans"] +SERVER_PROPERTIES_BLACKLIST = [ + "profile", + "log_level", + "certificate_extra_sans", + "roles", + "expose_external", +] class Listener: @@ -270,9 +278,11 @@ def __init__( @property @override def kafka_opts(self) -> str: - opts = [ - f"-Djava.security.auth.login.config={self.workload.paths.zk_jaas}", - ] + opts = [] + if not self.state.runs_controller: + opts = [ + f"-Djava.security.auth.login.config={self.workload.paths.zk_jaas}", + ] http_proxy = os.environ.get("JUJU_CHARM_HTTP_PROXY") https_proxy = os.environ.get("JUJU_CHARM_HTTPS_PROXY") @@ -314,6 +324,9 @@ def auth_properties(self) -> list[str]: Returns: List of properties to be set """ + if self.state.kraft_mode: + return [] + return [ f"broker.id={self.state.unit_broker.unit_id}", f"zookeeper.connect={self.state.zookeeper.connect}", @@ -441,6 +454,11 @@ def internal_listener(self) -> Listener: scope="INTERNAL", ) + @property + def controller_listener(self) -> None: + """Return the controller listener.""" + pass # TODO: No good abstraction in place for the controller use case + @property def client_listeners(self) -> list[Listener]: """Return a list of extra listeners.""" @@ -564,6 +582,41 @@ def metrics_reporter_properties(self) -> list[str]: username=ADMIN_USER, prefix="cruise.control.metrics.reporter" ) + @property + def authorizer_class(self) -> list[str]: + """Return the authorizer Java class used on Kafka.""" + if self.state.kraft_mode: + # return ["authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer"] + return [] + return ["authorizer.class.name=kafka.security.authorizer.AclAuthorizer"] + + @property + def controller_properties(self) -> list[str]: + """Builds all properties necessary for starting Kafka controller service. + + Returns: + List of properties to be set + """ + if self.state.kraft_mode == False: # noqa: E712 + return [] + + roles = [] + node_id = self.state.unit_broker.unit_id + if self.state.runs_broker: + roles.append("broker") + node_id += KRAFT_NODE_ID_OFFSET + if self.state.runs_controller: + roles.append("controller") + + properties = [ + f"process.roles={','.join(roles)}", + f"node.id={node_id}", + f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}", + f"controller.listener.names={CONTROLLER_LISTENER_NAME}", + ] + + return properties + @property def server_properties(self) -> list[str]: """Builds all properties necessary for starting Kafka service. @@ -580,6 +633,24 @@ def server_properties(self) -> list[str]: listeners_repr = [listener.listener for listener in self.all_listeners] advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] + if self.state.kraft_mode: + controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" + controller_listener = f"{CONTROLLER_LISTENER_NAME}://0.0.0.0:{CONTROLLER_PORT}" + + # NOTE: Case where the controller is running standalone. Early return with a + # smaller subset of config options + if not self.state.runs_broker: + properties = ( + [f"log.dirs={self.state.log_dirs}", f"listeners={controller_listener}"] + + self.controller_properties + # + self.authorizer_class + ) + return properties + + protocol_map.append(controller_protocol_map) + if self.state.runs_controller: + listeners_repr.append(controller_listener) + properties = ( [ f"super.users={self.state.super_users}", @@ -591,17 +662,21 @@ def server_properties(self) -> list[str]: f"inter.broker.protocol.version={self.inter_broker_protocol_version}", ] + self.scram_properties + + self.auth_properties + self.oauth_properties + self.config_properties + self.default_replication_properties - + self.auth_properties + self.rack_properties + self.metrics_reporter_properties + DEFAULT_CONFIG_OPTIONS.split("\n") + + self.authorizer_class + + self.controller_properties ) if self.state.cluster.tls_enabled and self.state.unit_broker.certificate: - properties += self.tls_properties + self.zookeeper_tls_properties + properties += self.tls_properties + if self.state.kraft_mode == False: # noqa: E712 + properties += self.zookeeper_tls_properties if self.config.profile == PROFILE_TESTING: properties += TESTING_OPTIONS.split("\n") @@ -732,10 +807,12 @@ def goals(self) -> list[str]: if self.config.profile == PROFILE_TESTING: goals = BALANCER_GOALS_TESTING - if self.state.balancer.racks: + if self.state.peer_cluster.racks: if ( - min([3, len(self.state.balancer.broker_capacities.get("brokerCapacities", []))]) - > self.state.balancer.racks + min( + [3, len(self.state.peer_cluster.broker_capacities.get("brokerCapacities", []))] + ) + > self.state.peer_cluster.racks ): # replication-factor > racks is not ideal goals = goals + ["RackAwareDistribution"] else: @@ -789,10 +866,10 @@ def cruise_control_properties(self) -> list[str]: """ properties = ( [ - f"bootstrap.servers={self.state.balancer.broker_uris}", - f"zookeeper.connect={self.state.balancer.zk_uris}", + f"bootstrap.servers={self.state.peer_cluster.broker_uris}", + f"zookeeper.connect={self.state.peer_cluster.zk_uris}", "zookeeper.security.enabled=true", - f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{self.state.balancer.broker_username}" password="{self.state.balancer.broker_password}";', + f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{self.state.peer_cluster.broker_username}" password="{self.state.peer_cluster.broker_password}";', f"sasl.mechanism={self.state.default_auth.mechanism}", f"security.protocol={self.state.default_auth.protocol}", f"capacity.config.file={self.workload.paths.capacity_jbod_json}", @@ -818,8 +895,8 @@ def jaas_config(self) -> str: f""" Client {{ org.apache.zookeeper.server.auth.DigestLoginModule required - username="{self.state.balancer.zk_username}" - password="{self.state.balancer.zk_password}"; + username="{self.state.peer_cluster.zk_username}" + password="{self.state.peer_cluster.zk_password}"; }}; """ ) @@ -838,7 +915,7 @@ def set_cruise_control_properties(self) -> None: def set_broker_capacities(self) -> None: """Writes all broker storage capacities to `capacityJBOD.json`.""" self.workload.write( - content=json.dumps(self.state.balancer.broker_capacities), + content=json.dumps(self.state.peer_cluster.broker_capacities), path=self.workload.paths.capacity_jbod_json, ) diff --git a/src/workload.py b/src/workload.py index 53864f39..6622602e 100644 --- a/src/workload.py +++ b/src/workload.py @@ -184,6 +184,32 @@ def run_bin_command( command = f"{opts_str} {SNAP_NAME}.{bin_keyword} {bin_str}" return self.exec(command) + def format_storages( + self, uuid: str, internal_user_credentials: dict[str, str] | None = None + ) -> None: + """Use a passed uuid to format storages.""" + # NOTE data dirs have changed permissions by storage_attached hook. For some reason + # storage command bin needs these locations to be root owned. Momentarily raise permissions + # during the format phase. + self.exec(["chown", "-R", "root:root", f"{self.paths.data_path}"]) + + command = [ + "format", + "--ignore-formatted", + "--cluster-id", + uuid, + "-c", + self.paths.server_properties, + ] + if internal_user_credentials: + for user, password in internal_user_credentials.items(): + command += ["--add-scram", f"'SCRAM-SHA-512=[name={user},password={password}]'"] + self.run_bin_command(bin_keyword="storage", bin_args=command) + + # Drop permissions again for the main process + self.exec(["chmod", "-R", "750", f"{self.paths.data_path}"]) + self.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.paths.data_path}"]) + class KafkaWorkload(Workload): """Broker specific wrapper.""" diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py new file mode 100644 index 00000000..1269acbf --- /dev/null +++ b/tests/integration/test_kraft.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +import asyncio +import logging +import os + +import pytest +from pytest_operator.plugin import OpsTest + +from literals import ( + CONTROLLER_PORT, + PEER_CLUSTER_ORCHESTRATOR_RELATION, + PEER_CLUSTER_RELATION, + SECURITY_PROTOCOL_PORTS, +) + +from .helpers import ( + APP_NAME, + check_socket, + get_address, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.kraft + +CONTROLLER_APP = "controller" +PRODUCER_APP = "producer" + + +class TestKRaft: + + deployment_strat: str = os.environ.get("DEPLOYMENT", "multi") + controller_app: str = {"single": APP_NAME, "multi": CONTROLLER_APP}[deployment_strat] + + @pytest.mark.abort_on_fail + async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm): + await ops_test.model.add_machine(series="jammy") + machine_ids = await ops_test.model.get_machines() + + await asyncio.gather( + ops_test.model.deploy( + kafka_charm, + application_name=APP_NAME, + num_units=1, + series="jammy", + to=machine_ids[0], + config={ + "roles": "broker,controller" if self.controller_app == APP_NAME else "broker", + "profile": "testing", + }, + trust=True, + ), + ops_test.model.deploy( + "kafka-test-app", + application_name=PRODUCER_APP, + channel="edge", + num_units=1, + series="jammy", + config={ + "topic_name": "HOT-TOPIC", + "num_messages": 100000, + "role": "producer", + "partitions": 20, + "replication_factor": "1", + }, + trust=True, + ), + ) + + if self.controller_app != APP_NAME: + await ops_test.model.deploy( + kafka_charm, + application_name=self.controller_app, + num_units=1, + series="jammy", + config={ + "roles": self.controller_app, + "profile": "testing", + }, + trust=True, + ) + + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), + idle_period=30, + timeout=1800, + raise_on_error=False, + ) + if self.controller_app != APP_NAME: + assert ops_test.model.applications[APP_NAME].status == "blocked" + assert ops_test.model.applications[self.controller_app].status == "blocked" + else: + assert ops_test.model.applications[APP_NAME].status == "active" + + @pytest.mark.abort_on_fail + async def test_integrate(self, ops_test: OpsTest): + if self.controller_app != APP_NAME: + await ops_test.model.add_relation( + f"{APP_NAME}:{PEER_CLUSTER_ORCHESTRATOR_RELATION}", + f"{CONTROLLER_APP}:{PEER_CLUSTER_RELATION}", + ) + + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), idle_period=30 + ) + + async with ops_test.fast_forward(fast_interval="40s"): + await asyncio.sleep(120) + + assert ops_test.model.applications[APP_NAME].status == "active" + assert ops_test.model.applications[self.controller_app].status == "active" + + @pytest.mark.abort_on_fail + async def test_listeners(self, ops_test: OpsTest): + address = await get_address(ops_test=ops_test) + assert check_socket( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal + ) # Internal listener + + # Client listener should not be enabled if there is no relations + assert not check_socket( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client + ) + + # Check controller socket + if self.controller_app != APP_NAME: + address = await get_address(ops_test=ops_test, app_name=self.controller_app) + + assert check_socket(address, CONTROLLER_PORT) diff --git a/tests/unit/scenario/test_kraft.py b/tests/unit/scenario/test_kraft.py new file mode 100644 index 00000000..dd9e756a --- /dev/null +++ b/tests/unit/scenario/test_kraft.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +import dataclasses +import json +import logging +from pathlib import Path +from unittest.mock import patch + +import pytest +import yaml +from ops import ActiveStatus +from scenario import Container, Context, PeerRelation, Relation, State + +from charm import KafkaCharm +from literals import ( + CONTAINER, + PEER, + PEER_CLUSTER_ORCHESTRATOR_RELATION, + PEER_CLUSTER_RELATION, + SUBSTRATE, + Status, +) + +pytestmark = pytest.mark.kraft + +logger = logging.getLogger(__name__) + + +CONFIG = yaml.safe_load(Path("./config.yaml").read_text()) +ACTIONS = yaml.safe_load(Path("./actions.yaml").read_text()) +METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) + + +@pytest.fixture() +def charm_configuration(): + """Enable direct mutation on configuration dict.""" + return json.loads(json.dumps(CONFIG)) + + +@pytest.fixture() +def base_state(): + + if SUBSTRATE == "k8s": + state = State(leader=True, containers=[Container(name=CONTAINER, can_connect=True)]) + + else: + state = State(leader=True) + + return state + + +def test_ready_to_start_maintenance_no_peer_relation(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "controller" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + state_in = base_state + + # When + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_PEER_RELATION.value.status + + +def test_ready_to_start_no_peer_cluster(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "controller" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + state_in = dataclasses.replace(base_state, relations=[cluster_peer]) + + # When + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_PEER_CLUSTER_RELATION.value.status + + +def test_ready_to_start_missing_data_as_controller(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "controller" + charm_configuration["options"]["expose-external"]["default"] = "none" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + peer_cluster = Relation(PEER_CLUSTER_RELATION, "peer_cluster") + state_in = dataclasses.replace(base_state, relations=[cluster_peer, peer_cluster]) + + # When + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_BROKER_DATA.value.status + + +def test_ready_to_start_missing_data_as_broker(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "broker" + charm_configuration["options"]["expose-external"]["default"] = "none" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + peer_cluster = Relation( + PEER_CLUSTER_ORCHESTRATOR_RELATION, "peer_cluster", remote_app_data={"roles": "controller"} + ) + state_in = dataclasses.replace(base_state, relations=[cluster_peer, peer_cluster]) + + # When + with patch("workload.KafkaWorkload.run_bin_command", return_value="cluster-uuid-number"): + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + assert state_out.unit_status == Status.NO_QUORUM_URIS.value.status + + +def test_ready_to_start(charm_configuration, base_state: State): + # Given + charm_configuration["options"]["roles"]["default"] = "broker,controller" + charm_configuration["options"]["expose-external"]["default"] = "none" + ctx = Context( + KafkaCharm, + meta=METADATA, + config=charm_configuration, + actions=ACTIONS, + ) + cluster_peer = PeerRelation(PEER, PEER) + state_in = dataclasses.replace(base_state, relations=[cluster_peer]) + + # When + with ( + patch( + "workload.KafkaWorkload.run_bin_command", return_value="cluster-uuid-number" + ) as patched_run_bin_command, + patch("health.KafkaHealth.machine_configured", return_value=True), + patch("workload.KafkaWorkload.start"), + patch("charms.operator_libs_linux.v1.snap.SnapCache"), + ): + state_out = ctx.run(ctx.on.start(), state_in) + + # Then + # Second call of format will have to pass "cluster-uuid-number" as set above + assert "cluster-uuid-number" in patched_run_bin_command.call_args_list[1][1]["bin_args"] + assert "cluster-uuid" in state_out.get_relations(PEER)[0].local_app_data + assert "controller-quorum-uris" in state_out.get_relations(PEER)[0].local_app_data + # Only the internal users should be created. + assert "admin-password" in next(iter(state_out.secrets)).latest_content + assert "sync-password" in next(iter(state_out.secrets)).latest_content + assert state_out.unit_status == ActiveStatus() diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 924b1c82..d502b176 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -101,12 +101,12 @@ def test_ready_to_start_maintenance_no_peer_relation(harness: Harness[KafkaCharm assert harness.charm.unit.status == Status.NO_PEER_RELATION.value.status -def test_ready_to_start_blocks_no_zookeeper_relation(harness: Harness[KafkaCharm]): +def test_ready_to_start_blocks_no_mode(harness: Harness[KafkaCharm]): with harness.hooks_disabled(): harness.add_relation(PEER, CHARM_KEY) harness.charm.on.start.emit() - assert harness.charm.unit.status == Status.ZK_NOT_RELATED.value.status + assert harness.charm.unit.status == Status.MISSING_MODE.value.status def test_ready_to_start_waits_no_zookeeper_data(harness: Harness[KafkaCharm]): diff --git a/tox.ini b/tox.ini index 5a139fa2..b13e4c24 100644 --- a/tox.ini +++ b/tox.ini @@ -30,6 +30,8 @@ set_env = ha: TEST_FILE=ha/test_ha.py balancer-single: DEPLOYMENT=single balancer-multi: DEPLOYMENT=multi + kraft-single: DEPLOYMENT=single + kraft-multi: DEPLOYMENT=multi pass_env = PYTHONPATH @@ -113,3 +115,16 @@ pass_env = commands = poetry install --no-root --with integration poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_balancer.py + +[testenv:integration-kraft-{single,multi}] +description = Run KRaft mode tests +set_env = + {[testenv]set_env} + # Workaround for https://github.com/python-poetry/poetry/issues/6958 + POETRY_INSTALLER_PARALLEL = false +pass_env = + {[testenv]pass_env} + CI +commands = + poetry install --no-root --with integration + poetry run pytest -vv --tb native --log-cli-level=INFO -s {posargs} {[vars]tests_path}/integration/test_kraft.py