-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DPE-5232;DPE-5233] feat: support for scaling operations in KRaft mode (single & multi-app) #281
base: main
Are you sure you want to change the base?
Changes from all commits
be262f3
98afa3b
dc11ffb
cc5c90a
f8540f1
76b4d00
9631796
cf274b7
5f8778a
275828b
0f35c71
d7b8f38
c492942
4628288
434c948
b3b7ac8
ce50ad8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -161,6 +161,9 @@ def peer_cluster_orchestrator(self) -> PeerCluster: | |
{ | ||
"controller_quorum_uris": self.cluster.controller_quorum_uris, | ||
"controller_password": self.cluster.controller_password, | ||
"bootstrap_controller": self.cluster.bootstrap_controller, | ||
"bootstrap_unit_id": self.cluster.bootstrap_unit_id, | ||
"bootstrap_replica_id": self.cluster.bootstrap_replica_id, | ||
} | ||
) | ||
|
||
|
@@ -183,6 +186,9 @@ def peer_cluster(self) -> PeerCluster: | |
"balancer_uris": self.cluster.balancer_uris, | ||
"controller_quorum_uris": self.cluster.controller_quorum_uris, | ||
"controller_password": self.cluster.controller_password, | ||
"bootstrap_controller": self.cluster.bootstrap_controller, | ||
"bootstrap_unit_id": self.cluster.bootstrap_unit_id, | ||
"bootstrap_replica_id": self.cluster.bootstrap_replica_id, | ||
} | ||
) | ||
|
||
|
@@ -226,6 +232,13 @@ def unit_broker(self) -> KafkaBroker: | |
substrate=self.substrate, | ||
) | ||
|
||
@property | ||
def kraft_unit_id(self) -> int: | ||
"""Returns current unit ID in KRaft Quorum Manager.""" | ||
if self.runs_broker and self.runs_controller: | ||
return KRAFT_NODE_ID_OFFSET + self.unit_broker.unit_id | ||
return self.unit_broker.unit_id | ||
|
||
@cached_property | ||
def peer_units_data_interfaces(self) -> dict[Unit, DataPeerOtherUnitData]: | ||
"""The cluster peer relation.""" | ||
|
@@ -455,6 +468,13 @@ def controller_quorum_uris(self) -> str: | |
) | ||
return "" | ||
|
||
@property | ||
def bootstrap_controller(self) -> str: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see that If this is the expected use of them, there are a few things that would need to be checked and fixed:
@property
def _broker_status(self) -> Status: # noqa: C901
"""Checks for role=broker specific readiness."""
. . .
if self.kraft_mode:
if not (self.peer_cluster.bootstrap_controller and self.peer_cluster.bootstrap_unit_id):
return Status.NO_QUORUM_URIS
if not self.cluster.cluster_uuid:
return Status.NO_CLUSTER_UUID
. . .
|
||
"""Returns the controller listener in the format HOST:PORT.""" | ||
if self.runs_controller: | ||
return f"{self.unit_broker.internal_address}:{CONTROLLER_PORT}" | ||
return "" | ||
|
||
@property | ||
def log_dirs(self) -> str: | ||
"""Builds the necessary log.dirs based on mounted storage volumes. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
|
||
import requests | ||
from charms.data_platform_libs.v0.data_interfaces import ( | ||
PROV_SECRET_PREFIX, | ||
Data, | ||
DataPeerData, | ||
DataPeerUnitData, | ||
|
@@ -99,6 +100,9 @@ def __init__( | |
broker_uris: str = "", | ||
cluster_uuid: str = "", | ||
controller_quorum_uris: str = "", | ||
bootstrap_controller: str = "", | ||
bootstrap_unit_id: str = "", | ||
bootstrap_replica_id: str = "", | ||
racks: int = 0, | ||
broker_capacities: BrokerCapacities = {}, | ||
zk_username: str = "", | ||
|
@@ -115,6 +119,9 @@ def __init__( | |
self._broker_uris = broker_uris | ||
self._cluster_uuid = cluster_uuid | ||
self._controller_quorum_uris = controller_quorum_uris | ||
self._bootstrap_controller = bootstrap_controller | ||
self._bootstrap_unit_id = bootstrap_unit_id | ||
self._bootstrap_replica_id = bootstrap_replica_id | ||
self._racks = racks | ||
self._broker_capacities = broker_capacities | ||
self._zk_username = zk_username | ||
|
@@ -125,6 +132,18 @@ def __init__( | |
self._balancer_uris = balancer_uris | ||
self._controller_password = controller_password | ||
|
||
def _fetch_from_secrets(self, group, field) -> str: | ||
if not self.relation: | ||
return "" | ||
|
||
if secrets_uri := self.relation.data[self.relation.app].get( | ||
f"{PROV_SECRET_PREFIX}{group}" | ||
): | ||
if secret := self.data_interface._model.get_secret(id=secrets_uri): | ||
return secret.get_content().get(field, "") | ||
|
||
return "" | ||
|
||
@property | ||
def roles(self) -> str: | ||
"""All the roles pass from the related application.""" | ||
|
@@ -145,12 +164,7 @@ def broker_username(self) -> str: | |
if not self.relation or not self.relation.app: | ||
return "" | ||
|
||
return self.data_interface._fetch_relation_data_with_secrets( | ||
component=self.relation.app, | ||
req_secret_fields=BALANCER.requested_secrets, | ||
relation=self.relation, | ||
fields=BALANCER.requested_secrets, | ||
).get("broker-username", "") | ||
return self._fetch_from_secrets("broker", "broker-username") | ||
imanenami marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@property | ||
def broker_password(self) -> str: | ||
|
@@ -161,12 +175,7 @@ def broker_password(self) -> str: | |
if not self.relation or not self.relation.app: | ||
return "" | ||
|
||
return self.data_interface._fetch_relation_data_with_secrets( | ||
component=self.relation.app, | ||
req_secret_fields=BALANCER.requested_secrets, | ||
relation=self.relation, | ||
fields=BALANCER.requested_secrets, | ||
).get("broker-password", "") | ||
return self._fetch_from_secrets("broker", "broker-password") | ||
|
||
@property | ||
def broker_uris(self) -> str: | ||
|
@@ -232,6 +241,54 @@ def cluster_uuid(self) -> str: | |
or "" | ||
) | ||
|
||
@property | ||
def bootstrap_controller(self) -> str: | ||
"""Bootstrap controller in KRaft mode.""" | ||
if self._bootstrap_controller: | ||
return self._bootstrap_controller | ||
|
||
if not self.relation or not self.relation.app: | ||
return "" | ||
|
||
return ( | ||
self.data_interface.fetch_relation_field( | ||
relation_id=self.relation.id, field="bootstrap-controller" | ||
) | ||
or "" | ||
) | ||
|
||
@property | ||
def bootstrap_unit_id(self) -> str: | ||
"""Bootstrap unit ID in KRaft mode.""" | ||
if self._bootstrap_unit_id: | ||
return self._bootstrap_unit_id | ||
|
||
if not self.relation or not self.relation.app: | ||
return "" | ||
|
||
return ( | ||
self.data_interface.fetch_relation_field( | ||
relation_id=self.relation.id, field="bootstrap-unit-id" | ||
) | ||
or "" | ||
) | ||
|
||
@property | ||
def bootstrap_replica_id(self) -> str: | ||
"""Directory ID of the bootstrap node in KRaft mode.""" | ||
if self._bootstrap_replica_id: | ||
return self._bootstrap_replica_id | ||
|
||
if not self.relation or not self.relation.app: | ||
return "" | ||
|
||
return ( | ||
self.data_interface.fetch_relation_field( | ||
relation_id=self.relation.id, field="bootstrap-replica-id" | ||
) | ||
or "" | ||
) | ||
|
||
@property | ||
def racks(self) -> int: | ||
"""The number of racks for the brokers.""" | ||
|
@@ -489,6 +546,21 @@ def cluster_uuid(self) -> str: | |
"""Cluster uuid used for initializing storages.""" | ||
return self.relation_data.get("cluster-uuid", "") | ||
|
||
@property | ||
def bootstrap_replica_id(self) -> str: | ||
"""Directory ID of the bootstrap controller.""" | ||
return self.relation_data.get("bootstrap-replica-id", "") | ||
|
||
@property | ||
def bootstrap_controller(self) -> str: | ||
"""HOST:PORT address of the bootstrap controller.""" | ||
return self.relation_data.get("bootstrap-controller", "") | ||
|
||
@property | ||
def bootstrap_unit_id(self) -> str: | ||
"""Unit ID of the bootstrap controller.""" | ||
return self.relation_data.get("bootstrap-unit-id", "") | ||
|
||
|
||
class KafkaBroker(RelationState): | ||
"""State collection metadata for a unit.""" | ||
|
@@ -640,6 +712,16 @@ def node_ip(self) -> str: | |
""" | ||
return self.k8s.get_node_ip(self.pod_name) | ||
|
||
@property | ||
def directory_id(self) -> str: | ||
"""Directory ID of the node as saved in `meta.properties`.""" | ||
return self.relation_data.get("directory-id", "") | ||
marcoppenheimer marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@property | ||
def added_to_quorum(self) -> bool: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A couple of important points here, not really blocking but I think they are good for future improvements
Even if we unset the property, a failing unit still is problematic. Chances are |
||
"""Whether or not this node is added to dynamic quorum in KRaft mode.""" | ||
return bool(self.relation_data.get("added-to-quorum", False)) | ||
|
||
|
||
class ZooKeeper(RelationState): | ||
"""State collection metadata for a the Zookeeper relation.""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ | |
) | ||
|
||
from events.actions import ActionEvents | ||
from events.controller import KRaftHandler | ||
from events.oauth import OAuthHandler | ||
from events.provider import KafkaProvider | ||
from events.upgrade import KafkaDependencyModel, KafkaUpgrade | ||
|
@@ -35,7 +36,6 @@ | |
CONTROLLER, | ||
DEPENDENCIES, | ||
GROUP, | ||
INTERNAL_USERS, | ||
PEER, | ||
PROFILE_TESTING, | ||
REL_NAME, | ||
|
@@ -94,6 +94,7 @@ def __init__(self, charm) -> None: | |
|
||
self.provider = KafkaProvider(self) | ||
self.oauth = OAuthHandler(self) | ||
self.kraft = KRaftHandler(self) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question it may be a naive question, but don't we want to enable this based on the if statement above? I would have logically thought that it is either ZooKeeper or Kraft... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could enable it based on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Uhm, my preference here would be to have "general" checks in another "general" handler, and the custom ones related to controllers in the |
||
|
||
# MANAGERS | ||
|
||
|
@@ -167,20 +168,13 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901 | |
if not self.upgrade.idle: | ||
return | ||
|
||
if self.charm.state.kraft_mode: | ||
self._init_kraft_mode() | ||
|
||
# FIXME ready to start probably needs to account for credentials being created beforehand | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this comment can be removed. Credentials issues on the start process were solved already |
||
current_status = self.charm.state.ready_to_start | ||
if current_status is not Status.ACTIVE: | ||
self.charm._set_status(current_status) | ||
event.defer() | ||
return | ||
|
||
if self.charm.state.kraft_mode: | ||
self.config_manager.set_server_properties() | ||
self._format_storages() | ||
|
||
self.update_external_services() | ||
|
||
# required settings given zookeeper connection config has been created | ||
|
@@ -429,61 +423,6 @@ def healthy(self) -> bool: | |
|
||
return True | ||
|
||
def _init_kraft_mode(self) -> None: | ||
"""Initialize the server when running controller mode.""" | ||
# NOTE: checks for `runs_broker` in this method should be `is_cluster_manager` in | ||
# the large deployment feature. | ||
if not self.model.unit.is_leader(): | ||
return | ||
|
||
if not self.charm.state.cluster.internal_user_credentials and self.charm.state.runs_broker: | ||
credentials = [ | ||
(username, self.charm.workload.generate_password()) for username in INTERNAL_USERS | ||
] | ||
for username, password in credentials: | ||
self.charm.state.cluster.update({f"{username}-password": password}) | ||
|
||
# cluster-uuid is only created on the broker (`cluster-manager` in large deployments) | ||
if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker: | ||
uuid = self.workload.run_bin_command( | ||
bin_keyword="storage", bin_args=["random-uuid"] | ||
).strip() | ||
|
||
self.charm.state.cluster.update({"cluster-uuid": uuid}) | ||
self.charm.state.peer_cluster.update({"cluster-uuid": uuid}) | ||
|
||
# Controller is tasked with populating quorum uris and the `controller` user password | ||
if self.charm.state.runs_controller: | ||
quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris} | ||
self.charm.state.cluster.update(quorum_uris) | ||
|
||
generated_password = self.charm.workload.generate_password() | ||
|
||
if self.charm.state.peer_cluster_orchestrator: | ||
self.charm.state.peer_cluster_orchestrator.update(quorum_uris) | ||
|
||
if not self.charm.state.peer_cluster_orchestrator.controller_password: | ||
self.charm.state.peer_cluster_orchestrator.update( | ||
{"controller-password": generated_password} | ||
) | ||
elif not self.charm.state.peer_cluster.controller_password: | ||
# single mode, controller & leader | ||
self.charm.state.cluster.update({"controller-password": generated_password}) | ||
|
||
def _format_storages(self) -> None: | ||
"""Format storages provided relevant keys exist.""" | ||
if self.charm.state.runs_broker: | ||
credentials = self.charm.state.cluster.internal_user_credentials | ||
elif self.charm.state.runs_controller: | ||
credentials = { | ||
self.charm.state.peer_cluster.broker_username: self.charm.state.peer_cluster.broker_password | ||
} | ||
|
||
self.workload.format_storages( | ||
uuid=self.charm.state.peer_cluster.cluster_uuid, | ||
internal_user_credentials=credentials, | ||
) | ||
|
||
def update_external_services(self) -> None: | ||
"""Attempts to update any external Kubernetes services.""" | ||
if not self.charm.substrate == "k8s": | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo: this needs to be called
bootstrap_unit_id
to be consistent with models no?