[DPE-5826] - fix: remove lost+found from new storages (#275)
marcoppenheimer authored Nov 25, 2024
1 parent cac830e commit 4571d08
Showing 10 changed files with 36 additions and 24 deletions.
12 changes: 10 additions & 2 deletions src/core/cluster.py
@@ -4,6 +4,7 @@
 
 """Objects representing the state of KafkaCharm."""
 
+import logging
 import os
 from functools import cached_property
 from ipaddress import IPv4Address, IPv6Address
@@ -58,6 +59,8 @@
 if TYPE_CHECKING:
     from charm import KafkaCharm
 
+logger = logging.getLogger(__name__)
+
 custom_secret_groups = SECRET_GROUPS
 setattr(custom_secret_groups, "BROKER", "broker")
 setattr(custom_secret_groups, "BALANCER", "balancer")
@@ -393,7 +396,12 @@ def bootstrap_server(self) -> str:
             return ""
 
         if self.config.expose_external:  # implicitly checks for k8s in structured_config
-            return self.bootstrap_servers_external
+            # service might not be created yet by the broker
+            try:
+                return self.bootstrap_servers_external
+            except LightKubeApiError as e:
+                logger.debug(e)
+                return ""
 
         return ",".join(
             sorted(
@@ -413,7 +421,7 @@ def controller_quorum_uris(self) -> str:
         node_offset = KRAFT_NODE_ID_OFFSET if self.runs_broker else 0
         return ",".join(
             [
-                f"{broker.unit_id + node_offset}@{broker.host}:{CONTROLLER_PORT}"
+                f"{broker.unit_id + node_offset}@{broker.internal_address}:{CONTROLLER_PORT}"
                 for broker in self.brokers
             ]
         )
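
Note on the bootstrap_server change above: on K8s the external bootstrap address is read from a NodePort Service that the broker may not have created yet, so the lookup is now wrapped in a try/except and an empty string is returned until the Service exists (`LightKubeApiError` is presumably the charm's alias for lightkube's `ApiError`). A minimal sketch of that pattern with the lightkube client, using hypothetical Service and namespace names:

from lightkube import Client
from lightkube.core.exceptions import ApiError
from lightkube.resources.core_v1 import Service


def external_bootstrap(client: Client, node_ip: str) -> str:
    """Best-effort read of the external NodePort; returns '' until the Service exists."""
    try:
        # Hypothetical name/namespace; the charm derives these from the app and model.
        svc = client.get(Service, name="kafka-bootstrap-external", namespace="kafka")
    except ApiError as e:
        # A 404 here only means the broker has not created the Service yet.
        print(e)
        return ""
    node_port = next(p.nodePort for p in svc.spec.ports if p.nodePort)
    return f"{node_ip}:{node_port}"
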
8 changes: 0 additions & 8 deletions src/core/models.py
@@ -503,14 +503,6 @@ def internal_address(self) -> str:
 
         return addr
 
-    @property
-    def host(self) -> str:
-        """Return the hostname of a unit."""
-        if self.substrate == "vm":
-            return self.internal_address
-        else:
-            return self.node_ip or self.internal_address
-
     # --- TLS ---
 
     @property
5 changes: 4 additions & 1 deletion src/core/structured_config.py
@@ -239,7 +239,10 @@ def expose_external_validator(cls, value: str) -> str | None:
         if SUBSTRATE == "vm":
             return
 
-        if value == "none":
+        if value not in ["false", "nodeport"]:
+            raise ValueError("Value not one of 'false' or 'nodeport'")
+
+        if value == "false":
             return
 
         return value
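
The validator change above also renames the "disabled" value of `expose_external` from `none` to `false` and rejects anything other than `false` or `nodeport`. A standalone sketch of the same rule (a plain function rather than the charm's pydantic validator), just to show the accepted values:

def validate_expose_external(value: str) -> str | None:
    """Mirror of the validator above: only 'false' or 'nodeport' are accepted."""
    if value not in ["false", "nodeport"]:
        raise ValueError("Value not one of 'false' or 'nodeport'")
    if value == "false":
        # Disabled: the validator returns None and no external listener is configured.
        return None
    return value


assert validate_expose_external("nodeport") == "nodeport"
assert validate_expose_external("false") is None
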
2 changes: 1 addition & 1 deletion src/events/balancer.py
@@ -111,7 +111,7 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None:
         payload = {
             "balancer-username": BALANCER_WEBSERVER_USER,
             "balancer-password": self.charm.workload.generate_password(),
-            "balancer-uris": f"{self.charm.state.unit_broker.host}:{BALANCER_WEBSERVER_PORT}",
+            "balancer-uris": f"{self.charm.state.unit_broker.internal_address}:{BALANCER_WEBSERVER_PORT}",
         }
         # Update relation data intra & extra cluster (if it exists)
         self.charm.state.cluster.update(payload)
13 changes: 9 additions & 4 deletions src/events/broker.py
@@ -367,9 +367,6 @@ def _on_storage_attached(self, event: StorageAttachedEvent) -> None:
         # set status only for running services, not on startup
         # FIXME re-add this
         self.workload.exec(["chmod", "-R", "750", f"{self.workload.paths.data_path}"])
-        self.workload.exec(
-            ["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"]
-        )
         self.workload.exec(
             [
                 "bash",
@@ -378,6 +375,13 @@
             ]
         )
 
+        # all mounted data dirs should have correct ownership
+        self.workload.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.data_path}"])
+
+        # run this regardless of role, needed for cloud storages + ceph
+        for storage in self.charm.state.log_dirs.split(","):
+            self.workload.exec(["rm", "-rf", f"{storage}/lost+found"])
+
         # checks first whether the broker is active before warning
         if self.workload.active():
             # new dirs won't be used until topic partitions are assigned to it
@@ -434,8 +438,9 @@ def _init_kraft_mode(self) -> None:
         # cluster-uuid is only created on the broker (`cluster-manager` in large deployments)
         if not self.charm.state.cluster.cluster_uuid and self.charm.state.runs_broker:
             uuid = self.workload.run_bin_command(
-                bin_keyword="storage", bin_args=["random-uuid", "2>", "/dev/null"]
+                bin_keyword="storage", bin_args=["random-uuid"]
             ).strip()
+
             self.charm.state.cluster.update({"cluster-uuid": uuid})
             self.charm.state.peer_cluster.update({"cluster-uuid": uuid})
 
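
This is the fix the commit title refers to: a freshly formatted ext3/ext4 volume contains a `lost+found` directory created by mkfs, and Kafka's log-dir loader rejects directories that are not in topic-partition form, so every mounted data dir is scrubbed on storage-attached before the broker scans it. A standalone sketch of the same cleanup, with a hypothetical `log_dirs` value (the charm takes it from `self.charm.state.log_dirs`):

import shutil
from pathlib import Path

# Hypothetical comma-separated log.dirs value; the charm reads this from its state.
log_dirs = "/var/lib/kafka/data/0,/var/lib/kafka/data/1"

for storage in log_dirs.split(","):
    # mkfs creates lost+found at the root of a new volume; Kafka treats any
    # unexpected directory in log.dirs as a malformed topic-partition dir, so
    # remove it before the broker picks up the new storage.
    shutil.rmtree(Path(storage) / "lost+found", ignore_errors=True)
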
4 changes: 3 additions & 1 deletion src/events/tls.py
@@ -152,7 +152,9 @@ def _trusted_relation_joined(self, event: RelationJoinedEvent) -> None:
             relation_id=event.relation.id,
         )
         subject = (
-            os.uname()[1] if self.charm.substrate == "k8s" else self.charm.state.unit_broker.host
+            os.uname()[1]
+            if self.charm.substrate == "k8s"
+            else self.charm.state.unit_broker.internal_address
         )
         sans = self.charm.broker.tls_manager.build_sans()
         csr = (
8 changes: 6 additions & 2 deletions src/events/zookeeper.py
@@ -90,8 +90,12 @@ def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None:
 
         try:
             internal_user_credentials = self._create_internal_credentials()
-        except (KeyError, RuntimeError, subprocess.CalledProcessError, ExecError) as e:
-            logger.warning(str(e))
+        except (KeyError, RuntimeError) as e:
+            logger.warning(e)
+            event.defer()
+            return
+        except (subprocess.CalledProcessError, ExecError) as e:
+            logger.warning(f"{e.stdout}, {e.stderr}")
             event.defer()
             return
 
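
The exception split above changes what gets logged: for process failures, `str(e)` on a `CalledProcessError`/`ExecError` only reports the command and exit code, whereas the captured streams hold the actual error text. A small illustration of the difference, assuming the credentials helper runs its command with `check=True` and `capture_output=True` (not shown on this page):

import subprocess

try:
    subprocess.run(["ls", "/nonexistent-path"], check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
    print(str(e))                     # only: "Command '...' returned non-zero exit status 2."
    print(f"{e.stdout}, {e.stderr}")  # includes ls' stderr explaining what actually went wrong
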
2 changes: 1 addition & 1 deletion src/managers/config.py
@@ -538,7 +538,7 @@ def external_listeners(self) -> list[Listener]:
                 Listener(
                     auth_map=auth,
                     scope="EXTERNAL",
-                    host=self.state.unit_broker.host,
+                    host=self.state.unit_broker.node_ip,
                     # default in case service not created yet during cluster init
                     # will resolve during config-changed
                     node_port=node_port,
2 changes: 1 addition & 1 deletion src/managers/tls.py
@@ -142,7 +142,7 @@ def build_sans(self) -> Sans:
         if self.substrate == "vm":
             return {
                 "sans_ip": [
-                    self.state.unit_broker.host,
+                    self.state.unit_broker.internal_address,
                 ],
                 "sans_dns": [self.state.unit_broker.unit.name, socket.getfqdn()]
                 + self._build_extra_sans(),
4 changes: 1 addition & 3 deletions tests/unit/test_kraft.py
@@ -91,7 +91,6 @@ def test_ready_to_start_no_peer_cluster(charm_configuration, base_state: State):
 def test_ready_to_start_missing_data_as_controller(charm_configuration, base_state: State):
     # Given
     charm_configuration["options"]["roles"]["default"] = "controller"
-    charm_configuration["options"]["expose_external"]["default"] = "none"
     ctx = Context(
         KafkaCharm,
         meta=METADATA,
@@ -112,7 +111,6 @@ def test_ready_to_start_missing_data_as_broker(charm_configuration, base_state: State):
 def test_ready_to_start_missing_data_as_broker(charm_configuration, base_state: State):
     # Given
     charm_configuration["options"]["roles"]["default"] = "broker"
-    charm_configuration["options"]["expose_external"]["default"] = "none"
     ctx = Context(
         KafkaCharm,
         meta=METADATA,
@@ -136,7 +134,6 @@ def test_ready_to_start(charm_configuration, base_state: State):
 def test_ready_to_start(charm_configuration, base_state: State):
     # Given
     charm_configuration["options"]["roles"]["default"] = "broker,controller"
-    charm_configuration["options"]["expose_external"]["default"] = "none"
     ctx = Context(
         KafkaCharm,
         meta=METADATA,
@@ -153,6 +150,7 @@ def test_ready_to_start(charm_configuration, base_state: State):
         ) as patched_run_bin_command,
         patch("health.KafkaHealth.machine_configured", return_value=True),
         patch("workload.KafkaWorkload.start"),
+        patch("workload.KafkaWorkload.active", return_value=True),
         patch("charms.operator_libs_linux.v1.snap.SnapCache"),
     ):
         state_out = ctx.run(ctx.on.start(), state_in)
