From b5d3c3b0cb1392140b89fe3aad52d8996b1a8b03 Mon Sep 17 00:00:00 2001 From: Iman Enami <44609233+imanenami@users.noreply.github.com> Date: Tue, 10 Dec 2024 13:52:57 +0400 Subject: [PATCH 1/2] [DPE-5349] feat: Add internal user and SASL/SCRAM authentication (#284) Co-authored-by: Iman Enami --- src/core/cluster.py | 9 +++++++++ src/core/models.py | 23 +++++++++++++++++++++ src/events/broker.py | 12 ++++++++++- src/events/peer_cluster.py | 1 + src/literals.py | 13 ++++++++++-- src/managers/config.py | 36 ++++++++++++++++++++++++++++----- tests/integration/test_kraft.py | 18 ++++++++++------- 7 files changed, 97 insertions(+), 15 deletions(-) diff --git a/src/core/cluster.py b/src/core/cluster.py index 0c067a2c..b132f00e 100644 --- a/src/core/cluster.py +++ b/src/core/cluster.py @@ -40,6 +40,7 @@ BROKER, CONTROLLER, CONTROLLER_PORT, + CONTROLLER_USER, INTERNAL_USERS, KRAFT_NODE_ID_OFFSET, MIN_REPLICAS, @@ -65,10 +66,12 @@ setattr(custom_secret_groups, "BROKER", "broker") setattr(custom_secret_groups, "BALANCER", "balancer") setattr(custom_secret_groups, "ZOOKEEPER", "zookeeper") +setattr(custom_secret_groups, "CONTROLLER", "controller") SECRET_LABEL_MAP = { "broker-username": getattr(custom_secret_groups, "BROKER"), "broker-password": getattr(custom_secret_groups, "BROKER"), + "controller-password": getattr(custom_secret_groups, "CONTROLLER"), "broker-uris": getattr(custom_secret_groups, "BROKER"), "zk-username": getattr(custom_secret_groups, "ZOOKEEPER"), "zk-password": getattr(custom_secret_groups, "ZOOKEEPER"), @@ -157,6 +160,7 @@ def peer_cluster_orchestrator(self) -> PeerCluster: extra_kwargs.update( { "controller_quorum_uris": self.cluster.controller_quorum_uris, + "controller_password": self.cluster.controller_password, } ) @@ -178,6 +182,7 @@ def peer_cluster(self) -> PeerCluster: "balancer_password": self.cluster.balancer_password, "balancer_uris": self.cluster.balancer_uris, "controller_quorum_uris": self.cluster.controller_quorum_uris, + "controller_password": self.cluster.controller_password, } ) @@ -323,6 +328,10 @@ def super_users(self) -> str: Semicolon delimited string of current super users """ super_users = set(INTERNAL_USERS) + + if self.kraft_mode: + super_users.add(CONTROLLER_USER) + for relation in self.client_relations: if not relation or not relation.app: continue diff --git a/src/core/models.py b/src/core/models.py index bd6eacf1..776d99d9 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -103,6 +103,7 @@ def __init__( balancer_username: str = "", balancer_password: str = "", balancer_uris: str = "", + controller_password: str = "", ): super().__init__(relation, data_interface, None, None) self._broker_username = broker_username @@ -118,6 +119,7 @@ def __init__( self._balancer_username = balancer_username self._balancer_password = balancer_password self._balancer_uris = balancer_uris + self._controller_password = controller_password @property def roles(self) -> str: @@ -194,6 +196,22 @@ def controller_quorum_uris(self) -> str: or "" ) + @property + def controller_password(self) -> str: + """The controller user password in KRaft mode.""" + if self._controller_password: + return self._controller_password + + if not self.relation or not self.relation.app: + return "" + + return self.data_interface._fetch_relation_data_with_secrets( + component=self.relation.app, + req_secret_fields=["controller-password"], + relation=self.relation, + fields=["controller-password"], + ).get("controller-password", "") + @property def cluster_uuid(self) -> str: """The cluster uuid used to format storages in KRaft mode.""" @@ -412,6 +430,11 @@ def internal_user_credentials(self) -> dict[str, str]: return credentials + @property + def controller_password(self) -> str: + """The controller user password in KRaft mode.""" + return self.relation_data.get("controller-password", "") + @property def client_passwords(self) -> dict[str, str]: """Usernames and passwords of related client applications.""" diff --git a/src/events/broker.py b/src/events/broker.py index fe32df84..dcbbdea1 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -452,14 +452,24 @@ def _init_kraft_mode(self) -> None: self.charm.state.cluster.update({"cluster-uuid": uuid}) self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) - # Controller is tasked with populating quorum uris + # Controller is tasked with populating quorum uris and the `controller` user password if self.charm.state.runs_controller: quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} self.charm.state.cluster.update(quorum_uris) + generated_password = self.charm.workload.generate_password() + if self.charm.state.peer_cluster_orchestrator: self.charm.state.peer_cluster_orchestrator.update(quorum_uris) + if not self.charm.state.peer_cluster_orchestrator.controller_password: + self.charm.state.peer_cluster_orchestrator.update( + {"controller-password": generated_password} + ) + elif not self.charm.state.peer_cluster.controller_password: + # single mode, controller & leader + self.charm.state.cluster.update({"controller-password": generated_password}) + def _format_storages(self) -> None: """Format storages provided relevant keys exist.""" if self.charm.state.runs_broker: diff --git a/src/events/peer_cluster.py b/src/events/peer_cluster.py index 1b9e066c..5be64bdd 100644 --- a/src/events/peer_cluster.py +++ b/src/events/peer_cluster.py @@ -114,6 +114,7 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None: "balancer-password": self.charm.state.peer_cluster.balancer_password, "balancer-uris": self.charm.state.peer_cluster.balancer_uris, "controller-quorum-uris": self.charm.state.peer_cluster.controller_quorum_uris, + "controller-password": self.charm.state.peer_cluster.controller_password, } ) diff --git a/src/literals.py b/src/literals.py index 13739792..120127d6 100644 --- a/src/literals.py +++ b/src/literals.py @@ -12,7 +12,7 @@ CHARM_KEY = "kafka" SNAP_NAME = "charmed-kafka" -CHARMED_KAFKA_SNAP_REVISION = 42 +CHARMED_KAFKA_SNAP_REVISION = 45 CONTAINER = "kafka" SUBSTRATE = "vm" STORAGE = "data" @@ -46,10 +46,13 @@ INTER_BROKER_USER = "sync" ADMIN_USER = "admin" +CONTROLLER_USER = "controller" INTERNAL_USERS = [INTER_BROKER_USER, ADMIN_USER] BALANCER_WEBSERVER_USER = "balancer" BALANCER_WEBSERVER_PORT = 9090 -SECRETS_APP = [f"{user}-password" for user in INTERNAL_USERS + [BALANCER_WEBSERVER_USER]] +SECRETS_APP = [ + f"{user}-password" for user in INTERNAL_USERS + [BALANCER_WEBSERVER_USER, CONTROLLER_USER] +] SECRETS_UNIT = [ "ca-cert", "csr", @@ -147,6 +150,7 @@ def __eq__(self, value: object, /) -> bool: "balancer-username", "balancer-password", "balancer-uris", + "controller-password", ], ) CONTROLLER = Role( @@ -157,6 +161,7 @@ def __eq__(self, value: object, /) -> bool: requested_secrets=[ "broker-username", "broker-password", + "controller-password", ], ) BALANCER = Role( @@ -168,6 +173,7 @@ def __eq__(self, value: object, /) -> bool: "broker-username", "broker-password", "broker-uris", + "controller-passwrod", "zk-username", "zk-password", "zk-uris", @@ -232,6 +238,9 @@ class Status(Enum): MISSING_MODE = StatusLevel(BlockedStatus("Application needs ZooKeeper or KRaft mode"), "DEBUG") NO_CLUSTER_UUID = StatusLevel(WaitingStatus("Waiting for cluster uuid"), "DEBUG") NO_QUORUM_URIS = StatusLevel(WaitingStatus("Waiting for quorum uris"), "DEBUG") + MISSING_CONTROLLER_PASSWORD = StatusLevel( + WaitingStatus("Waiting for controller user credentials"), "DEBUG" + ) ZK_NOT_RELATED = StatusLevel(BlockedStatus("missing required zookeeper relation"), "DEBUG") ZK_NOT_CONNECTED = StatusLevel(BlockedStatus("unit not connected to zookeeper"), "ERROR") ZK_TLS_MISMATCH = StatusLevel( diff --git a/src/managers/config.py b/src/managers/config.py index 63f86c2d..ceca9c25 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -26,6 +26,7 @@ BROKER, CONTROLLER_LISTENER_NAME, CONTROLLER_PORT, + CONTROLLER_USER, DEFAULT_BALANCER_GOALS, HARD_BALANCER_GOALS, INTER_BROKER_USER, @@ -412,6 +413,24 @@ def scram_properties(self) -> list[str]: return scram_properties + @property + def controller_scram_properties(self) -> list[str]: + """Builds the SCRAM properties for controller listener. + + Returns: + list of scram properties to be set + """ + password = self.state.peer_cluster.controller_password + listener_mechanism = self.internal_listener.mechanism.lower() + listener_name = CONTROLLER_LISTENER_NAME.lower() + + return [ + "sasl.enabled.mechanisms=SCRAM-SHA-512", + f"sasl.mechanism.controller.protocol={self.internal_listener.mechanism}", + f'listener.name.{listener_name}.{listener_mechanism}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{CONTROLLER_USER}" password="{password}" user_{CONTROLLER_USER}="{password}";', + f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}", + ] + @property def oauth_properties(self) -> list[str]: """Builds the properties for the oauth listener. @@ -636,8 +655,9 @@ def metrics_reporter_properties(self) -> list[str]: def authorizer_class(self) -> list[str]: """Return the authorizer Java class used on Kafka.""" if self.state.kraft_mode: - # return ["authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer"] - return [] + return [ + "authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer" + ] return ["authorizer.class.name=kafka.security.authorizer.AclAuthorizer"] @property @@ -663,6 +683,7 @@ def controller_properties(self) -> list[str]: f"node.id={node_id}", f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}", f"controller.listener.names={CONTROLLER_LISTENER_NAME}", + *self.controller_scram_properties, ] return properties @@ -684,16 +705,21 @@ def server_properties(self) -> list[str]: advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] if self.state.kraft_mode: - controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" + controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT" controller_listener = f"{CONTROLLER_LISTENER_NAME}://0.0.0.0:{CONTROLLER_PORT}" # NOTE: Case where the controller is running standalone. Early return with a # smaller subset of config options if not self.state.runs_broker: properties = ( - [f"log.dirs={self.state.log_dirs}", f"listeners={controller_listener}"] + [ + f"super.users={self.state.super_users}", + f"log.dirs={self.state.log_dirs}", + f"listeners={controller_listener}", + f"listener.security.protocol.map={controller_protocol_map}", + ] + self.controller_properties - # + self.authorizer_class + + self.authorizer_class ) return properties diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index 1269acbf..ac7438bc 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -16,11 +16,7 @@ SECURITY_PROTOCOL_PORTS, ) -from .helpers import ( - APP_NAME, - check_socket, - get_address, -) +from .helpers import APP_NAME, check_socket, create_test_topic, get_address logger = logging.getLogger(__name__) @@ -107,8 +103,8 @@ async def test_integrate(self, ops_test: OpsTest): apps=list({APP_NAME, self.controller_app}), idle_period=30 ) - async with ops_test.fast_forward(fast_interval="40s"): - await asyncio.sleep(120) + async with ops_test.fast_forward(fast_interval="20s"): + await asyncio.sleep(240) assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[self.controller_app].status == "active" @@ -130,3 +126,11 @@ async def test_listeners(self, ops_test: OpsTest): address = await get_address(ops_test=ops_test, app_name=self.controller_app) assert check_socket(address, CONTROLLER_PORT) + + @pytest.mark.abort_on_fail + async def test_authorizer(self, ops_test: OpsTest): + + address = await get_address(ops_test=ops_test) + port = SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal + + await create_test_topic(ops_test, f"{address}:{port}") From 1c6b435abdbba30f6a0ca328ce8f0f555d248767 Mon Sep 17 00:00:00 2001 From: Alex Batisse Date: Wed, 11 Dec 2024 10:08:23 +0100 Subject: [PATCH 2/2] [DPE-6138] Update zookeeper client lib (#282) --- lib/charms/zookeeper/v0/client.py | 85 +++++++++++++++++++++++-------- requirements.txt | 6 +-- src/core/models.py | 72 +++++++++++++++++--------- tests/integration/helpers.py | 6 +-- tests/integration/test_upgrade.py | 2 +- tests/unit/test_config.py | 6 +-- tox.ini | 1 + 7 files changed, 124 insertions(+), 54 deletions(-) diff --git a/lib/charms/zookeeper/v0/client.py b/lib/charms/zookeeper/v0/client.py index f65c6910..50a8fb8b 100644 --- a/lib/charms/zookeeper/v0/client.py +++ b/lib/charms/zookeeper/v0/client.py @@ -74,7 +74,7 @@ def update_cluster(new_members: List[str], event: EventBase) -> None: # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 6 +LIBPATCH = 8 logger = logging.getLogger(__name__) @@ -101,6 +101,12 @@ class QuorumLeaderNotFoundError(Exception): pass +class NoUnitFoundError(Exception): + """Generic exception for when there are no running zk unit in the app.""" + + pass + + class ZooKeeperManager: """Handler for performing ZK commands.""" @@ -114,6 +120,7 @@ def __init__( keyfile_path: Optional[str] = "", keyfile_password: Optional[str] = "", certfile_path: Optional[str] = "", + read_only: bool = True, ): self.hosts = hosts self.username = username @@ -123,12 +130,21 @@ def __init__( self.keyfile_path = keyfile_path self.keyfile_password = keyfile_password self.certfile_path = certfile_path - self.leader = "" + self.zk_host = "" + self.read_only = read_only - try: - self.leader = self.get_leader() - except RetryError: - raise QuorumLeaderNotFoundError("quorum leader not found") + if not read_only: + try: + self.zk_host = self.get_leader() + except RetryError: + raise QuorumLeaderNotFoundError("quorum leader not found") + + else: + try: + self.zk_host = self.get_any_unit() + + except RetryError: + raise NoUnitFoundError @retry( wait=wait_fixed(3), @@ -170,6 +186,35 @@ def get_leader(self) -> str: return leader or "" + @retry( + wait=wait_fixed(3), + stop=stop_after_attempt(2), + retry=retry_if_not_result(lambda result: True if result else False), + ) + def get_any_unit(self) -> str: + any_host = None + for host in self.hosts: + try: + with ZooKeeperClient( + host=host, + client_port=self.client_port, + username=self.username, + password=self.password, + use_ssl=self.use_ssl, + keyfile_path=self.keyfile_path, + keyfile_password=self.keyfile_password, + certfile_path=self.certfile_path, + ) as zk: + response = zk.srvr + if response: + any_host = host + break + except KazooTimeoutError: # in the case of having a dead unit in relation data + logger.debug(f"TIMEOUT - {host}") + continue + + return any_host or "" + @property def server_members(self) -> Set[str]: """The current members within the ZooKeeper quorum. @@ -179,7 +224,7 @@ def server_members(self) -> Set[str]: e.g {"server.1=10.141.78.207:2888:3888:participant;0.0.0.0:2181"} """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -200,7 +245,7 @@ def config_version(self) -> int: The zookeeper config version decoded from base16 """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -221,7 +266,7 @@ def members_syncing(self) -> bool: True if any members are syncing. Otherwise False. """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -305,7 +350,7 @@ def add_members(self, members: Iterable[str]) -> None: # specific connection to leader with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -330,7 +375,7 @@ def remove_members(self, members: Iterable[str]) -> None: for member in members: member_id = re.findall(r"server.([0-9]+)", member)[0] with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -356,7 +401,7 @@ def leader_znodes(self, path: str) -> Set[str]: Set of all nested child zNodes """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -369,7 +414,7 @@ def leader_znodes(self, path: str) -> Set[str]: return all_znode_children - def create_znode_leader(self, path: str, acls: List[ACL]) -> None: + def create_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None: """Creates a new zNode on the current quorum leader with given ACLs. Args: @@ -377,7 +422,7 @@ def create_znode_leader(self, path: str, acls: List[ACL]) -> None: acls: the ACLs to be set on that path """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -388,7 +433,7 @@ def create_znode_leader(self, path: str, acls: List[ACL]) -> None: ) as zk: zk.create_znode(path=path, acls=acls) - def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None: + def set_acls_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None: """Updates ACLs for an existing zNode on the current quorum leader. Args: @@ -396,7 +441,7 @@ def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None: acls: the new ACLs to be set on that path """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -414,7 +459,7 @@ def delete_znode_leader(self, path: str) -> None: path: the zNode path to delete """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -432,7 +477,7 @@ def get_version(self) -> str: String of ZooKeeper service version """ with ZooKeeperClient( - host=self.leader, + host=self.zk_host, client_port=self.client_port, username=self.username, password=self.password, @@ -577,7 +622,7 @@ def delete_znode(self, path: str) -> None: return self.client.delete(path, recursive=True) - def create_znode(self, path: str, acls: List[ACL]) -> None: + def create_znode(self, path: str, acls: List[ACL] | None = None) -> None: """Create new znode. Args: @@ -599,7 +644,7 @@ def get_acls(self, path: str) -> List[ACL]: return acl_list if acl_list else [] - def set_acls(self, path: str, acls: List[ACL]) -> None: + def set_acls(self, path: str, acls: List[ACL] | None = None) -> None: """Sets acls for a desired znode path. Args: diff --git a/requirements.txt b/requirements.txt index 21dfa336..94f6c94d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ cosl==0.0.24 ; python_version >= "3.8" and python_version < "4.0" cryptography==43.0.3 ; python_version >= "3.8" and python_version < "4.0" exceptiongroup==1.2.2 ; python_version >= "3.8" and python_version < "3.11" h11==0.14.0 ; python_version >= "3.8" and python_version < "4.0" -httpcore==1.0.6 ; python_version >= "3.8" and python_version < "4.0" +httpcore==1.0.7 ; python_version >= "3.8" and python_version < "4.0" httpx==0.27.2 ; python_version >= "3.8" and python_version < "4.0" idna==3.10 ; python_version >= "3.8" and python_version < "4.0" importlib-resources==6.4.5 ; python_version >= "3.8" and python_version < "3.9" @@ -20,11 +20,11 @@ ops==2.17.0 ; python_version >= "3.8" and python_version < "4.0" pkgutil-resolve-name==1.3.10 ; python_version >= "3.8" and python_version < "3.9" pure-sasl==0.6.2 ; python_version >= "3.8" and python_version < "4.0" pycparser==2.22 ; python_version >= "3.8" and python_version < "4.0" and platform_python_implementation != "PyPy" -pydantic==1.10.18 ; python_version >= "3.8" and python_version < "4.0" +pydantic==1.10.19 ; python_version >= "3.8" and python_version < "4.0" pyyaml==6.0.2 ; python_version >= "3.8" and python_version < "4.0" referencing==0.35.1 ; python_version >= "3.8" and python_version < "4.0" requests==2.32.3 ; python_version >= "3.8" and python_version < "4.0" -rpds-py==0.18.1 ; python_version >= "3.8" and python_version < "4.0" +rpds-py==0.20.1 ; python_version >= "3.8" and python_version < "4.0" sniffio==1.3.1 ; python_version >= "3.8" and python_version < "4.0" tenacity==9.0.0 ; python_version >= "3.8" and python_version < "4.0" typing-extensions==4.12.2 ; python_version >= "3.8" and python_version < "4.0" diff --git a/src/core/models.py b/src/core/models.py index 776d99d9..fc4cbd22 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -15,7 +15,11 @@ DataPeerData, DataPeerUnitData, ) -from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager +from charms.zookeeper.v0.client import ( + NoUnitFoundError, + QuorumLeaderNotFoundError, + ZooKeeperManager, +) from kazoo.client import AuthFailedError, ConnectionLoss, NoNodeError from kazoo.exceptions import NoAuthError from lightkube.resources.core_v1 import Node, Pod @@ -712,23 +716,6 @@ def chroot(self) -> str: or "" ) - @property - def uris(self) -> str: - """Comma separated connection string, containing endpoints + chroot.""" - if not self.relation: - return "" - - return ",".join( - sorted( # sorting as they may be disordered - ( - self.data_interface.fetch_relation_field( - relation_id=self.relation.id, field="uris" - ) - or "" - ).split(",") - ) - ) - @property def tls(self) -> bool: """Check if TLS is enabled on ZooKeeper.""" @@ -760,11 +747,45 @@ def zookeeper_connected(self) -> bool: return True + @property + def hosts(self) -> list[str]: + """Get the hosts from the databag.""" + return [host.split(":")[0] for host in self.endpoints.split(",")] + + @property + def uris(self): + """Comma separated connection string, containing endpoints + chroot.""" + return f"{self.endpoints.removesuffix('/')}/{self.database.removeprefix('/')}" + + @property + def port(self) -> int: + """Get the port in use from the databag. + + We can extract from: + - host1:port,host2:port + - host1,host2:port + """ + try: + port = next( + iter([int(host.split(":")[1]) for host in reversed(self.endpoints.split(","))]), + 2181, + ) + except IndexError: + # compatibility with older zk versions + port = 2181 + + return port + @property def zookeeper_version(self) -> str: """Get running zookeeper version.""" - hosts = self.endpoints.split(",") - zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password) + zk = ZooKeeperManager( + hosts=self.hosts, + client_port=self.port, + username=self.username, + password=self.password, + use_ssl=self.tls, + ) return zk.get_version() @@ -778,16 +799,21 @@ def zookeeper_version(self) -> str: def broker_active(self) -> bool: """Checks if broker id is recognised as active by ZooKeeper.""" broker_id = self.data_interface.local_unit.name.split("/")[1] - hosts = self.endpoints.split(",") path = f"{self.database}/brokers/ids/" - - zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password) try: + zk = ZooKeeperManager( + hosts=self.hosts, + client_port=self.port, + username=self.username, + password=self.password, + use_ssl=self.tls, + ) brokers = zk.leader_znodes(path=path) except ( NoNodeError, AuthFailedError, QuorumLeaderNotFoundError, + NoUnitFoundError, ConnectionLoss, NoAuthError, ) as e: diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 30e146e2..be28d641 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -9,7 +9,7 @@ from json.decoder import JSONDecodeError from pathlib import Path from subprocess import PIPE, CalledProcessError, check_output -from typing import Any, Dict, List, Optional, Set +from typing import Any, List, Optional, Set import yaml from charms.kafka.v0.client import KafkaClient @@ -463,7 +463,7 @@ def get_provider_data( return provider_relation_data | user_secret | tls_secret -def get_active_brokers(config: Dict) -> Set[str]: +def get_active_brokers(config: dict[str, str]) -> set[str]: """Gets all brokers currently connected to ZooKeeper. Args: @@ -473,9 +473,9 @@ def get_active_brokers(config: Dict) -> Set[str]: Set of active broker ids """ chroot = config.get("database", config.get("chroot", "")) - hosts = config.get("endpoints", "").split(",") username = config.get("username", "") password = config.get("password", "") + hosts = [host.split(":")[0] for host in config.get("endpoints", "").split(",")] zk = ZooKeeperManager(hosts=hosts, username=username, password=password) path = f"{chroot}/brokers/ids/" diff --git a/tests/integration/test_upgrade.py b/tests/integration/test_upgrade.py index 19b0777b..935f6bc2 100644 --- a/tests/integration/test_upgrade.py +++ b/tests/integration/test_upgrade.py @@ -30,7 +30,7 @@ async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm, app_charm): await asyncio.gather( ops_test.model.deploy( ZK_NAME, - channel="3/edge", + channel=CHANNEL, application_name=ZK_NAME, num_units=1, ), diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index bb94d261..321330dd 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -392,8 +392,7 @@ def test_zookeeper_config_succeeds_fails_config(ctx: Context, base_state: State) "database": "/kafka", "chroot": "/kafka", "username": "moria", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181,2.2.2.2:2181/kafka", + "endpoints": "1.1.1.1:2181,2.2.2.2:2181", "tls": "disabled", }, ) @@ -418,8 +417,7 @@ def test_zookeeper_config_succeeds_valid_config(ctx: Context, base_state: State) "chroot": "/kafka", "username": "moria", "password": "mellon", - "endpoints": "1.1.1.1,2.2.2.2", - "uris": "1.1.1.1:2181/kafka,2.2.2.2:2181/kafka", + "endpoints": "1.1.1.1:2181,2.2.2.2:2181", "tls": "disabled", }, ) diff --git a/tox.ini b/tox.ini index b13e4c24..317b1f45 100644 --- a/tox.ini +++ b/tox.ini @@ -58,6 +58,7 @@ commands = --skip {tox_root}/.git \ --skip {tox_root}/.tox \ --skip {tox_root}/build \ + --skip {tox_root}/docs \ --skip {tox_root}/lib \ --skip {tox_root}/tests/integration/*/lib \ --skip {tox_root}/venv \