Skip to content

Commit

Permalink
[DPE-5349] feat: Add internal user and SASL/SCRAM authentication (#284)
Browse files Browse the repository at this point in the history
Co-authored-by: Iman Enami <[email protected]>
  • Loading branch information
imanenami and Iman Enami authored Dec 10, 2024
1 parent 8287939 commit b5d3c3b
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 15 deletions.
9 changes: 9 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
BROKER,
CONTROLLER,
CONTROLLER_PORT,
CONTROLLER_USER,
INTERNAL_USERS,
KRAFT_NODE_ID_OFFSET,
MIN_REPLICAS,
Expand All @@ -65,10 +66,12 @@
setattr(custom_secret_groups, "BROKER", "broker")
setattr(custom_secret_groups, "BALANCER", "balancer")
setattr(custom_secret_groups, "ZOOKEEPER", "zookeeper")
setattr(custom_secret_groups, "CONTROLLER", "controller")

SECRET_LABEL_MAP = {
"broker-username": getattr(custom_secret_groups, "BROKER"),
"broker-password": getattr(custom_secret_groups, "BROKER"),
"controller-password": getattr(custom_secret_groups, "CONTROLLER"),
"broker-uris": getattr(custom_secret_groups, "BROKER"),
"zk-username": getattr(custom_secret_groups, "ZOOKEEPER"),
"zk-password": getattr(custom_secret_groups, "ZOOKEEPER"),
Expand Down Expand Up @@ -157,6 +160,7 @@ def peer_cluster_orchestrator(self) -> PeerCluster:
extra_kwargs.update(
{
"controller_quorum_uris": self.cluster.controller_quorum_uris,
"controller_password": self.cluster.controller_password,
}
)

Expand All @@ -178,6 +182,7 @@ def peer_cluster(self) -> PeerCluster:
"balancer_password": self.cluster.balancer_password,
"balancer_uris": self.cluster.balancer_uris,
"controller_quorum_uris": self.cluster.controller_quorum_uris,
"controller_password": self.cluster.controller_password,
}
)

Expand Down Expand Up @@ -323,6 +328,10 @@ def super_users(self) -> str:
Semicolon delimited string of current super users
"""
super_users = set(INTERNAL_USERS)

if self.kraft_mode:
super_users.add(CONTROLLER_USER)

for relation in self.client_relations:
if not relation or not relation.app:
continue
Expand Down
23 changes: 23 additions & 0 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def __init__(
balancer_username: str = "",
balancer_password: str = "",
balancer_uris: str = "",
controller_password: str = "",
):
super().__init__(relation, data_interface, None, None)
self._broker_username = broker_username
Expand All @@ -118,6 +119,7 @@ def __init__(
self._balancer_username = balancer_username
self._balancer_password = balancer_password
self._balancer_uris = balancer_uris
self._controller_password = controller_password

@property
def roles(self) -> str:
Expand Down Expand Up @@ -194,6 +196,22 @@ def controller_quorum_uris(self) -> str:
or ""
)

@property
def controller_password(self) -> str:
"""The controller user password in KRaft mode."""
if self._controller_password:
return self._controller_password

if not self.relation or not self.relation.app:
return ""

return self.data_interface._fetch_relation_data_with_secrets(
component=self.relation.app,
req_secret_fields=["controller-password"],
relation=self.relation,
fields=["controller-password"],
).get("controller-password", "")

@property
def cluster_uuid(self) -> str:
"""The cluster uuid used to format storages in KRaft mode."""
Expand Down Expand Up @@ -412,6 +430,11 @@ def internal_user_credentials(self) -> dict[str, str]:

return credentials

@property
def controller_password(self) -> str:
"""The controller user password in KRaft mode."""
return self.relation_data.get("controller-password", "")

@property
def client_passwords(self) -> dict[str, str]:
"""Usernames and passwords of related client applications."""
Expand Down
12 changes: 11 additions & 1 deletion src/events/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,24 @@ def _init_kraft_mode(self) -> None:
self.charm.state.cluster.update({"cluster-uuid": uuid})
self.charm.state.peer_cluster.update({"cluster-uuid": uuid})

# Controller is tasked with populating quorum uris
# Controller is tasked with populating quorum uris and the `controller` user password
if self.charm.state.runs_controller:
quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris}
self.charm.state.cluster.update(quorum_uris)

generated_password = self.charm.workload.generate_password()

if self.charm.state.peer_cluster_orchestrator:
self.charm.state.peer_cluster_orchestrator.update(quorum_uris)

if not self.charm.state.peer_cluster_orchestrator.controller_password:
self.charm.state.peer_cluster_orchestrator.update(
{"controller-password": generated_password}
)
elif not self.charm.state.peer_cluster.controller_password:
# single mode, controller & leader
self.charm.state.cluster.update({"controller-password": generated_password})

def _format_storages(self) -> None:
"""Format storages provided relevant keys exist."""
if self.charm.state.runs_broker:
Expand Down
1 change: 1 addition & 0 deletions src/events/peer_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None:
"balancer-password": self.charm.state.peer_cluster.balancer_password,
"balancer-uris": self.charm.state.peer_cluster.balancer_uris,
"controller-quorum-uris": self.charm.state.peer_cluster.controller_quorum_uris,
"controller-password": self.charm.state.peer_cluster.controller_password,
}
)

Expand Down
13 changes: 11 additions & 2 deletions src/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

CHARM_KEY = "kafka"
SNAP_NAME = "charmed-kafka"
CHARMED_KAFKA_SNAP_REVISION = 42
CHARMED_KAFKA_SNAP_REVISION = 45
CONTAINER = "kafka"
SUBSTRATE = "vm"
STORAGE = "data"
Expand Down Expand Up @@ -46,10 +46,13 @@

INTER_BROKER_USER = "sync"
ADMIN_USER = "admin"
CONTROLLER_USER = "controller"
INTERNAL_USERS = [INTER_BROKER_USER, ADMIN_USER]
BALANCER_WEBSERVER_USER = "balancer"
BALANCER_WEBSERVER_PORT = 9090
SECRETS_APP = [f"{user}-password" for user in INTERNAL_USERS + [BALANCER_WEBSERVER_USER]]
SECRETS_APP = [
f"{user}-password" for user in INTERNAL_USERS + [BALANCER_WEBSERVER_USER, CONTROLLER_USER]
]
SECRETS_UNIT = [
"ca-cert",
"csr",
Expand Down Expand Up @@ -147,6 +150,7 @@ def __eq__(self, value: object, /) -> bool:
"balancer-username",
"balancer-password",
"balancer-uris",
"controller-password",
],
)
CONTROLLER = Role(
Expand All @@ -157,6 +161,7 @@ def __eq__(self, value: object, /) -> bool:
requested_secrets=[
"broker-username",
"broker-password",
"controller-password",
],
)
BALANCER = Role(
Expand All @@ -168,6 +173,7 @@ def __eq__(self, value: object, /) -> bool:
"broker-username",
"broker-password",
"broker-uris",
"controller-passwrod",
"zk-username",
"zk-password",
"zk-uris",
Expand Down Expand Up @@ -232,6 +238,9 @@ class Status(Enum):
MISSING_MODE = StatusLevel(BlockedStatus("Application needs ZooKeeper or KRaft mode"), "DEBUG")
NO_CLUSTER_UUID = StatusLevel(WaitingStatus("Waiting for cluster uuid"), "DEBUG")
NO_QUORUM_URIS = StatusLevel(WaitingStatus("Waiting for quorum uris"), "DEBUG")
MISSING_CONTROLLER_PASSWORD = StatusLevel(
WaitingStatus("Waiting for controller user credentials"), "DEBUG"
)
ZK_NOT_RELATED = StatusLevel(BlockedStatus("missing required zookeeper relation"), "DEBUG")
ZK_NOT_CONNECTED = StatusLevel(BlockedStatus("unit not connected to zookeeper"), "ERROR")
ZK_TLS_MISMATCH = StatusLevel(
Expand Down
36 changes: 31 additions & 5 deletions src/managers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
BROKER,
CONTROLLER_LISTENER_NAME,
CONTROLLER_PORT,
CONTROLLER_USER,
DEFAULT_BALANCER_GOALS,
HARD_BALANCER_GOALS,
INTER_BROKER_USER,
Expand Down Expand Up @@ -412,6 +413,24 @@ def scram_properties(self) -> list[str]:

return scram_properties

@property
def controller_scram_properties(self) -> list[str]:
"""Builds the SCRAM properties for controller listener.
Returns:
list of scram properties to be set
"""
password = self.state.peer_cluster.controller_password
listener_mechanism = self.internal_listener.mechanism.lower()
listener_name = CONTROLLER_LISTENER_NAME.lower()

return [
"sasl.enabled.mechanisms=SCRAM-SHA-512",
f"sasl.mechanism.controller.protocol={self.internal_listener.mechanism}",
f'listener.name.{listener_name}.{listener_mechanism}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{CONTROLLER_USER}" password="{password}" user_{CONTROLLER_USER}="{password}";',
f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}",
]

@property
def oauth_properties(self) -> list[str]:
"""Builds the properties for the oauth listener.
Expand Down Expand Up @@ -636,8 +655,9 @@ def metrics_reporter_properties(self) -> list[str]:
def authorizer_class(self) -> list[str]:
"""Return the authorizer Java class used on Kafka."""
if self.state.kraft_mode:
# return ["authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer"]
return []
return [
"authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer"
]
return ["authorizer.class.name=kafka.security.authorizer.AclAuthorizer"]

@property
Expand All @@ -663,6 +683,7 @@ def controller_properties(self) -> list[str]:
f"node.id={node_id}",
f"controller.quorum.voters={self.state.peer_cluster.controller_quorum_uris}",
f"controller.listener.names={CONTROLLER_LISTENER_NAME}",
*self.controller_scram_properties,
]

return properties
Expand All @@ -684,16 +705,21 @@ def server_properties(self) -> list[str]:
advertised_listeners = [listener.advertised_listener for listener in self.all_listeners]

if self.state.kraft_mode:
controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:PLAINTEXT"
controller_protocol_map = f"{CONTROLLER_LISTENER_NAME}:SASL_PLAINTEXT"
controller_listener = f"{CONTROLLER_LISTENER_NAME}://0.0.0.0:{CONTROLLER_PORT}"

# NOTE: Case where the controller is running standalone. Early return with a
# smaller subset of config options
if not self.state.runs_broker:
properties = (
[f"log.dirs={self.state.log_dirs}", f"listeners={controller_listener}"]
[
f"super.users={self.state.super_users}",
f"log.dirs={self.state.log_dirs}",
f"listeners={controller_listener}",
f"listener.security.protocol.map={controller_protocol_map}",
]
+ self.controller_properties
# + self.authorizer_class
+ self.authorizer_class
)
return properties

Expand Down
18 changes: 11 additions & 7 deletions tests/integration/test_kraft.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
SECURITY_PROTOCOL_PORTS,
)

from .helpers import (
APP_NAME,
check_socket,
get_address,
)
from .helpers import APP_NAME, check_socket, create_test_topic, get_address

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,8 +103,8 @@ async def test_integrate(self, ops_test: OpsTest):
apps=list({APP_NAME, self.controller_app}), idle_period=30
)

async with ops_test.fast_forward(fast_interval="40s"):
await asyncio.sleep(120)
async with ops_test.fast_forward(fast_interval="20s"):
await asyncio.sleep(240)

assert ops_test.model.applications[APP_NAME].status == "active"
assert ops_test.model.applications[self.controller_app].status == "active"
Expand All @@ -130,3 +126,11 @@ async def test_listeners(self, ops_test: OpsTest):
address = await get_address(ops_test=ops_test, app_name=self.controller_app)

assert check_socket(address, CONTROLLER_PORT)

@pytest.mark.abort_on_fail
async def test_authorizer(self, ops_test: OpsTest):

address = await get_address(ops_test=ops_test)
port = SECURITY_PROTOCOL_PORTS["SASL_PLAINTEXT", "SCRAM-SHA-512"].internal

await create_test_topic(ops_test, f"{address}:{port}")

0 comments on commit b5d3c3b

Please sign in to comment.