Skip to content

Commit

Permalink
rebase with SCRAM auth changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Iman Enami committed Dec 13, 2024
1 parent 9631796 commit cf274b7
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 37 deletions.
37 changes: 20 additions & 17 deletions src/events/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def _init_kraft_mode(self) -> None:
elif not self.charm.state.peer_cluster.controller_password:
# single mode, controller & leader
self.charm.state.cluster.update({"controller-password": generated_password})

bootstrap_data = {
"bootstrap-controller": self.charm.state.bootstrap_controller,
"bootstrap-unit-id": str(self.charm.state.kraft_unit_id),
Expand All @@ -498,13 +498,13 @@ def _format_storages(self) -> None:
def _leader_elected(self, event: LeaderElectedEvent) -> None:
if self.charm.state.runs_controller and self.charm.state.cluster.bootstrap_controller:
# remove previous leader from dynamic quorum, if the unit is still available, it would eventually re-join during update_status
prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id
prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id
self._remove_controller(
int(prev_leader_id),
prev_replica_id,
bootstrap_node=self.charm.state.bootstrap_controller,
)
# prev_leader_id = self.charm.state.peer_cluster_orchestrator.bootstrap_unit_id
# prev_replica_id = self.charm.state.peer_cluster_orchestrator.bootstrap_replica_id
# self._remove_controller(
# int(prev_leader_id),
# prev_replica_id,
# bootstrap_node=self.charm.state.bootstrap_controller,
# )

updated_bootstrap_data = {
"bootstrap-controller": self.charm.state.bootstrap_controller,
Expand Down Expand Up @@ -563,6 +563,8 @@ def _remove_controller(
bin_args=[
"--bootstrap-controller",
bootstrap_node,
"--command-config",
self.workload.paths.server_properties,
"remove-controller",
"--controller-id",
str(controller_id),
Expand All @@ -571,16 +573,17 @@ def _remove_controller(
],
)

def remove_follower_unit(self) -> None:
"""Removes current unit from the dynamic quorum in KRaft mode if this is a follower unit."""
if (
self.charm.state.runs_controller
and not self.charm.unit.is_leader()
and self.charm.state.unit_broker.added_to_quorum
def remove_from_quorum(self) -> None:
"""Removes current unit from the dynamic quorum in KRaft mode."""
if self.charm.state.runs_controller and (
self.charm.state.unit_broker.added_to_quorum or self.charm.unit.is_leader()
):
self._remove_controller(
self.charm.state.kraft_unit_id, self.charm.state.unit_broker.directory_id
directory_id = (
self.charm.state.unit_broker.directory_id
if not self.charm.unit.is_leader()
else self.charm.state.cluster.bootstrap_replica_id
)
self._remove_controller(self.charm.state.kraft_unit_id, directory_id)

def update_external_services(self) -> None:
"""Attempts to update any external Kubernetes services."""
Expand Down Expand Up @@ -646,4 +649,4 @@ def update_peer_cluster_data(self) -> None:

def _on_remove(self, _) -> None:
"""Handler for stop."""
self.remove_follower_unit()
self.remove_from_quorum()
18 changes: 17 additions & 1 deletion src/managers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,21 @@ def controller_scram_properties(self) -> list[str]:
f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}",
]

@property
def controller_kraft_client_properties(self) -> list[str]:
"""Builds the SCRAM properties for controller' KRaft client to be able to communicate with quorum manager.
Returns:
list of KRaft client properties to be set
"""
password = self.state.peer_cluster.controller_password

return [
f'sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{CONTROLLER_USER}" password="{password}";',
f"sasl.mechanism={self.internal_listener.mechanism}",
"security.protocol=SASL_PLAINTEXT",
]

@property
def oauth_properties(self) -> list[str]:
"""Builds the properties for the oauth listener.
Expand Down Expand Up @@ -685,6 +700,7 @@ def controller_properties(self) -> list[str]:
f"controller.quorum.bootstrap.servers={self.state.peer_cluster.bootstrap_controller}",
f"controller.listener.names={CONTROLLER_LISTENER_NAME}",
*self.controller_scram_properties,
*self.controller_kraft_client_properties,
]

return properties
Expand All @@ -706,7 +722,7 @@ def server_properties(self) -> list[str]:
advertised_listeners = [listener.advertised_listener for listener in self.all_listeners]

if self.state.kraft_mode:
controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT"
controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT"
controller_listener = f"{CONTROLLER_LISTENER_NAME}://{self.state.unit_broker.internal_address}:{CONTROLLER_PORT}"

# NOTE: Case where the controller is running standalone. Early return with a
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ def kraft_quorum_status(
) -> dict[int, KRaftUnitStatus]:
"""Returns a dict mapping of unit ID to KRaft unit status based on `kafka-metadata-quorum.sh` utility's output."""
result = check_output(
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.metadata-quorum --bootstrap-controller {bootstrap_controller} describe --replication'",
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {unit_name} sudo -i 'charmed-kafka.metadata-quorum --command-config {PATHS['kafka']['CONF']}/server.properties --bootstrap-controller {bootstrap_controller} describe --replication'",
stderr=PIPE,
shell=True,
universal_newlines=True,
Expand Down
64 changes: 46 additions & 18 deletions tests/integration/test_kraft.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
KRaftUnitStatus,
)

from .helpers import APP_NAME, check_socket, get_address, kraft_quorum_status, create_test_topic
from .helpers import APP_NAME, check_socket, create_test_topic, get_address, kraft_quorum_status

logger = logging.getLogger(__name__)

Expand All @@ -33,6 +33,25 @@ class TestKRaft:
deployment_strat: str = os.environ.get("DEPLOYMENT", "multi")
controller_app: str = {"single": APP_NAME, "multi": CONTROLLER_APP}[deployment_strat]

async def _assert_listeners_accessible(self, ops_test: OpsTest, unit_num=0):
address = await get_address(ops_test=ops_test, app_name=APP_NAME, unit_num=0)
assert check_socket(
address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal
) # Internal listener

# Client listener should not be enabled if there is no relations
assert not check_socket(
address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client
)

# Check controller socket
if self.controller_app != APP_NAME:
address = await get_address(
ops_test=ops_test, app_name=self.controller_app, unit_num=unit_num
)

assert check_socket(address, CONTROLLER_PORT)

@pytest.mark.abort_on_fail
async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm):
await ops_test.model.add_machine(series="jammy")
Expand Down Expand Up @@ -113,21 +132,7 @@ async def test_integrate(self, ops_test: OpsTest):

@pytest.mark.abort_on_fail
async def test_listeners(self, ops_test: OpsTest):
address = await get_address(ops_test=ops_test)
assert check_socket(
address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal
) # Internal listener

# Client listener should not be enabled if there is no relations
assert not check_socket(
address, SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].client
)

# Check controller socket
if self.controller_app != APP_NAME:
address = await get_address(ops_test=ops_test, app_name=self.controller_app)

assert check_socket(address, CONTROLLER_PORT)
await self._assert_listeners_accessible(ops_test, unit_num=0)

@pytest.mark.abort_on_fail
async def test_authorizer(self, ops_test: OpsTest):
Expand All @@ -136,9 +141,9 @@ async def test_authorizer(self, ops_test: OpsTest):
port = SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal

await create_test_topic(ops_test, f"{address}:{port}")

@pytest.mark.abort_on_fail
async def test_scaling(self, ops_test: OpsTest):
async def test_scale_out(self, ops_test: OpsTest):
await ops_test.model.applications[self.controller_app].add_units(count=2)
await ops_test.model.wait_for_idle(
apps=list({APP_NAME, self.controller_app}),
Expand Down Expand Up @@ -212,3 +217,26 @@ async def test_leader_change(self, ops_test: OpsTest):
)
assert (offset + 3) in unit_status
assert unit_status[offset + 3] == KRaftUnitStatus.FOLLOWER

@pytest.mark.abort_on_fail
async def test_scale_in(self, ops_test: OpsTest):
await ops_test.model.applications[self.controller_app].destroy_units(
*(f"{self.controller_app}/{unit_id}" for unit_id in (1, 2))
)
await ops_test.model.wait_for_idle(
apps=list({APP_NAME, self.controller_app}),
status="active",
timeout=600,
idle_period=20,
)

address = await get_address(ops_test=ops_test, app_name=self.controller_app, unit_num=3)
bootstrap_controller = f"{address}:{CONTROLLER_PORT}"
offset = KRAFT_NODE_ID_OFFSET if self.controller_app == APP_NAME else 0

unit_status = kraft_quorum_status(
ops_test, f"{self.controller_app}/3", bootstrap_controller
)

assert unit_status[offset + 3] == KRaftUnitStatus.LEADER
await self._assert_listeners_accessible(ops_test, unit_num=3)

0 comments on commit cf274b7

Please sign in to comment.