Skip to content

Commit

Permalink
fix IP self-heal
Browse files Browse the repository at this point in the history
  • Loading branch information
zmraul committed Oct 3, 2023
1 parent 3c9ae11 commit 49b5b2d
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 7 deletions.
6 changes: 4 additions & 2 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,15 @@ def _on_update_status(self, event: EventBase) -> None:

# NOTE: integration with kafka-broker-rack-awareness charm.
# Load current properties set in the charm workload and check
# if rack.properties file exists
# if rack.properties file exists.
# Also, check if listeners changed (IP change)
properties = safe_get_file(self.kafka_config.server_properties_filepath)
if properties and (
self.kafka_config.rack_properties != []
(self.kafka_config.rack_properties != [] or not self.kafka_config.listeners_updated)
and (set(self.kafka_config.server_properties) ^ set(properties))
):
self._on_config_changed(event)
self._restart(event)

if not broker_active(
unit=self.unit,
Expand Down
13 changes: 13 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,19 @@ def all_listeners(self) -> List[Listener]:
"""Return a list with all expected listeners."""
return [self.internal_listener] + self.client_listeners

@property
def listeners_updated(self) -> bool:
    """Tell whether the properties file already holds the expected listeners entry.

    Useful for spotting an IP change: in that case the entry stored in the
    properties file diverges from the one built from the current listeners.

    Returns:
        True if the expected ``advertised.listeners=...`` entry is found in the
        content read from the server properties file, False otherwise (including
        when nothing could be read from the file).
    """
    # Fall back to an empty list when the read yields nothing usable.
    # NOTE(review): presumably the file content is a list of lines, so the
    # membership test below is an exact whole-line match -- confirm.
    current_properties = safe_get_file(self.server_properties_filepath) or []

    # Build the exact entry expected for the listeners known right now.
    expected_listeners = ",".join(
        [listener.advertised_listener for listener in self.all_listeners]
    )
    expected_entry = f"advertised.listeners={expected_listeners}"

    return expected_entry in current_properties

@property
def super_users(self) -> str:
"""Generates all users with super/admin permissions for the cluster from relations.
Expand Down
9 changes: 6 additions & 3 deletions src/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ def _get_partitions_size(self) -> Tuple[int, int]:
f"--bootstrap-server {','.join(self.charm.kafka_config.bootstrap_server)}",
f"--command-config {self.charm.kafka_config.client_properties_filepath}",
]
log_dirs = self.charm.snap.run_bin_command(
bin_keyword="log-dirs", bin_args=log_dirs_command
)
try:
log_dirs = self.charm.snap.run_bin_command(
bin_keyword="log-dirs", bin_args=log_dirs_command
)
except subprocess.CalledProcessError:
return (0, 0)

dirs = {}
for line in log_dirs.splitlines():
Expand Down
6 changes: 4 additions & 2 deletions tests/integration/ha/test_ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,9 @@ async def test_network_cut(
c_writes: ContinuousWrites,
c_writes_runner: ContinuousWrites,
):
# Let some time pass for messages to be produced
await asyncio.sleep(10)

topic_description = await get_topic_description(
ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME
)
Expand Down Expand Up @@ -349,12 +352,11 @@ async def test_network_cut(
ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME, unit_name=available_unit
)

assert int(next_offsets[-1]) > int(initial_offsets[-1])
assert int(next_offsets[-1]) > int(initial_offsets[-1]), "Messages not increasing"

# Release the network
logger.info(f"Restoring network of broker: {initial_leader_num}")
network_restore(machine_name=leader_machine_name)
await asyncio.sleep(REELECTION_TIME)

async with ops_test.fast_forward():
result = c_writes.stop()
Expand Down

0 comments on commit 49b5b2d

Please sign in to comment.