Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5232;DPE-5233] feat: support for scaling operations in KRaft mode (single & multi-app) #281

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
20 changes: 20 additions & 0 deletions src/core/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ def peer_cluster_orchestrator(self) -> PeerCluster:
{
"controller_quorum_uris": self.cluster.controller_quorum_uris,
"controller_password": self.cluster.controller_password,
"bootstrap_controller": self.cluster.bootstrap_controller,
"bootstrap_unit_id": self.cluster.bootstrap_unit_id,
"bootstrap_replica_id": self.cluster.bootstrap_replica_id,
}
)

Expand All @@ -183,6 +186,9 @@ def peer_cluster(self) -> PeerCluster:
"balancer_uris": self.cluster.balancer_uris,
"controller_quorum_uris": self.cluster.controller_quorum_uris,
"controller_password": self.cluster.controller_password,
"bootstrap_controller": self.cluster.bootstrap_controller,
"bootstrap_unit_id": self.cluster.bootstrap_unit_id,
"bootstrap_replica_id": self.cluster.bootstrap_replica_id,
}
)

Expand Down Expand Up @@ -226,6 +232,13 @@ def unit_broker(self) -> KafkaBroker:
substrate=self.substrate,
)

@property
def kraft_unit_id(self) -> int:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: shouldn't this be named `bootstrap_unit_id`, to stay consistent with the models?

"""Returns current unit ID in KRaft Quorum Manager."""
if self.runs_broker and self.runs_controller:
return KRAFT_NODE_ID_OFFSET + self.unit_broker.unit_id
return self.unit_broker.unit_id

@cached_property
def peer_units_data_interfaces(self) -> dict[Unit, DataPeerOtherUnitData]:
"""The cluster peer relation."""
Expand Down Expand Up @@ -455,6 +468,13 @@ def controller_quorum_uris(self) -> str:
)
return ""

@property
def bootstrap_controller(self) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that controller_quorum_uris are not being used anymore for the workload and they have been replaced by a combination of bootstrap_controller and kraft_unit_id keys.

If this is the expected use of them, there are a few things that would need to be checked and fixed:

  • Remove controller_quorum_uris from the codebase
  • On ClusterState._broker_status() add the correct checks:
    @property
    def _broker_status(self) -> Status:  # noqa: C901
        """Checks for role=broker specific readiness."""
        . . .

        if self.kraft_mode:
            if not (self.peer_cluster.bootstrap_controller and self.peer_cluster.bootstrap_unit_id):
                return Status.NO_QUORUM_URIS
            if not self.cluster.cluster_uuid:
                return Status.NO_CLUSTER_UUID
        . . .

"""Returns the controller listener in the format HOST:PORT."""
if self.runs_controller:
return f"{self.unit_broker.internal_address}:{CONTROLLER_PORT}"
return ""

@property
def log_dirs(self) -> str:
"""Builds the necessary log.dirs based on mounted storage volumes.
Expand Down
106 changes: 94 additions & 12 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import requests
from charms.data_platform_libs.v0.data_interfaces import (
PROV_SECRET_PREFIX,
Data,
DataPeerData,
DataPeerUnitData,
Expand Down Expand Up @@ -99,6 +100,9 @@ def __init__(
broker_uris: str = "",
cluster_uuid: str = "",
controller_quorum_uris: str = "",
bootstrap_controller: str = "",
bootstrap_unit_id: str = "",
bootstrap_replica_id: str = "",
racks: int = 0,
broker_capacities: BrokerCapacities = {},
zk_username: str = "",
Expand All @@ -115,6 +119,9 @@ def __init__(
self._broker_uris = broker_uris
self._cluster_uuid = cluster_uuid
self._controller_quorum_uris = controller_quorum_uris
self._bootstrap_controller = bootstrap_controller
self._bootstrap_unit_id = bootstrap_unit_id
self._bootstrap_replica_id = bootstrap_replica_id
self._racks = racks
self._broker_capacities = broker_capacities
self._zk_username = zk_username
Expand All @@ -125,6 +132,18 @@ def __init__(
self._balancer_uris = balancer_uris
self._controller_password = controller_password

def _fetch_from_secrets(self, group: str, field: str) -> str:
    """Fetch a single field from a Juju secret referenced in the relation app databag.

    Args:
        group: secret group suffix used to build the databag key,
            e.g. "broker" -> f"{PROV_SECRET_PREFIX}broker".
        field: key inside the secret's content to return.

    Returns:
        The secret field value, or an empty string if the relation, the
        secret URI, or the field is not available.
    """
    if not self.relation:
        return ""

    secret_uri = self.relation.data[self.relation.app].get(f"{PROV_SECRET_PREFIX}{group}")
    if not secret_uri:
        return ""

    # NOTE(review): relies on the private `_model` attribute of the data
    # interface — presumably stable for the pinned data_interfaces version;
    # confirm there is no public accessor.
    secret = self.data_interface._model.get_secret(id=secret_uri)
    if not secret:
        return ""

    return secret.get_content().get(field, "")

@property
def roles(self) -> str:
"""All the roles pass from the related application."""
Expand All @@ -145,12 +164,7 @@ def broker_username(self) -> str:
if not self.relation or not self.relation.app:
return ""

return self.data_interface._fetch_relation_data_with_secrets(
component=self.relation.app,
req_secret_fields=BALANCER.requested_secrets,
relation=self.relation,
fields=BALANCER.requested_secrets,
).get("broker-username", "")
return self._fetch_from_secrets("broker", "broker-username")
imanenami marked this conversation as resolved.
Show resolved Hide resolved

@property
def broker_password(self) -> str:
Expand All @@ -161,12 +175,7 @@ def broker_password(self) -> str:
if not self.relation or not self.relation.app:
return ""

return self.data_interface._fetch_relation_data_with_secrets(
component=self.relation.app,
req_secret_fields=BALANCER.requested_secrets,
relation=self.relation,
fields=BALANCER.requested_secrets,
).get("broker-password", "")
return self._fetch_from_secrets("broker", "broker-password")

@property
def broker_uris(self) -> str:
Expand Down Expand Up @@ -232,6 +241,54 @@ def cluster_uuid(self) -> str:
or ""
)

@property
def bootstrap_controller(self) -> str:
    """Bootstrap controller in KRaft mode."""
    # Prefer the value injected at construction time, if any.
    if self._bootstrap_controller:
        return self._bootstrap_controller

    if self.relation and self.relation.app:
        fetched = self.data_interface.fetch_relation_field(
            relation_id=self.relation.id, field="bootstrap-controller"
        )
        return fetched or ""

    return ""

@property
def bootstrap_unit_id(self) -> str:
    """Bootstrap unit ID in KRaft mode."""
    # Prefer the value injected at construction time, if any.
    if self._bootstrap_unit_id:
        return self._bootstrap_unit_id

    if self.relation and self.relation.app:
        fetched = self.data_interface.fetch_relation_field(
            relation_id=self.relation.id, field="bootstrap-unit-id"
        )
        return fetched or ""

    return ""

@property
def bootstrap_replica_id(self) -> str:
    """Directory ID of the bootstrap node in KRaft mode."""
    # Prefer the value injected at construction time, if any.
    if self._bootstrap_replica_id:
        return self._bootstrap_replica_id

    if self.relation and self.relation.app:
        fetched = self.data_interface.fetch_relation_field(
            relation_id=self.relation.id, field="bootstrap-replica-id"
        )
        return fetched or ""

    return ""

@property
def racks(self) -> int:
"""The number of racks for the brokers."""
Expand Down Expand Up @@ -489,6 +546,21 @@ def cluster_uuid(self) -> str:
"""Cluster uuid used for initializing storages."""
return self.relation_data.get("cluster-uuid", "")

@property
def bootstrap_replica_id(self) -> str:
    """Directory ID of the bootstrap controller."""
    stored = self.relation_data.get("bootstrap-replica-id")
    return stored if stored is not None else ""

@property
def bootstrap_controller(self) -> str:
    """HOST:PORT address of the bootstrap controller."""
    address = self.relation_data.get("bootstrap-controller")
    return "" if address is None else address

@property
def bootstrap_unit_id(self) -> str:
    """Unit ID of the bootstrap controller."""
    unit_id = self.relation_data.get("bootstrap-unit-id")
    return "" if unit_id is None else unit_id


class KafkaBroker(RelationState):
"""State collection metadata for a unit."""
Expand Down Expand Up @@ -640,6 +712,16 @@ def node_ip(self) -> str:
"""
return self.k8s.get_node_ip(self.pod_name)

@property
def directory_id(self) -> str:
    """Directory ID of the node as saved in `meta.properties`."""
    stored_id = self.relation_data.get("directory-id")
    return "" if stored_id is None else stored_id
marcoppenheimer marked this conversation as resolved.
Show resolved Hide resolved

@property
def added_to_quorum(self) -> bool:
Copy link
Contributor

@zmraul zmraul Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of important points here, not really blocking but I think they are good for future improvements

  • Logic around added_to_quorum should probably be checked against the cluster itself, and be tracked as little as possible on the charm.
  • An upgrade or a failure on the unit will not be resolved correctly.
    At the moment this property is never unset: it stays `true` once the unit is first added to the quorum, so a failing unit would look at the databag, see added_to_quorum=True, when in reality the unit is in a recovery state.

Even if we unset the property, a failing unit still is problematic. Chances are remove_from_quorum was never called before the failure, thus still leading to a wrong recovery state on the unit. This is why I'm suggesting to generally have added_to_quorum be a live check against Kafka itself

"""Whether or not this node is added to dynamic quorum in KRaft mode."""
return bool(self.relation_data.get("added-to-quorum", False))


class ZooKeeper(RelationState):
"""State collection metadata for a the Zookeeper relation."""
Expand Down
65 changes: 2 additions & 63 deletions src/events/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)

from events.actions import ActionEvents
from events.controller import KRaftHandler
from events.oauth import OAuthHandler
from events.provider import KafkaProvider
from events.upgrade import KafkaDependencyModel, KafkaUpgrade
Expand All @@ -35,7 +36,6 @@
CONTROLLER,
DEPENDENCIES,
GROUP,
INTERNAL_USERS,
PEER,
PROFILE_TESTING,
REL_NAME,
Expand Down Expand Up @@ -94,6 +94,7 @@ def __init__(self, charm) -> None:

self.provider = KafkaProvider(self)
self.oauth = OAuthHandler(self)
self.kraft = KRaftHandler(self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question it may be a naive question, but don't we want to enable this based on the if statement above? I would have logically thought that it is either ZooKeeper or Kraft...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could enable it based on ClusterState.kraft_mode, which includes a condition on the peer_cluster role, but there is no harm in enabling it globally because of the checks already present in the controller handlers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

globally because of checks existent in controller handlers.

Uhm, my preference here would be to have "general" checks in another "general" handler, and the custom ones related to controllers in the KraftHandler handler, which is run only on kraft mode. But double check with the other engineers, this is non-blocking. My only point - from a bit of an outsider perspective - is that the code above looks slightly counter intuitive to me


# MANAGERS

Expand Down Expand Up @@ -167,20 +168,13 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None: # noqa: C901
if not self.upgrade.idle:
return

if self.charm.state.kraft_mode:
self._init_kraft_mode()

# FIXME ready to start probably needs to account for credentials being created beforehand
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment can be removed. Credentials issues on the start process were solved already

current_status = self.charm.state.ready_to_start
if current_status is not Status.ACTIVE:
self.charm._set_status(current_status)
event.defer()
return

if self.charm.state.kraft_mode:
self.config_manager.set_server_properties()
self._format_storages()

self.update_external_services()

# required settings given zookeeper connection config has been created
Expand Down Expand Up @@ -429,61 +423,6 @@ def healthy(self) -> bool:

return True

def _init_kraft_mode(self) -> None:
    """Initialize the server when running controller mode.

    Leader-only, one-time bootstrap of KRaft state in the peer databags:
    internal user passwords, the cluster UUID, quorum URIs, and the
    `controller` user password.
    """
    # NOTE: checks for `runs_broker` in this method should be `is_cluster_manager` in
    # the large deployment feature.
    if not self.model.unit.is_leader():
        return

    # Generate internal user passwords once, on the broker leader only.
    if not self.charm.state.cluster.internal_user_credentials and self.charm.state.runs_broker:
        credentials = [
            (username, self.charm.workload.generate_password()) for username in INTERNAL_USERS
        ]
        for username, password in credentials:
            self.charm.state.cluster.update({f"{username}-password": password})

    # cluster-uuid is only created on the broker (`cluster-manager` in large deployments)
    if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker:
        uuid = self.workload.run_bin_command(
            bin_keyword="storage", bin_args=["random-uuid"]
        ).strip()

        # Propagate the UUID to both the peer app databag and the
        # peer-cluster relation so controllers can format storages with it.
        self.charm.state.cluster.update({"cluster-uuid": uuid})
        self.charm.state.peer_cluster.update({"cluster-uuid": uuid})

    # Controller is tasked with populating quorum uris and the `controller` user password
    if self.charm.state.runs_controller:
        quorum_uris = {"controller-quorum-uris": self.charm.state.controller_quorum_uris}
        self.charm.state.cluster.update(quorum_uris)

        generated_password = self.charm.workload.generate_password()

        if self.charm.state.peer_cluster_orchestrator:
            # Large deployment: push quorum URIs to the orchestrator and set
            # the controller password there only if not already present.
            self.charm.state.peer_cluster_orchestrator.update(quorum_uris)

            if not self.charm.state.peer_cluster_orchestrator.controller_password:
                self.charm.state.peer_cluster_orchestrator.update(
                    {"controller-password": generated_password}
                )
        elif not self.charm.state.peer_cluster.controller_password:
            # single mode, controller & leader
            self.charm.state.cluster.update({"controller-password": generated_password})

def _format_storages(self) -> None:
    """Format storages provided relevant keys exist.

    Uses the peer-cluster `cluster-uuid` plus role-appropriate credentials
    to run Kafka's storage formatting on this unit.
    """
    if self.charm.state.runs_broker:
        credentials = self.charm.state.cluster.internal_user_credentials
    elif self.charm.state.runs_controller:
        # Controller-only units use the broker credentials shared over the
        # peer-cluster relation.
        credentials = {
            self.charm.state.peer_cluster.broker_username: self.charm.state.peer_cluster.broker_password
        }

    # NOTE(review): if a unit runs neither broker nor controller, `credentials`
    # is unbound here and this raises NameError — presumably callers only
    # invoke this in KRaft mode where one of the roles holds; confirm.
    self.workload.format_storages(
        uuid=self.charm.state.peer_cluster.cluster_uuid,
        internal_user_credentials=credentials,
    )

def update_external_services(self) -> None:
"""Attempts to update any external Kubernetes services."""
if not self.charm.substrate == "k8s":
Expand Down
Loading
Loading