From b5d3c3b0cb1392140b89fe3aad52d8996b1a8b03 Mon Sep 17 00:00:00 2001 From: Iman Enami <44609233+imanenami@users.noreply.github.com> Date: Tue, 10 Dec 2024 13:52:57 +0400 Subject: [PATCH] [DPE-5349] feat: Add internal user and SASL/SCRAM authentication (#284) Co-authored-by: Iman Enami --- src/core/cluster.py | 9 +++++++++ src/core/models.py | 23 +++++++++++++++++++++ src/events/broker.py | 12 ++++++++++- src/events/peer_cluster.py | 1 + src/literals.py | 13 ++++++++++-- src/managers/config.py | 36 ++++++++++++++++++++++++++++----- tests/integration/test_kraft.py | 18 ++++++++++------- 7 files changed, 97 insertions(+), 15 deletions(-) diff --git a/src/core/cluster.py b/src/core/cluster.py index 0c067a2c..b132f00e 100644 --- a/src/core/cluster.py +++ b/src/core/cluster.py @@ -40,6 +40,7 @@ BROKER, CONTROLLER, CONTROLLER_PORT, + CONTROLLER_USER, INTERNAL_USERS, KRAFT_NODE_ID_OFFSET, MIN_REPLICAS, @@ -65,10 +66,12 @@ setattr(custom_secret_groups, "BROKER", "broker") setattr(custom_secret_groups, "BALANCER", "balancer") setattr(custom_secret_groups, "ZOOKEEPER", "zookeeper") +setattr(custom_secret_groups, "CONTROLLER", "controller") SECRET_LABEL_MAP = { "broker-username": getattr(custom_secret_groups, "BROKER"), "broker-password": getattr(custom_secret_groups, "BROKER"), + "controller-password": getattr(custom_secret_groups, "CONTROLLER"), "broker-uris": getattr(custom_secret_groups, "BROKER"), "zk-username": getattr(custom_secret_groups, "ZOOKEEPER"), "zk-password": getattr(custom_secret_groups, "ZOOKEEPER"), @@ -157,6 +160,7 @@ def peer_cluster_orchestrator(self) -> PeerCluster: extra_kwargs.update( { "controller_quorum_uris": self.cluster.controller_quorum_uris, + "controller_password": self.cluster.controller_password, } ) @@ -178,6 +182,7 @@ def peer_cluster(self) -> PeerCluster: "balancer_password": self.cluster.balancer_password, "balancer_uris": self.cluster.balancer_uris, "controller_quorum_uris": self.cluster.controller_quorum_uris, + "controller_password": self.cluster.controller_password, } ) @@ -323,6 +328,10 @@ def super_users(self) -> str: Semicolon delimited string of current super users """ super_users = set(INTERNAL_USERS) + + if self.kraft_mode: + super_users.add(CONTROLLER_USER) + for relation in self.client_relations: if not relation or not relation.app: continue diff --git a/src/core/models.py b/src/core/models.py index bd6eacf1..776d99d9 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -103,6 +103,7 @@ def __init__( balancer_username: str = "", balancer_password: str = "", balancer_uris: str = "", + controller_password: str = "", ): super().__init__(relation, data_interface, None, None) self._broker_username = broker_username @@ -118,6 +119,7 @@ def __init__( self._balancer_username = balancer_username self._balancer_password = balancer_password self._balancer_uris = balancer_uris + self._controller_password = controller_password @property def roles(self) -> str: @@ -194,6 +196,22 @@ def controller_quorum_uris(self) -> str: or "" ) + @property + def controller_password(self) -> str: + """The controller user password in KRaft mode.""" + if self._controller_password: + return self._controller_password + + if not self.relation or not self.relation.app: + return "" + + return self.data_interface._fetch_relation_data_with_secrets( + component=self.relation.app, + req_secret_fields=["controller-password"], + relation=self.relation, + fields=["controller-password"], + ).get("controller-password", "") + @property def cluster_uuid(self) -> str: """The cluster uuid used to format storages in KRaft mode.""" @@ -412,6 +430,11 @@ def internal_user_credentials(self) -> dict[str, str]: return credentials + @property + def controller_password(self) -> str: + """The controller user password in KRaft mode.""" + return self.relation_data.get("controller-password", "") + @property def client_passwords(self) -> dict[str, str]: """Usernames and passwords of related client applications.""" diff --git a/src/events/broker.py b/src/events/broker.py index fe32df84..dcbbdea1 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -452,14 +452,24 @@ def _init_kraft_mode(self) -> None: self.charm.state.cluster.update({"cluster-uuid": uuid}) self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) - # Controller is tasked with populating quorum uris + # Controller is tasked with populating quorum uris and the `controller` user password if self.charm.state.runs_controller: quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} self.charm.state.cluster.update(quorum_uris) + generated_password = self.charm.workload.generate_password() + if self.charm.state.peer_cluster_orchestrator: self.charm.state.peer_cluster_orchestrator.update(quorum_uris) + if not self.charm.state.peer_cluster_orchestrator.controller_password: + self.charm.state.peer_cluster_orchestrator.update( + {"controller-password": generated_password} + ) + elif not self.charm.state.peer_cluster.controller_password: + # single mode, controller & leader + self.charm.state.cluster.update({"controller-password": generated_password}) + def _format_storages(self) -> None: """Format storages provided relevant keys exist.""" if self.charm.state.runs_broker: diff --git a/src/events/peer_cluster.py b/src/events/peer_cluster.py index 1b9e066c..5be64bdd 100644 --- a/src/events/peer_cluster.py +++ b/src/events/peer_cluster.py @@ -114,6 +114,7 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None: "balancer-password": self.charm.state.peer_cluster.balancer_password, "balancer-uris": self.charm.state.peer_cluster.balancer_uris, "controller-quorum-uris": self.charm.state.peer_cluster.controller_quorum_uris, + "controller-password": self.charm.state.peer_cluster.controller_password, } ) diff --git a/src/literals.py b/src/literals.py index 13739792..120127d6 100644 --- a/src/literals.py +++ b/src/literals.py @@ -12,7 +12,7 @@ CHARM_KEY = "kafka" SNAP_NAME = "charmed-kafka" -CHARMED_KAFKA_SNAP_REVISION = 42 +CHARMED_KAFKA_SNAP_REVISION = 45 CONTAINER = "kafka" SUBSTRATE = "vm" STORAGE = "data" @@ -46,10 +46,13 @@ INTER_BROKER_USER = "sync" ADMIN_USER = "admin" +CONTROLLER_USER = "controller" INTERNAL_USERS = [INTER_BROKER_USER, ADMIN_USER] BALANCER_WEBSERVER_USER = "balancer" BALANCER_WEBSERVER_PORT = 9090 -SECRETS_APP = [f"{user}-password" for user in INTERNAL_USERS + [BALANCER_WEBSERVER_USER]] +SECRETS_APP = [ + f"{user}-password" for user in INTERNAL_USERS + [BALANCER_WEBSERVER_USER, CONTROLLER_USER] +] SECRETS_UNIT = [ "ca-cert", "csr", @@ -147,6 +150,7 @@ def __eq__(self, value: object, /) -> bool: "balancer-username", "balancer-password", "balancer-uris", + "controller-password", ], ) CONTROLLER = Role( @@ -157,6 +161,7 @@ def __eq__(self, value: object, /) -> bool: requested_secrets=[ "broker-username", "broker-password", + "controller-password", ], ) BALANCER = Role( @@ -168,6 +173,7 @@ def __eq__(self, value: object, /) -> bool: "broker-username", "broker-password", "broker-uris", + "controller-passwrod", "zk-username", "zk-password", "zk-uris", @@ -232,6 +238,9 @@ class Status(Enum): MISSING_MODE = StatusLevel(BlockedStatus("Application needs ZooKeeper or KRaft mode"), "DEBUG") NO_CLUSTER_UUID = StatusLevel(WaitingStatus("Waiting for cluster uuid"), "DEBUG") NO_QUORUM_URIS = StatusLevel(WaitingStatus("Waiting for quorum uris"), "DEBUG") + MISSING_CONTROLLER_PASSWORD = StatusLevel( + WaitingStatus("Waiting for controller user credentials"), "DEBUG" + ) ZK_NOT_RELATED = StatusLevel(BlockedStatus("missing required zookeeper relation"), "DEBUG") ZK_NOT_CONNECTED = StatusLevel(BlockedStatus("unit not connected to zookeeper"), "ERROR") ZK_TLS_MISMATCH = StatusLevel( diff --git a/src/managers/config.py b/src/managers/config.py index 63f86c2d..ceca9c25 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -26,6 +26,7 @@ BROKER, CONTROLLER_LISTENER_NAME, CONTROLLER_PORT, + CONTROLLER_USER, DEFAULT_BALANCER_GOALS, HARD_BALANCER_GOALS, INTER_BROKER_USER, @@ -412,6 +413,24 @@ def scram_properties(self) -> list[str]: return scram_properties + @property + def controller_scram_properties(self) -> list[str]: + """Builds the SCRAM properties for controller listener. + + Returns: + list of scram properties to be set + """ + password = self.state.peer_cluster.controller_password + listener_mechanism = self.internal_listener.mechanism.lower() + listener_name = CONTROLLER_LISTENER_NAME.lower() + + return [ + "sasl.enabled.mechanisms=SCRAM-SHA-512", + f"sasl.mechanism.controller.protocol={self.internal_listener.mechanism}", + f'listener.name.{listener_name}.{listener_mechanism}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{CONTROLLER_USER}" password="{password}" user_{CONTROLLER_USER}="{password}";', + f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}", + ] + @property def oauth_properties(self) -> list[str]: """Builds the properties for the oauth listener. @@ -636,8 +655,9 @@ def metrics_reporter_properties(self) -> list[str]: def authorizer_class(self) -> list[str]: """Return the authorizer Java class used on Kafka.""" if self.state.kraft_mode: - # return ["authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer"] - return [] + return [ + "authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer" + ] return ["authorizer.class.name=kafka.security.authorizer.AclAuthorizer"] @property @@ -663,6 +683,7 @@ def controller_properties(self) -> list[str]: f"node.id={node_id}", f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}", f"controller.listener.names={CONTROLLER_LISTENER_NAME}", + *self.controller_scram_properties, ] return properties @@ -684,16 +705,21 @@ def server_properties(self) -> list[str]: advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] if self.state.kraft_mode: - controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" + controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT" controller_listener = f"{CONTROLLER_LISTENER_NAME}://0.0.0.0:{CONTROLLER_PORT}" # NOTE: Case where the controller is running standalone. Early return with a # smaller subset of config options if not self.state.runs_broker: properties = ( - [f"log.dirs={self.state.log_dirs}", f"listeners={controller_listener}"] + [ + f"super.users={self.state.super_users}", + f"log.dirs={self.state.log_dirs}", + f"listeners={controller_listener}", + f"listener.security.protocol.map={controller_protocol_map}", + ] + self.controller_properties - # + self.authorizer_class + + self.authorizer_class ) return properties diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index 1269acbf..ac7438bc 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -16,11 +16,7 @@ SECURITY_PROTOCOL_PORTS, ) -from .helpers import ( - APP_NAME, - check_socket, - get_address, -) +from .helpers import APP_NAME, check_socket, create_test_topic, get_address logger = logging.getLogger(__name__) @@ -107,8 +103,8 @@ async def test_integrate(self, ops_test: OpsTest): apps=list({APP_NAME, self.controller_app}), idle_period=30 ) - async with ops_test.fast_forward(fast_interval="40s"): - await asyncio.sleep(120) + async with ops_test.fast_forward(fast_interval="20s"): + await asyncio.sleep(240) assert ops_test.model.applications[APP_NAME].status == "active" assert ops_test.model.applications[self.controller_app].status == "active" @@ -130,3 +126,11 @@ async def test_listeners(self, ops_test: OpsTest): address = await get_address(ops_test=ops_test, app_name=self.controller_app) assert check_socket(address, CONTROLLER_PORT) + + @pytest.mark.abort_on_fail + async def test_authorizer(self, ops_test: OpsTest): + + address = await get_address(ops_test=ops_test) + port = SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal + + await create_test_topic(ops_test, f"{address}:{port}")