From 49b5b2d751b53d92f22cd09a0daf6c9c863db493 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Tue, 3 Oct 2023 12:02:21 +0200 Subject: [PATCH] fix IP self-heal --- src/charm.py | 6 ++++-- src/config.py | 13 +++++++++++++ src/health.py | 9 ++++++--- tests/integration/ha/test_ha.py | 6 ++++-- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/charm.py b/src/charm.py index 113739de..de808800 100755 --- a/src/charm.py +++ b/src/charm.py @@ -206,13 +206,15 @@ def _on_update_status(self, event: EventBase) -> None: # NOTE: integration with kafka-broker-rack-awareness charm. # Load current properties set in the charm workload and check - # if rack.properties file exists + # if rack.properties file exists. + # Also, check if listeners changed (IP change) properties = safe_get_file(self.kafka_config.server_properties_filepath) if properties and ( - self.kafka_config.rack_properties != [] + (self.kafka_config.rack_properties != [] or not self.kafka_config.listeners_updated) and (set(self.kafka_config.server_properties) ^ set(properties)) ): self._on_config_changed(event) + self._restart(event) if not broker_active( unit=self.unit, diff --git a/src/config.py b/src/config.py index cd9dd21b..dfb5b615 100644 --- a/src/config.py +++ b/src/config.py @@ -416,6 +416,19 @@ def all_listeners(self) -> List[Listener]: """Return a list with all expected listeners.""" return [self.internal_listener] + self.client_listeners + @property + def listeners_updated(self) -> bool: + """Check if listeners in properties file match the ones expected. + + This method can be used to verify if an IP change happened and there is a divergence + between the properties file and the actual IP. 
+ """ + properties = safe_get_file(self.server_properties_filepath) or [] + advertised_listeners = [listener.advertised_listener for listener in self.all_listeners] + advertised_listeners = f"advertised.listeners={','.join(advertised_listeners)}" + + return advertised_listeners in properties + @property def super_users(self) -> str: """Generates all users with super/admin permissions for the cluster from relations. diff --git a/src/health.py b/src/health.py index 759ae8df..e7996659 100644 --- a/src/health.py +++ b/src/health.py @@ -84,9 +84,12 @@ def _get_partitions_size(self) -> Tuple[int, int]: f"--bootstrap-server {','.join(self.charm.kafka_config.bootstrap_server)}", f"--command-config {self.charm.kafka_config.client_properties_filepath}", ] - log_dirs = self.charm.snap.run_bin_command( - bin_keyword="log-dirs", bin_args=log_dirs_command - ) + try: + log_dirs = self.charm.snap.run_bin_command( + bin_keyword="log-dirs", bin_args=log_dirs_command + ) + except subprocess.CalledProcessError: + return (0, 0) dirs = {} for line in log_dirs.splitlines(): diff --git a/tests/integration/ha/test_ha.py b/tests/integration/ha/test_ha.py index fe681dd6..fd3f3a2e 100644 --- a/tests/integration/ha/test_ha.py +++ b/tests/integration/ha/test_ha.py @@ -315,6 +315,9 @@ async def test_network_cut( c_writes: ContinuousWrites, c_writes_runner: ContinuousWrites, ): + # Let some time pass for messages to be produced + await asyncio.sleep(10) + topic_description = await get_topic_description( ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME ) @@ -349,12 +352,11 @@ async def test_network_cut( ops_test=ops_test, topic=ContinuousWrites.TOPIC_NAME, unit_name=available_unit ) - assert int(next_offsets[-1]) > int(initial_offsets[-1]) + assert int(next_offsets[-1]) > int(initial_offsets[-1]), "Messages not increasing" # Release the network logger.info(f"Restoring network of broker: {initial_leader_num}") network_restore(machine_name=leader_machine_name) - await 
asyncio.sleep(REELECTION_TIME) async with ops_test.fast_forward(): result = c_writes.stop()