diff --git a/actions.yaml b/actions.yaml index 9cf312c1..6cc39eaa 100644 --- a/actions.yaml +++ b/actions.yaml @@ -31,6 +31,9 @@ get-admin-credentials: The returned client_properties can be used for Kafka bin commands using `--bootstrap-server` and `--command-config` for admin level administration This action must be called on the leader unit. +get-listeners: + description: Get all active listeners and their port allocations + pre-upgrade-check: description: Run necessary pre-upgrade checks before executing a charm upgrade. diff --git a/config.yaml b/config.yaml index 881de916..2fa91c65 100644 --- a/config.yaml +++ b/config.yaml @@ -92,6 +92,10 @@ options: description: Config options to add extra-sans to the ones used when requesting server certificates. The extra-sans are specified by comma-separated names to be added when requesting signed certificates. Use "{unit}" as a placeholder to be filled with the unit number, e.g. "worker-{unit}" will be translated as "worker-0" for unit 0 and "worker-1" for unit 1 when requesting the certificate. type: string default: "" + extra_listeners: + description: "Config options to add extra SANs to the ones used when requesting server certificates, and to define custom `advertised.listeners` and ports for clients external to the Juju model. These items are comma-separated. Use '{unit}' as a placeholder to be filled with the unit number if necessary. For port allocations, providing the port for a given listener will offset the generated port number by that amount, with an accepted value range of 20001-50000. For example, a provided value of 'worker-{unit}.domain.com:30000' will generate listeners for unit 0 with name 'worker-0.domain.com', and be allocated ports 39092, 39093 etc for each authentication scheme." + type: string + default: "" log_level: description: "Level of logging for the different components operated by the charm. Possible values: ERROR, WARNING, INFO, DEBUG" type: string diff --git a/src/charm.py b/src/charm.py index 339126f8..61d24ccc 100755 --- a/src/charm.py +++ b/src/charm.py @@ -7,6 +7,7 @@ import logging import time +import ops from charms.data_platform_libs.v0.data_models import TypedCharmBase from charms.grafana_agent.v0.cos_agent import COSAgentProvider from charms.operator_libs_linux.v0 import sysctl @@ -17,7 +18,6 @@ EventBase, StatusBase, ) -from ops.main import main from core.cluster import ClusterState from core.models import Substrates @@ -188,4 +188,4 @@ def _on_collect_status(self, event: CollectStatusEvent): if __name__ == "__main__": - main(KafkaCharm) + ops.main(KafkaCharm) # pyright: ignore[reportCallIssue] diff --git a/src/core/structured_config.py b/src/core/structured_config.py index 3a18ec50..933c9487 100644 --- a/src/core/structured_config.py +++ b/src/core/structured_config.py @@ -74,6 +74,7 @@ class CharmConfig(BaseConfigModel): zookeeper_ssl_cipher_suites: str | None profile: str certificate_extra_sans: str | None + extra_listeners: list[str] log_level: str network_bandwidth: int = Field(default=50000, validate_default=False, gt=0) cruisecontrol_balance_threshold: float = Field(default=1.1, validate_default=False, ge=1) @@ -265,3 +266,45 @@ def roles_values(cls, value: str) -> str: raise ValueError("Unknown role(s):", unknown_roles) return ",".join(sorted(roles)) # this has to be a string as it goes in to properties + + @validator("certificate_extra_sans") + @classmethod + def certificate_extra_sans_values(cls, value: str) -> list[str]: + """Formats certificate_extra_sans values to a list.""" + return value.split(",") if value else [] + + @validator("extra_listeners", pre=True) + @classmethod + def extra_listeners_port_values(cls, value: str) -> list[str]: + """Check extra_listeners port values for each listener, and format values to a list.""" + if not value: + return [] + + listeners = value.split(",") + + ports = [] + for listener in listeners: + if ":" not in listener or not listener.split(":")[1].isdigit(): + raise ValueError("Value for listener does not contain a valid port.") + + port = int(listener.split(":")[1]) + if not 20000 < port < 50000: + raise ValueError( + "Value for port out of accepted values. Accepted values for port are greater than 20000 and less than 50000" + ) + + ports.append(port) + + current_port = 0 + for port in ports: + if not current_port - 100 < int(port) > current_port + 100: + raise ValueError( + "Value for port is too close to other value for port. Accepted values must be at least 100 apart from each other" + ) + + current_port = int(port) + + if len(ports) != len(set(ports)): + raise ValueError("Value for port is not unique for each listener.") + + return listeners diff --git a/src/events/password_actions.py b/src/events/actions.py similarity index 76% rename from src/events/password_actions.py rename to src/events/actions.py index 2049d408..9e360a0a 100644 --- a/src/events/password_actions.py +++ b/src/events/actions.py @@ -2,7 +2,8 @@ # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Event handlers for password-related Juju Actions.""" +"""Event handlers for Juju Actions.""" + import logging from typing import TYPE_CHECKING @@ -18,11 +19,11 @@ logger = logging.getLogger(__name__) -class PasswordActionEvents(Object): - """Event handlers for password-related Juju Actions.""" +class ActionEvents(Object): + """Event handlers for Juju Actions.""" def __init__(self, dependent: "BrokerOperator") -> None: - super().__init__(dependent, "password_events") + super().__init__(dependent, "action_events") self.dependent = dependent self.charm: "KafkaCharm" = dependent.charm @@ -33,6 +34,31 @@ def __init__(self, dependent: "BrokerOperator") -> None: getattr(self.charm.on, "get_admin_credentials_action"), self._get_admin_credentials_action, ) + self.framework.observe( + getattr(self.charm.on, "get_listeners_action"), self._get_listeners_action + ) + + def _get_listeners_action(self, event: ActionEvent) -> None: + """Handler for get-listeners action.""" + listeners = self.dependent.config_manager.all_listeners + + result = {} + for listener in listeners: + key = listener.name.replace("_", "-").lower() + result.update( + { + key: { + "name": listener.name, + "scope": listener.scope, + "port": listener.port, + "protocol": listener.protocol, + "auth-mechanism": listener.mechanism, + "advertised-listener": listener.advertised_listener, + } + } + ) + + event.set_results(result) def _set_password_action(self, event: ActionEvent) -> None: """Handler for set-password action. diff --git a/src/events/broker.py b/src/events/broker.py index 1e2c9b3f..b0274c9a 100644 --- a/src/events/broker.py +++ b/src/events/broker.py @@ -24,8 +24,8 @@ UpdateStatusEvent, ) +from events.actions import ActionEvents from events.oauth import OAuthHandler -from events.password_actions import PasswordActionEvents from events.provider import KafkaProvider from events.upgrade import KafkaDependencyModel, KafkaUpgrade from events.zookeeper import ZooKeeperHandler @@ -88,7 +88,8 @@ def __init__(self, charm) -> None: **DEPENDENCIES # pyright: ignore[reportArgumentType] ), ) - self.password_action_events = PasswordActionEvents(self) + self.action_events = ActionEvents(self) + if not self.charm.state.runs_controller: self.zookeeper = ZooKeeperHandler(self) diff --git a/src/events/tls.py b/src/events/tls.py index d3d906e5..e0d9459f 100644 --- a/src/events/tls.py +++ b/src/events/tls.py @@ -9,6 +9,7 @@ import logging import os import re +import warnings from typing import TYPE_CHECKING from charms.tls_certificates_interface.v1.tls_certificates import ( @@ -296,6 +297,13 @@ def _request_certificate(self): sans = self.charm.broker.tls_manager.build_sans() + # only warn during certificate creation, not every event if in structured_config + if self.charm.config.certificate_extra_sans: + warnings.warn( + "'certificate_extra_sans' config option is deprecated, use 'extra_listeners' instead", + DeprecationWarning, + ) + csr = generate_csr( private_key=self.charm.state.unit_broker.private_key.encode("utf-8"), subject=self.charm.state.unit_broker.relation_data.get("private-address", ""), diff --git a/src/literals.py b/src/literals.py index 5a2768a0..13739792 100644 --- a/src/literals.py +++ b/src/literals.py @@ -72,11 +72,12 @@ class Ports: client: int internal: int external: int + extra: int = 0 AuthProtocol = Literal["SASL_PLAINTEXT", "SASL_SSL", "SSL"] AuthMechanism = Literal["SCRAM-SHA-512", "OAUTHBEARER", "SSL"] -Scope = Literal["INTERNAL", "CLIENT", "EXTERNAL"] +Scope = Literal["INTERNAL", "CLIENT", "EXTERNAL", "EXTRA"] AuthMap = NamedTuple("AuthMap", protocol=AuthProtocol, mechanism=AuthMechanism) SECURITY_PROTOCOL_PORTS: dict[AuthMap, Ports] = { diff --git a/src/managers/config.py b/src/managers/config.py index 69152417..b60fe197 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -76,6 +76,7 @@ "profile", "log_level", "certificate_extra_sans", + "extra_listeners", "roles", "expose_external", ] @@ -86,19 +87,28 @@ class Listener: Args: auth_map: AuthMap representing the auth.protocol and auth.mechanism for the listener - scope: scope of the listener, CLIENT, INTERNAL or EXTERNAL + scope: scope of the listener, CLIENT, INTERNAL, EXTERNAL or EXTRA host: string with the host that will be announced + baseport (optional): integer port to offset CLIENT port numbers for EXTRA listeners node_port (optional): the node-port for the listener if scope=EXTERNAL """ def __init__( - self, auth_map: AuthMap, scope: Scope, host: str = "", node_port: int | None = None + self, + auth_map: AuthMap, + scope: Scope, + host: str = "", + baseport: int = 30000, + extra_count: int = -1, + node_port: int | None = None, ): self.auth_map = auth_map self.protocol = auth_map.protocol self.mechanism = auth_map.mechanism self.host = host self.scope = scope + self.baseport = baseport + self.extra_count = extra_count self.node_port = node_port @property @@ -109,8 +119,8 @@ def scope(self) -> Scope: @scope.setter def scope(self, value): """Internal scope validator.""" - if value not in ["CLIENT", "INTERNAL", "EXTERNAL"]: - raise ValueError("Only CLIENT, INTERNAL and EXTERNAL scopes are accepted") + if value not in ["CLIENT", "INTERNAL", "EXTERNAL", "EXTRA"]: + raise ValueError("Only CLIENT, INTERNAL, EXTERNAL and EXTRA scopes are accepted") self._scope = value @@ -121,12 +131,18 @@ def port(self) -> int: Returns: Integer of port number """ + # generates ports 39092, 39192, 39292 etc for listener auth if baseport=30000 + if self.scope == "EXTRA": + return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], "client") + self.baseport + return getattr(SECURITY_PROTOCOL_PORTS[self.auth_map], self.scope.lower()) @property def name(self) -> str: """Name of the listener.""" - return f"{self.scope}_{self.protocol}_{self.mechanism.replace('-', '_')}" + return f"{self.scope}_{self.protocol}_{self.mechanism.replace('-', '_')}" + ( + f"_{self.extra_count}" if self.extra_count >= 0 else "" + ) @property def protocol_map(self) -> str: @@ -383,7 +399,7 @@ def scram_properties(self) -> list[str]: f'listener.name.{listener_name}.{listener_mechanism}.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="{username}" password="{password}";', f"listener.name.{listener_name}.sasl.enabled.mechanisms={self.internal_listener.mechanism}", ] - for auth in self.client_listeners + self.external_listeners: + for auth in self.client_listeners + self.external_listeners + self.extra_listeners: if not auth.mechanism.startswith("SCRAM"): continue @@ -463,8 +479,34 @@ def controller_listener(self) -> None: pass # TODO: No good abstraction in place for the controller use case @property - def client_listeners(self) -> list[Listener]: + def extra_listeners(self) -> list[Listener]: """Return a list of extra listeners.""" + extra_host_baseports = [ + tuple(listener.split(":")) for listener in self.config.extra_listeners + ] + + extra_listeners = [] + extra_count = 0 + for host, baseport in extra_host_baseports: + for auth_map in self.state.enabled_auth: + host = host.replace("{unit}", str(self.state.unit_broker.unit_id)) + extra_listeners.append( + Listener( + host=host, + auth_map=auth_map, + scope="EXTRA", + baseport=int(baseport), + extra_count=extra_count, + ) + ) + + extra_count += 1 + + return extra_listeners + + @property + def client_listeners(self) -> list[Listener]: + """Return a list of client listeners.""" return [ Listener( host=self.state.unit_broker.internal_address, auth_map=auth_map, scope="CLIENT" @@ -508,7 +550,12 @@ def external_listeners(self) -> list[Listener]: @property def all_listeners(self) -> list[Listener]: """Return a list with all expected listeners.""" - return [self.internal_listener] + self.client_listeners + self.external_listeners + return ( + [self.internal_listener] + + self.client_listeners + + self.external_listeners + + self.extra_listeners + ) @property def inter_broker_protocol_version(self) -> str: diff --git a/src/managers/tls.py b/src/managers/tls.py index b6b64efa..d6e3fbc5 100644 --- a/src/managers/tls.py +++ b/src/managers/tls.py @@ -129,14 +129,11 @@ def remove_cert(self, alias: str) -> None: def _build_extra_sans(self) -> list[str]: """Parse the certificate_extra_sans config option.""" - extra_sans = self.config.certificate_extra_sans or "" - parsed_sans = [] - - if extra_sans == "": - return parsed_sans - - for sans in extra_sans.split(","): - parsed_sans.append(sans.replace("{unit}", str(self.state.unit_broker.unit_id))) + extra_sans = self.config.extra_listeners or self.config.certificate_extra_sans or [] + clean_sans = [san.split(":")[0] for san in extra_sans] + parsed_sans = [ + san.replace("{unit}", str(self.state.unit_broker.unit_id)) for san in clean_sans + ] return parsed_sans diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index da50edbc..924abb9b 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -331,10 +331,10 @@ async def test_observability_integration(ops_test: OpsTest): @pytest.mark.abort_on_fail async def test_deploy_with_existing_storage(ops_test: OpsTest): - unit_to_remove, *_ = await ops_test.model.applications[APP_NAME].add_units(count=3) - await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 4) + unit_to_remove, *_ = await ops_test.model.applications[APP_NAME].add_units(count=1) + await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 2) await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=1000, idle_period=30 + apps=[APP_NAME], status="active", timeout=2000, idle_period=30 ) _, stdout, _ = await ops_test.juju("storage", "--format", "json") @@ -347,7 +347,7 @@ async def test_deploy_with_existing_storage(ops_test: OpsTest): break await unit_to_remove.remove(destroy_storage=False) - await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 3) + await ops_test.model.block_until(lambda: len(ops_test.model.applications[APP_NAME].units) == 1) add_unit_cmd = f"add-unit {APP_NAME} --model={ops_test.model.info.name} --attach-storage={data_storage_id}".split() await ops_test.juju(*add_unit_cmd) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 4976983b..f9ef283f 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -10,6 +10,7 @@ import pytest import yaml from ops.testing import Harness +from pydantic import ValidationError from charm import KafkaCharm from literals import ( @@ -160,6 +161,88 @@ def test_listeners_in_server_properties(harness: Harness[KafkaCharm]): ) +def test_extra_listeners_in_server_properties(harness: Harness[KafkaCharm]): + """Checks that the extra-listeners are properly set from config.""" + # verifying structured config validators + for value in [ + "missing.port", + "low.port:15000", + "high.port:60000", + "non.unique:30000,other.non.unique:30000", + "close.port:30000,other.close.port:30001", + ]: + with pytest.raises(ValidationError): + harness._update_config({"extra_listeners": value}) + harness.charm.broker.config_manager.config = harness.charm.config + + harness._update_config( + {"extra_listeners": "worker-{unit}.foo.com:30000,worker-{unit}.bar.com:40000"} + ) + harness.charm.broker.config_manager.config = harness.charm.config + + peer_relation_id = harness.add_relation(PEER, CHARM_KEY) + harness.add_relation_unit(peer_relation_id, f"{CHARM_KEY}/1") + harness.update_relation_data( + peer_relation_id, f"{CHARM_KEY}/0", {"private-address": "treebeard"} + ) + + # adding client + client_relation_id = harness.add_relation("kafka-client", "app") + harness.update_relation_data(client_relation_id, "app", {"extra-user-roles": "admin,producer"}) + assert ( + len(harness.charm.broker.config_manager.all_listeners) == 4 + ) # 2 extra, 1 internal, 1 client + + # adding SSL + harness.update_relation_data(peer_relation_id, CHARM_KEY, {"tls": "enabled"}) + assert ( + len(harness.charm.broker.config_manager.all_listeners) == 4 + ) # 2 extra, 1 internal, 1 client + + # adding SSL + harness.update_relation_data(peer_relation_id, CHARM_KEY, {"mtls": "enabled"}) + assert ( + len(harness.charm.broker.config_manager.all_listeners) == 7 + ) # 2 extra sasl_ssl, 2 extra ssl, 1 internal, 2 client + + expected_listener_names = { + "INTERNAL_SASL_PLAINTEXT_SCRAM_SHA_512", + "CLIENT_SASL_PLAINTEXT_SCRAM_SHA_512", + "CLIENT_SSL_SSL", + "EXTRA_SASL_PLAINTEXT_SCRAM_SHA_512_0", + "EXTRA_SASL_PLAINTEXT_SCRAM_SHA_512_1", + "EXTRA_SSL_SSL_0", + "EXTRA_SSL_SSL_1", + } + + advertised_listeners_prop = "" + for prop in harness.charm.broker.config_manager.server_properties: + if "advertised.listener" in prop: + advertised_listeners_prop = prop + + # validating every expected listener is present + for name in expected_listener_names: + assert name in advertised_listeners_prop + + # validating their allocated ports are expected + ports = [] + for listener in advertised_listeners_prop.split("=")[1].split(","): + name, _, port = listener.split(":") + + if name.endswith("_0") or name.endswith("_1"): + # verifying allocation uses the baseport + digit = 10**4 + assert int(port) // digit * digit in (30000, 40000) + + # verifying allocation is in steps of 100 + digit = 10**2 + assert int(port) // digit * digit in (39000, 39100, 49000, 49100) + + # verifying all ports are unique + assert port not in ports + ports.append(port) + + def test_oauth_client_listeners_in_server_properties(harness: Harness[KafkaCharm]): """Checks that oauth client listeners are properly set when a relating through oauth.""" harness.add_relation(ZK, CHARM_KEY) diff --git a/tests/unit/test_tls.py b/tests/unit/test_tls.py index 6ea6ade4..62db76d6 100644 --- a/tests/unit/test_tls.py +++ b/tests/unit/test_tls.py @@ -120,6 +120,19 @@ def test_extra_sans_config(harness: Harness[KafkaCharm]): assert "worker0.com" in "".join(manager._build_extra_sans()) assert "0.example" in "".join(manager._build_extra_sans()) + # verifying that sans can be built with both certificate_extra_sans and extra_listeners + harness._update_config( + { + "certificate_extra_sans": "", + "extra_listeners": "worker{unit}.com:30000,{unit}.example:40000,nonunit.domain.com:45000", + } + ) + manager.config = harness.charm.config + assert manager._build_extra_sans + assert "worker0.com" in "".join(manager._build_extra_sans()) + assert "0.example" in "".join(manager._build_extra_sans()) + assert "nonunit.domain.com" in "".join(manager._build_extra_sans()) + def test_sans(harness: Harness[KafkaCharm], patched_node_ip): # Create peer relation diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py index 757d511f..10042f9e 100644 --- a/tests/unit/test_upgrade.py +++ b/tests/unit/test_upgrade.py @@ -111,7 +111,7 @@ def test_run_password_rotation_while_upgrading(harness: Harness[KafkaCharm], upg ), patch("managers.auth.AuthManager.add_user"), ): - harness.charm.broker.password_action_events._set_password_action(mock_event) + harness.charm.broker.action_events._set_password_action(mock_event) if not upgrade_stack: mock_event.set_results.assert_called()