From cf274b74d04af1fb5ab7c9f02c26412e4b303447 Mon Sep 17 00:00:00 2001 From: Iman Enami Date: Fri, 13 Dec 2024 16:08:20 +0400 Subject: [PATCH] rebase with SCRAM auth changes --- src/events/broker.py | 37 ++++++++++--------- src/managers/config.py | 18 +++++++++- tests/integration/helpers.py | 2 +- tests/integration/test_kraft.py | 64 +++++++++++++++++++++++---------- 4 files changed, 84 insertions(+), 37 deletions(-) diff --git a/src/events/broker.py b/src/events/broker.py index b78aa8bf..f6623595 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -472,7 +472,7 @@ def _init_kraft_mode(self) -> None: elif not self.charm.state.peer_cluster.controller_password: # single mode, controller & leader self.charm.state.cluster.update({"controller-password": generated_password}) - + bootstrap_data = { "bootstrap-controller": self.charm.state.bootstrap_controller, "bootstrap-unit-id": str(self.charm.state.kraft_unit_id), @@ -498,13 +498,13 @@ def _format_storages(self) -> None: def _leader_elected(self, event: LeaderElectedEvent) -> None: if self.charm.state.runs_controller and self.charm.state.cluster.bootstrap_controller: # remove previous leader from dynamic quorum, if the unit is still available, it would eventually re-join during update_status - prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id - prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id - self._remove_controller( - int(prev_leader_id), - prev_replica_id, - bootstrap_node=self.charm.state.bootstrap_controller, - ) + # prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id + # prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id + # self._remove_controller( + # int(prev_leader_id), + # prev_replica_id, + # bootstrap_node=self.charm.state.bootstrap_controller, + # ) updated_bootstrap_data = { "bootstrap-controller": self.charm.state.bootstrap_controller, @@ -563,6 +563,8 @@ def _remove_controller( bin_args=[ "--bootstrap-controller", bootstrap_node, + "--command-config", + self.workload.paths.server_properties, "remove-controller", "--controller-id", str(controller_id), @@ -571,16 +573,17 @@ def _remove_controller( ], ) - def remove_follower_unit(self) -> None: - """Removes current unit from the dynamic quorum in KRaft mode if this is a follower unit.""" - if ( - self.charm.state.runs_controller - and not self.charm.unit.is_leader() - and self.charm.state.unit_broker.added_to_quorum + def remove_from_quorum(self) -> None: + """Removes current unit from the dynamic quorum in KRaft mode.""" + if self.charm.state.runs_controller and ( + self.charm.state.unit_broker.added_to_quorum or self.charm.unit.is_leader() ): - self._remove_controller( - self.charm.state.kraft_unit_id, self.charm.state.unit_broker.directory_id + directory_id = ( + self.charm.state.unit_broker.directory_id + if not self.charm.unit.is_leader() + else self.charm.state.cluster.bootstrap_replica_id ) + self._remove_controller(self.charm.state.kraft_unit_id, directory_id) def update_external_services(self) -> None: """Attempts to update any external Kubernetes services.""" @@ -646,4 +649,4 @@ def update_peer_cluster_data(self) -> None: def _on_remove(self, _) -> None: """Handler for stop.""" - self.remove_follower_unit() + self.remove_from_quorum() diff --git a/src/managers/config.py b/src/managers/config.py index 2493fb5c..2c9b3ba2 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -431,6 +431,21 @@ def controller_scram_properties(self) -> list[str]: f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}", ] + @property + def controller_kraft_client_properties(self) -> list[str]: + """Builds the SCRAM properties for controller' KRaft client to be able to communicate with quorum manager. + + Returns: + list of KRaft client properties to be set + """ + password = self.state.peer_cluster.controller_password + + return [ + f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{CONTROLLER_USER}" password="{password}";', + f"sasl.mechanism={self.internal_listener.mechanism}", + "security.protocol=SASL_PLAINTEXT", + ] + @property def oauth_properties(self) -> list[str]: """Builds the properties for the oauth listener. @@ -685,6 +700,7 @@ def controller_properties(self) -> list[str]: f"controller.quorum.bootstrap.servers={self.state.peer_cluster.bootstrap_controller}", f"controller.listener.names={CONTROLLER_LISTENER_NAME}", *self.controller_scram_properties, + *self.controller_kraft_client_properties, ] return properties @@ -706,7 +722,7 @@ def server_properties(self) -> list[str]: advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] if self.state.kraft_mode: - controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT" + controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT" controller_listener = f"{CONTROLLER_LISTENER_NAME}://{self.state.unit_broker.internal_address}:{CONTROLLER_PORT}" # NOTE: Case where the controller is running standalone. Early return with a diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index e8993d47..549216f1 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -624,7 +624,7 @@ def kraft_quorum_status( ) -> dict[int, KRaftUnitStatus]: """Returns a dict mapping of unit ID to KRaft unit status based on `kafka-metadata-quorum.sh` utility's output.""" result = check_output( - f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.metadata-quorum --bootstrap-controller {bootstrap_controller} describe --replication'", + f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.metadata-quorum --command-config {PATHS['kafka']['CONF']}/server.properties --bootstrap-controller {bootstrap_controller} describe --replication'", stderr=PIPE, shell=True, universal_newlines=True, diff --git a/tests/integration/test_kraft.py b/tests/integration/test_kraft.py index 3ca2d1c6..ca4f8259 100644 --- a/tests/integration/test_kraft.py +++ b/tests/integration/test_kraft.py @@ -18,7 +18,7 @@ KRaftUnitStatus, ) -from .helpers import APP_NAME, check_socket, get_address, kraft_quorum_status, create_test_topic +from .helpers import APP_NAME, check_socket, create_test_topic, get_address, kraft_quorum_status logger = logging.getLogger(__name__) @@ -33,6 +33,25 @@ class TestKRaft: deployment_strat: str = os.environ.get("DEPLOYMENT", "multi") controller_app: str = {"single": APP_NAME, "multi": CONTROLLER_APP}[deployment_strat] + async def _assert_listeners_accessible(self, ops_test: OpsTest, unit_num=0): + address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=0) + assert check_socket( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal + ) # Internal listener + + # Client listener should not be enabled if there is no relations + assert not check_socket( + address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client + ) + + # Check controller socket + if self.controller_app != APP_NAME: + address = await get_address( + ops_test=ops_test, app_name=self.controller_app, unit_num=unit_num + ) + + assert check_socket(address, CONTROLLER_PORT) + @pytest.mark.abort_on_fail async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm): await ops_test.model.add_machine(series="jammy") @@ -113,21 +132,7 @@ async def test_integrate(self, ops_test: OpsTest): @pytest.mark.abort_on_fail async def test_listeners(self, ops_test: OpsTest): - address = await get_address(ops_test=ops_test) - assert check_socket( - address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal - ) # Internal listener - - # Client listener should not be enabled if there is no relations - assert not check_socket( - address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client - ) - - # Check controller socket - if self.controller_app != APP_NAME: - address = await get_address(ops_test=ops_test, app_name=self.controller_app) - - assert check_socket(address, CONTROLLER_PORT) + await self._assert_listeners_accessible(ops_test, unit_num=0) @pytest.mark.abort_on_fail async def test_authorizer(self, ops_test: OpsTest): @@ -136,9 +141,9 @@ async def test_authorizer(self, ops_test: OpsTest): port = SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal await create_test_topic(ops_test, f"{address}:{port}") - + @pytest.mark.abort_on_fail - async def test_scaling(self, ops_test: OpsTest): + async def test_scale_out(self, ops_test: OpsTest): await ops_test.model.applications[self.controller_app].add_units(count=2) await ops_test.model.wait_for_idle( apps=list({APP_NAME, self.controller_app}), @@ -212,3 +217,26 @@ async def test_leader_change(self, ops_test: OpsTest): ) assert (offset + 3) in unit_status assert unit_status[offset + 3] == KRaftUnitStatus.FOLLOWER + + @pytest.mark.abort_on_fail + async def test_scale_in(self, ops_test: OpsTest): + await ops_test.model.applications[self.controller_app].destroy_units( + *(f"{self.controller_app}/{unit_id}" for unit_id in (1, 2)) + ) + await ops_test.model.wait_for_idle( + apps=list({APP_NAME, self.controller_app}), + status="active", + timeout=600, + idle_period=20, + ) + + address = await get_address(ops_test=ops_test, app_name=self.controller_app, unit_num=3) + bootstrap_controller = f"{address}:{CONTROLLER_PORT}" + offset = KRAFT_NODE_ID_OFFSET if self.controller_app == APP_NAME else 0 + + unit_status = kraft_quorum_status( + ops_test, f"{self.controller_app}/3", bootstrap_controller + ) + + assert unit_status[offset + 3] == KRaftUnitStatus.LEADER + await self._assert_listeners_accessible(ops_test, unit_num=3)