Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5232;DPE-5233] feat: support for scaling operations in KRaft mode (single & multi-app) #281

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
20 changes: 20 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ def peer_cluster_orchestrator(self) -> PeerCluster:
{
"controller_quorum_uris": self.cluster.controller_quorum_uris,
"controller_password": self.cluster.controller_password,
"bootstrap_controller": self.cluster.bootstrap_controller,
"bootstrap_unit_id": self.cluster.bootstrap_unit_id,
"bootstrap_replica_id": self.cluster.bootstrap_replica_id,
}
)

Expand All @@ -183,6 +186,9 @@ def peer_cluster(self) -> PeerCluster:
"balancer_uris": self.cluster.balancer_uris,
"controller_quorum_uris": self.cluster.controller_quorum_uris,
"controller_password": self.cluster.controller_password,
"bootstrap_controller": self.cluster.bootstrap_controller,
"bootstrap_unit_id": self.cluster.bootstrap_unit_id,
"bootstrap_replica_id": self.cluster.bootstrap_replica_id,
}
)

Expand Down Expand Up @@ -226,6 +232,13 @@ def unit_broker(self) -> KafkaBroker:
substrate=self.substrate,
)

@property
def kraft_unit_id(self) -> int:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: this needs to be called bootstrap_unit_id to be consistent with models no?

"""Returns current unit ID in KRaft Quorum Manager."""
if self.runs_broker and self.runs_controller:
return KRAFT_NODE_ID_OFFSET + self.unit_broker.unit_id
return self.unit_broker.unit_id

@cached_property
def peer_units_data_interfaces(self) -> dict[Unit, DataPeerOtherUnitData]:
"""The cluster peer relation."""
Expand Down Expand Up @@ -455,6 +468,13 @@ def controller_quorum_uris(self) -> str:
)
return ""

@property
def bootstrap_controller(self) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that controller_quorum_uris are not being used anymore for the workload and they have been replaced by a combination of bootstrap_controller and kraft_unit_id keys.

If this is the expected use of them, there are a few things that would need to be checked and fixed:

  • Remove controller_quorum_uris from the codebase
  • On ClusterState._broker_status() add the correct checks:
    @property
    def _broker_status(self) -> Status:  # noqa: C901
        """Checks for role=broker specific readiness."""
        . . .

        if self.kraft_mode:
            if not (self.peer_cluster.bootstrap_controller and self.peer_cluster.bootstrap_unit_id):
                return Status.NO_QUORUM_URIS
            if not self.cluster.cluster_uuid:
                return Status.NO_CLUSTER_UUID
        . . .

"""Returns the controller listener in the format HOST:PORT."""
if self.runs_controller:
return f"{self.unit_broker.internal_address}:{CONTROLLER_PORT}"
return ""

@property
def log_dirs(self) -> str:
"""Builds the necessary log.dirs based on mounted storage volumes.
Expand Down
96 changes: 96 additions & 0 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import requests
from charms.data_platform_libs.v0.data_interfaces import (
PROV_SECRET_PREFIX,
Data,
DataPeerData,
DataPeerUnitData,
Expand Down Expand Up @@ -99,6 +100,9 @@ def __init__(
broker_uris: str = "",
cluster_uuid: str = "",
controller_quorum_uris: str = "",
bootstrap_controller: str = "",
bootstrap_unit_id: str = "",
bootstrap_replica_id: str = "",
racks: int = 0,
broker_capacities: BrokerCapacities = {},
zk_username: str = "",
Expand All @@ -115,6 +119,9 @@ def __init__(
self._broker_uris = broker_uris
self._cluster_uuid = cluster_uuid
self._controller_quorum_uris = controller_quorum_uris
self._bootstrap_controller = bootstrap_controller
self._bootstrap_unit_id = bootstrap_unit_id
self._bootstrap_replica_id = bootstrap_replica_id
self._racks = racks
self._broker_capacities = broker_capacities
self._zk_username = zk_username
Expand All @@ -125,6 +132,18 @@ def __init__(
self._balancer_uris = balancer_uris
self._controller_password = controller_password

def _fetch_from_secrets(self, group, field):
imanenami marked this conversation as resolved.
Show resolved Hide resolved
if not self.relation:
return ""

if secrets_uri := self.relation.data[self.relation.app].get(
f"{PROV_SECRET_PREFIX}{group}"
):
if secret := self.data_interface._model.get_secret(id=secrets_uri):
return secret.get_content().get(field, "")

return ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: This is much better 👍🏾 - thank you for the improvement!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also use this for the other properties that require the painful _fetch_relation_data_with_secrets`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I'm a little afraid to do so given the nature of _fetch_relation_data_with_secrets which has a bit of legacy :(


@property
def roles(self) -> str:
"""All the roles pass from the related application."""
Expand All @@ -145,6 +164,8 @@ def broker_username(self) -> str:
if not self.relation or not self.relation.app:
return ""

return self._fetch_from_secrets("broker", "broker-username")
imanenami marked this conversation as resolved.
Show resolved Hide resolved

return self.data_interface._fetch_relation_data_with_secrets(
component=self.relation.app,
req_secret_fields=BALANCER.requested_secrets,
Expand All @@ -161,6 +182,8 @@ def broker_password(self) -> str:
if not self.relation or not self.relation.app:
return ""

return self._fetch_from_secrets("broker", "broker-password")

return self.data_interface._fetch_relation_data_with_secrets(
component=self.relation.app,
req_secret_fields=BALANCER.requested_secrets,
Expand Down Expand Up @@ -232,6 +255,54 @@ def cluster_uuid(self) -> str:
or ""
)

@property
def bootstrap_controller(self) -> str:
"""Bootstrap controller in KRaft mode."""
if self._bootstrap_controller:
return self._bootstrap_controller

if not self.relation or not self.relation.app:
return ""

return (
self.data_interface.fetch_relation_field(
relation_id=self.relation.id, field="bootstrap-controller"
)
or ""
)

@property
def bootstrap_unit_id(self) -> str:
"""Bootstrap unit ID in KRaft mode."""
if self._bootstrap_unit_id:
return self._bootstrap_unit_id

if not self.relation or not self.relation.app:
return ""

return (
self.data_interface.fetch_relation_field(
relation_id=self.relation.id, field="bootstrap-unit-id"
)
or ""
)

@property
def bootstrap_replica_id(self) -> str:
"""Directory ID of the bootstrap node in KRaft mode."""
if self._bootstrap_replica_id:
return self._bootstrap_replica_id

if not self.relation or not self.relation.app:
return ""

return (
self.data_interface.fetch_relation_field(
relation_id=self.relation.id, field="bootstrap-replica-id"
)
or ""
)

@property
def racks(self) -> int:
"""The number of racks for the brokers."""
Expand Down Expand Up @@ -489,6 +560,21 @@ def cluster_uuid(self) -> str:
"""Cluster uuid used for initializing storages."""
return self.relation_data.get("cluster-uuid", "")

@property
def bootstrap_replica_id(self) -> str:
"""Directory ID of the bootstrap controller."""
return self.relation_data.get("bootstrap-replica-id", "")

@property
def bootstrap_controller(self) -> str:
"""HOST:PORT address of the bootstrap controller."""
return self.relation_data.get("bootstrap-controller", "")

@property
def bootstrap_unit_id(self) -> str:
"""Unit ID of the bootstrap controller."""
return self.relation_data.get("bootstrap-unit-id", "")


class KafkaBroker(RelationState):
"""State collection metadata for a unit."""
Expand Down Expand Up @@ -640,6 +726,16 @@ def node_ip(self) -> str:
"""
return self.k8s.get_node_ip(self.pod_name)

@property
def directory_id(self) -> str:
"""Directory ID of the node as saved in `meta.properties`."""
return self.relation_data.get("directory-id", "")
marcoppenheimer marked this conversation as resolved.
Show resolved Hide resolved

@property
def added_to_quorum(self) -> bool:
Copy link
Contributor

@zmraul zmraul Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of important points here, not really blocking but I think they are good for future improvements

  • Logic around added_to_quorum should probably be checked against the cluster itself, and be tracked as little as possible on the charm.
  • Something like an upgrade or a failure on the unit will not be correctly resolved.
    At the moment there is no unsetting of this property, it will stay as true once the unit is first added to the quorum, so a failing unit would look at the databag, see added_to_quorum=True when the reality is that the unit is on a recovery state.

Even if we unset the property, a failing unit still is problematic. Chances are remove_from_quorum was never called before the failure, thus still leading to a wrong recovery state on the unit. This is why I'm suggesting to generally have added_to_quorum be a live check against Kafka itself

"""Whether or not this node is added to dynamic quorum in KRaft mode."""
return bool(self.relation_data.get("added-to-quorum", False))


class ZooKeeper(RelationState):
"""State collection metadata for a the Zookeeper relation."""
Expand Down
Loading
Loading