From 5cf96e570c543066ecca7f3bf0bb6fde73f6d781 Mon Sep 17 00:00:00 2001 From: Tomas Jelinek Date: Mon, 7 Oct 2024 17:19:32 +0200 Subject: [PATCH] refactor: ha_cluster_info: cleanup Implementing changes proposed in code review --- library/ha_cluster_info.py | 42 +- module_utils/ha_cluster_lsr/info/exporter.py | 131 ++++-- module_utils/ha_cluster_lsr/info/loader.py | 11 +- tests/unit/test_ha_cluster_info.py | 81 ++-- tests/unit/test_info_exporter.py | 436 +++++++++++-------- tests/unit/test_info_loader.py | 26 +- 6 files changed, 423 insertions(+), 304 deletions(-) diff --git a/library/ha_cluster_info.py b/library/ha_cluster_info.py index f6c3e45..653a861 100644 --- a/library/ha_cluster_info.py +++ b/library/ha_cluster_info.py @@ -104,7 +104,11 @@ def export_cluster_configuration(module: AnsibleModule) -> Dict[str, Any]: result: dict[str, Any] = dict() cmd_runner = get_cmd_runner(module) - result["ha_cluster_start_on_boot"] = loader.get_start_on_boot(cmd_runner) + corosync_enabled = loader.is_service_enabled(cmd_runner, "corosync") + pacemaker_enabled = loader.is_service_enabled(cmd_runner, "pacemaker") + result["ha_cluster_start_on_boot"] = exporter.export_start_on_boot( + corosync_enabled, pacemaker_enabled + ) # Corosync config is availabe via CLI since pcs-0.10.8, via API v2 since # pcs-0.12.0 and pcs-0.11.9. For old pcs versions, CLI must be used, and @@ -118,25 +122,23 @@ def export_cluster_configuration(module: AnsibleModule) -> Dict[str, Any]: known_hosts_pcs = loader.get_pcsd_known_hosts() # Convert corosync config to role format - corosync_conf_role = exporter.export_corosync_options(corosync_conf_pcs) - for key in ( - "ha_cluster_cluster_name", - "ha_cluster_transport", - "ha_cluster_totem", - "ha_cluster_quorum", - ): - if key in corosync_conf_role: - result[key] = corosync_conf_role[key] - - # Convert cluster definition to role format - try: - result["ha_cluster_node_options"] = exporter.export_cluster_nodes( - corosync_conf_pcs["nodes"], known_hosts_pcs - ) - except KeyError as e: - raise exporter.JsonMissingKey( - e.args[0], corosync_conf_pcs, "corosync configuration" - ) from e + result["ha_cluster_cluster_name"] = exporter.export_corosync_cluster_name( + corosync_conf_pcs + ) + result["ha_cluster_transport"] = exporter.export_corosync_transport( + corosync_conf_pcs + ) + exported_totem = exporter.export_corosync_totem(corosync_conf_pcs) + if exported_totem: + result["ha_cluster_totem"] = exported_totem + exported_quorum = exporter.export_corosync_quorum(corosync_conf_pcs) + if exported_quorum: + result["ha_cluster_quorum"] = exported_quorum + + # Convert nodes definition to role format + result["ha_cluster_node_options"] = exporter.export_cluster_nodes( + corosync_conf_pcs, known_hosts_pcs + ) return result diff --git a/module_utils/ha_cluster_lsr/info/exporter.py b/module_utils/ha_cluster_lsr/info/exporter.py index 78dcfdb..8e8f0f5 100644 --- a/module_utils/ha_cluster_lsr/info/exporter.py +++ b/module_utils/ha_cluster_lsr/info/exporter.py @@ -11,7 +11,8 @@ # pylint: disable=invalid-name __metaclass__ = type -from typing import Any, Dict, List +from contextlib import contextmanager +from typing import Any, Dict, Iterator, List class JsonMissingKey(Exception): @@ -39,18 +40,42 @@ def _dict_to_nv_list(input_dict: Dict[str, Any]) -> List[Dict[str, Any]]: return [dict(name=name, value=value) for name, value in input_dict.items()] -def export_corosync_options( +@contextmanager +def _handle_missing_key(data: Dict[str, Any], data_desc: str) -> Iterator[None]: + try: + yield + except KeyError as e: + raise JsonMissingKey(e.args[0], data, data_desc) from e + + +def export_start_on_boot( + corosync_enabled: bool, pacemaker_enabled: bool +) -> bool: + """ + Transform cluster servis status to start_on_boot + """ + return corosync_enabled or pacemaker_enabled + + +def export_corosync_cluster_name(corosync_conf_dict: Dict[str, Any]) -> str: + """ + Extract cluster name form corosync config in pcs format + + corosync_conf_dict -- corosync config structure provided by pcs + """ + with _handle_missing_key(corosync_conf_dict, "corosync configuration"): + return corosync_conf_dict["cluster_name"] + + +def export_corosync_transport( corosync_conf_dict: Dict[str, Any] ) -> Dict[str, Any]: """ - Transform corosync config from pcs format to role format excluding nodes + Export transport options in role format from corosync config in pcs format corosync_conf_dict -- corosync config structure provided by pcs """ - result: Dict[str, Any] = dict() - try: - result["ha_cluster_cluster_name"] = corosync_conf_dict["cluster_name"] - + with _handle_missing_key(corosync_conf_dict, "corosync configuration"): transport = dict(type=corosync_conf_dict["transport"].lower()) if corosync_conf_dict["transport_options"]: transport["options"] = _dict_to_nv_list( @@ -71,26 +96,43 @@ def export_corosync_options( transport["crypto"] = _dict_to_nv_list( corosync_conf_dict["crypto_options"] ) - result["ha_cluster_transport"] = transport + return transport + +def export_corosync_totem(corosync_conf_dict: Dict[str, Any]) -> Dict[str, Any]: + """ + Export totem options in role format from corosync config in pcs format + + corosync_conf_dict -- corosync config structure provided by pcs + """ + with _handle_missing_key(corosync_conf_dict, "corosync configuration"): + result: Dict[str, Any] = dict() if corosync_conf_dict["totem_options"]: - result["ha_cluster_totem"] = dict( - options=_dict_to_nv_list(corosync_conf_dict["totem_options"]) + result["options"] = _dict_to_nv_list( + corosync_conf_dict["totem_options"] ) + return result + + +def export_corosync_quorum( + corosync_conf_dict: Dict[str, Any] +) -> Dict[str, Any]: + """ + Export quorum options in role format from corosync config in pcs format + corosync_conf_dict -- corosync config structure provided by pcs + """ + with _handle_missing_key(corosync_conf_dict, "corosync configuration"): + result: Dict[str, Any] = dict() if corosync_conf_dict["quorum_options"]: - result["ha_cluster_quorum"] = dict( - options=_dict_to_nv_list(corosync_conf_dict["quorum_options"]) + result["options"] = _dict_to_nv_list( + corosync_conf_dict["quorum_options"] ) - except KeyError as e: - raise JsonMissingKey( - e.args[0], corosync_conf_dict, "corosync configuration" - ) from e - return result + return result def export_cluster_nodes( - corosync_conf_nodes: List[Dict[str, Any]], pcs_node_addr: Dict[str, str] + corosync_conf_dict: Dict[str, Any], pcs_node_addr: Dict[str, str] ) -> List[Dict[str, Any]]: """ Transform node configuration from pcs format to role format @@ -98,31 +140,30 @@ def export_cluster_nodes( corosync_conf_dict -- corosync config structure provided by pcs pcs_node_addr -- dict holding pcs address for cluster nodes """ - node_list: List[Dict[str, Any]] = [] - if not corosync_conf_nodes: - return node_list - for index, node_dict in enumerate(corosync_conf_nodes): - # corosync node configuration - try: - one_node = dict( - node_name=node_dict["name"], - corosync_addresses=[ - addr_dict["addr"] - for addr_dict in sorted( - node_dict["addrs"], - key=lambda item: item["link"], - ) - ], - ) - except KeyError as e: - raise JsonMissingKey( - e.args[0], - dict(nodes=corosync_conf_nodes), + with _handle_missing_key(corosync_conf_dict, "corosync configuration"): + node_list: List[Dict[str, Any]] = [] + corosync_nodes = corosync_conf_dict["nodes"] + if not corosync_nodes: + return node_list + for index, node_dict in enumerate(corosync_nodes): + # corosync node configuration + with _handle_missing_key( + corosync_conf_dict, f"corosync configuration for node on index {index}", - ) from e - # pcs node configuration - if one_node["node_name"] in pcs_node_addr: - one_node["pcs_address"] = pcs_node_addr[one_node["node_name"]] - # finish one node export - node_list.append(one_node) - return node_list + ): + one_node = dict( + node_name=node_dict["name"], + corosync_addresses=[ + addr_dict["addr"] + for addr_dict in sorted( + node_dict["addrs"], + key=lambda item: item["link"], + ) + ], + ) + # pcs node configuration + if one_node["node_name"] in pcs_node_addr: + one_node["pcs_address"] = pcs_node_addr[one_node["node_name"]] + # finish one node export + node_list.append(one_node) + return node_list diff --git a/module_utils/ha_cluster_lsr/info/loader.py b/module_utils/ha_cluster_lsr/info/loader.py index f755826..408566e 100644 --- a/module_utils/ha_cluster_lsr/info/loader.py +++ b/module_utils/ha_cluster_lsr/info/loader.py @@ -85,7 +85,7 @@ def kwargs(self) -> Dict[str, Any]: ) -def _is_service_enabled(run_command: CommandRunner, service: str) -> bool: +def is_service_enabled(run_command: CommandRunner, service: str) -> bool: """ Check whether a specified service is enabled in the OS @@ -102,15 +102,6 @@ def _is_service_enabled(run_command: CommandRunner, service: str) -> bool: return rc == 0 -def get_start_on_boot(run_command: CommandRunner) -> bool: - """ - Detect wheter a cluster is configured to start on boot - """ - return _is_service_enabled(run_command, "corosync") or _is_service_enabled( - run_command, "pacemaker" - ) - - def _call_pcs_cli( run_command: CommandRunner, command: List[str] ) -> Dict[str, Any]: diff --git a/tests/unit/test_ha_cluster_info.py b/tests/unit/test_ha_cluster_info.py index 52443dc..e1fbe1e 100644 --- a/tests/unit/test_ha_cluster_info.py +++ b/tests/unit/test_ha_cluster_info.py @@ -11,6 +11,7 @@ import json import sys from importlib import import_module +from typing import List from unittest import TestCase, mock sys.modules["ansible.module_utils.ha_cluster_lsr"] = import_module( @@ -23,10 +24,28 @@ class ExportClusterConfiguration(TestCase): maxDiff = None - @mock.patch("ha_cluster_info.loader.get_pcsd_known_hosts") - def test_export_minimal( + @staticmethod + def fixture_expected_runner_calls() -> List[mock._Call]: + common_args = dict(check_rc=False, environ_update={"LC_ALL": "C"}) + return [ + mock.call( + ["systemctl", "is-enabled", "corosync.service"], **common_args + ), + mock.call( + ["systemctl", "is-enabled", "pacemaker.service"], **common_args + ), + mock.call( + ["pcs", "cluster", "config", "--output-format=json"], + **common_args, + ), + ] + + def assert_export_minimal( self, mock_load_pcsd_known_hosts: mock.Mock, + corosync_enabled: bool, + pacemaker_enabled: bool, + cluster_start_on_boot: bool, ) -> None: module_mock = mock.Mock() module_mock.run_command = mock.Mock() @@ -50,7 +69,8 @@ def test_export_minimal( ], ) runner_mock.side_effect = [ - (0, "", ""), + (0 if corosync_enabled else 1, "", ""), + (0 if pacemaker_enabled else 1, "", ""), (0, json.dumps(corosync_conf_data), ""), ] @@ -59,7 +79,7 @@ def test_export_minimal( self.assertEqual( ha_cluster_info.export_cluster_configuration(module_mock), dict( - ha_cluster_start_on_boot=True, + ha_cluster_start_on_boot=cluster_start_on_boot, ha_cluster_cluster_name="my-cluster", ha_cluster_transport=dict(type="knet"), ha_cluster_node_options=[ @@ -71,21 +91,30 @@ def test_export_minimal( ), ) - common_args = dict(check_rc=False, environ_update={"LC_ALL": "C"}) - expected_calls = [ - mock.call( - ["systemctl", "is-enabled", "corosync.service"], **common_args - ), - mock.call( - ["pcs", "cluster", "config", "--output-format=json"], - **common_args, - ), - ] + expected_calls = self.fixture_expected_runner_calls() runner_mock.assert_has_calls(expected_calls) self.assertEqual(runner_mock.call_count, len(expected_calls)) mock_load_pcsd_known_hosts.assert_called_once_with() + @mock.patch("ha_cluster_info.loader.get_pcsd_known_hosts") + def test_export_minimal_enabled( + self, + mock_load_pcsd_known_hosts: mock.Mock, + ) -> None: + self.assert_export_minimal( + mock_load_pcsd_known_hosts, True, False, True + ) + + @mock.patch("ha_cluster_info.loader.get_pcsd_known_hosts") + def test_export_minimal_disabled( + self, + mock_load_pcsd_known_hosts: mock.Mock, + ) -> None: + self.assert_export_minimal( + mock_load_pcsd_known_hosts, False, False, False + ) + @mock.patch("ha_cluster_info.loader.get_pcsd_known_hosts") def test_export( self, @@ -118,6 +147,7 @@ def test_export( ], ) runner_mock.side_effect = [ + (0, "", ""), (0, "", ""), (0, json.dumps(corosync_conf_data), ""), ] @@ -157,16 +187,7 @@ def test_export( ), ) - common_args = dict(check_rc=False, environ_update={"LC_ALL": "C"}) - expected_calls = [ - mock.call( - ["systemctl", "is-enabled", "corosync.service"], **common_args - ), - mock.call( - ["pcs", "cluster", "config", "--output-format=json"], - **common_args, - ), - ] + expected_calls = self.fixture_expected_runner_calls() runner_mock.assert_has_calls(expected_calls) self.assertEqual(runner_mock.call_count, len(expected_calls)) @@ -192,6 +213,7 @@ def test_missing_corosync_nodes_key( quorum_options=dict(), ) runner_mock.side_effect = [ + (0, "", ""), (0, "", ""), (0, json.dumps(corosync_conf_data), ""), ] @@ -212,16 +234,7 @@ def test_missing_corosync_nodes_key( ), ) - common_args = dict(check_rc=False, environ_update={"LC_ALL": "C"}) - expected_calls = [ - mock.call( - ["systemctl", "is-enabled", "corosync.service"], **common_args - ), - mock.call( - ["pcs", "cluster", "config", "--output-format=json"], - **common_args, - ), - ] + expected_calls = self.fixture_expected_runner_calls() runner_mock.assert_has_calls(expected_calls) self.assertEqual(runner_mock.call_count, len(expected_calls)) diff --git a/tests/unit/test_info_exporter.py b/tests/unit/test_info_exporter.py index 4680e0d..a8d5106 100644 --- a/tests/unit/test_info_exporter.py +++ b/tests/unit/test_info_exporter.py @@ -9,7 +9,7 @@ import sys from importlib import import_module -from typing import Any, Dict, List +from typing import Any, Dict from unittest import TestCase sys.modules["ansible.module_utils.ha_cluster_lsr"] = import_module( @@ -40,167 +40,152 @@ def test_two_items(self) -> None: ) -class ExportCorosyncConf(TestCase): +class ExportStartOnBoot(TestCase): + def test_main(self) -> None: + self.assertFalse(exporter.export_start_on_boot(False, False)) + self.assertTrue(exporter.export_start_on_boot(False, True)) + self.assertTrue(exporter.export_start_on_boot(True, False)) + self.assertTrue(exporter.export_start_on_boot(True, True)) + + +class ExportCorosyncClusterName(TestCase): maxDiff = None - def assert_missing_key(self, data: Dict[str, Any], key: str) -> None: + def test_missing_key(self) -> None: + corosync_data: Dict[str, Any] = dict() with self.assertRaises(exporter.JsonMissingKey) as cm: - exporter.export_corosync_options(data) + exporter.export_corosync_cluster_name(corosync_data) self.assertEqual( cm.exception.kwargs, - dict(data=data, key=key, data_desc="corosync configuration"), + dict( + data=corosync_data, + key="cluster_name", + data_desc="corosync configuration", + ), ) - def test_missing_keys(self) -> None: - self.assert_missing_key(dict(), "cluster_name") - self.assert_missing_key(dict(cluster_name="x"), "transport") - self.assert_missing_key( - dict(cluster_name="x", transport="x"), "transport_options" + def test_minimal(self) -> None: + corosync_data: Dict[str, Any] = dict( + cluster_name="my-cluster", ) + role_data = exporter.export_corosync_cluster_name(corosync_data) + self.assertEqual(role_data, "my-cluster") + + +class ExportCorosyncTransport(TestCase): + maxDiff = None + + def assert_missing_key( + self, corosync_data: Dict[str, Any], key: str + ) -> None: + with self.assertRaises(exporter.JsonMissingKey) as cm: + exporter.export_corosync_transport(corosync_data) + self.assertEqual( + cm.exception.kwargs, + dict( + data=corosync_data, key=key, data_desc="corosync configuration" + ), + ) + + def test_missing_key(self) -> None: self.assert_missing_key( - dict(cluster_name="x", transport="x", transport_options=dict()), - "links_options", + dict(), + "transport", ) self.assert_missing_key( dict( - cluster_name="x", transport="x", - transport_options=dict(), - links_options=dict(), ), - "compression_options", + "transport_options", ) self.assert_missing_key( dict( - cluster_name="x", transport="x", transport_options=dict(), - links_options=dict(), - compression_options=dict(), ), - "crypto_options", + "links_options", ) self.assert_missing_key( dict( - cluster_name="x", transport="x", transport_options=dict(), links_options=dict(), - compression_options=dict(), - crypto_options=dict(), ), - "totem_options", + "compression_options", ) self.assert_missing_key( dict( - cluster_name="x", transport="x", transport_options=dict(), links_options=dict(), compression_options=dict(), - crypto_options=dict(), - totem_options=dict(), ), - "quorum_options", + "crypto_options", ) def test_minimal(self) -> None: - pcs_data = dict( - cluster_name="my-cluster", + corosync_data: Dict[str, Any] = dict( transport="KNET", transport_options=dict(), links_options=dict(), compression_options=dict(), crypto_options=dict(), - totem_options=dict(), - quorum_options=dict(), - ) - role_data = exporter.export_corosync_options(pcs_data) - self.assertEqual( - role_data, - dict( - ha_cluster_cluster_name="my-cluster", - ha_cluster_transport=dict(type="knet"), - ), ) + role_data = exporter.export_corosync_transport(corosync_data) + self.assertEqual(role_data, dict(type="knet")) def test_simple_options_mirroring(self) -> None: - pcs_data = dict( - cluster_name="my-cluster", + corosync_data: Dict[str, Any] = dict( transport="KNET", - totem_options=dict(totem1="a", totem2="b"), transport_options=dict(transport1="c", transport2="d"), compression_options=dict(compression1="e", compression2="f"), crypto_options=dict(crypto1="g", crypto2="h"), - quorum_options=dict(quorum1="i", quorum2="j"), links_options=dict(), ) - role_data = exporter.export_corosync_options(pcs_data) + role_data = exporter.export_corosync_transport(corosync_data) self.assertEqual( role_data, dict( - ha_cluster_cluster_name="my-cluster", - ha_cluster_transport=dict( - type="knet", - options=[ - dict(name="transport1", value="c"), - dict(name="transport2", value="d"), - ], - compression=[ - dict(name="compression1", value="e"), - dict(name="compression2", value="f"), - ], - crypto=[ - dict(name="crypto1", value="g"), - dict(name="crypto2", value="h"), - ], - ), - ha_cluster_totem=dict( - options=[ - dict(name="totem1", value="a"), - dict(name="totem2", value="b"), - ], - ), - ha_cluster_quorum=dict( - options=[ - dict(name="quorum1", value="i"), - dict(name="quorum2", value="j"), - ], - ), + type="knet", + options=[ + dict(name="transport1", value="c"), + dict(name="transport2", value="d"), + ], + compression=[ + dict(name="compression1", value="e"), + dict(name="compression2", value="f"), + ], + crypto=[ + dict(name="crypto1", value="g"), + dict(name="crypto2", value="h"), + ], ), ) def test_one_link(self) -> None: - pcs_data = dict( - cluster_name="my-cluster", + corosync_data: Dict[str, Any] = dict( transport="KNET", transport_options=dict(), links_options={"0": dict(name1="value1", name2="value2")}, compression_options=dict(), crypto_options=dict(), - totem_options=dict(), - quorum_options=dict(), ) - role_data = exporter.export_corosync_options(pcs_data) + role_data = exporter.export_corosync_transport(corosync_data) self.assertEqual( role_data, dict( - ha_cluster_cluster_name="my-cluster", - ha_cluster_transport=dict( - type="knet", - links=[ - [ - dict(name="name1", value="value1"), - dict(name="name2", value="value2"), - ] - ], - ), + type="knet", + links=[ + [ + dict(name="name1", value="value1"), + dict(name="name2", value="value2"), + ] + ], ), ) def test_more_links(self) -> None: - pcs_data = dict( - cluster_name="my-cluster", + corosync_data: Dict[str, Any] = dict( transport="KNET", transport_options=dict(), links_options={ @@ -210,31 +195,104 @@ def test_more_links(self) -> None: }, compression_options=dict(), crypto_options=dict(), + ) + role_data = exporter.export_corosync_transport(corosync_data) + self.assertEqual( + role_data, + dict( + type="knet", + links=[ + [ + dict(name="linknumber", value="0"), + dict(name="name0", value="value0"), + ], + [ + dict(name="linknumber", value="7"), + dict(name="name7", value="value7"), + ], + [ + dict(name="linknumber", value="3"), + dict(name="name3", value="value3"), + ], + ], + ), + ) + + +class ExportCorosyncTotem(TestCase): + maxDiff = None + + def test_missing_key(self) -> None: + corosync_data: Dict[str, Any] = dict() + with self.assertRaises(exporter.JsonMissingKey) as cm: + exporter.export_corosync_totem(corosync_data) + self.assertEqual( + cm.exception.kwargs, + dict( + data=corosync_data, + key="totem_options", + data_desc="corosync configuration", + ), + ) + + def test_minimal(self) -> None: + corosync_data: Dict[str, Any] = dict( totem_options=dict(), + ) + role_data = exporter.export_corosync_totem(corosync_data) + self.assertEqual(role_data, dict()) + + def test_simple_options_mirroring(self) -> None: + corosync_data: Dict[str, Any] = dict( + totem_options=dict(totem1="a", totem2="b"), + ) + role_data = exporter.export_corosync_totem(corosync_data) + self.assertEqual( + role_data, + dict( + options=[ + dict(name="totem1", value="a"), + dict(name="totem2", value="b"), + ], + ), + ) + + +class ExportCorosyncQuorum(TestCase): + maxDiff = None + + def test_missing_key(self) -> None: + corosync_data: Dict[str, Any] = dict() + with self.assertRaises(exporter.JsonMissingKey) as cm: + exporter.export_corosync_quorum(corosync_data) + self.assertEqual( + cm.exception.kwargs, + dict( + data=corosync_data, + key="quorum_options", + data_desc="corosync configuration", + ), + ) + + def test_minimal(self) -> None: + corosync_data: Dict[str, Any] = dict( quorum_options=dict(), ) - role_data = exporter.export_corosync_options(pcs_data) + role_data = exporter.export_corosync_quorum(corosync_data) + self.assertEqual(role_data, dict()) + + def test_simple_options_mirroring(self) -> None: + corosync_data: Dict[str, Any] = dict( + quorum_options=dict(quorum1="i", quorum2="j"), + ) + role_data = exporter.export_corosync_quorum(corosync_data) self.assertEqual( role_data, dict( - ha_cluster_cluster_name="my-cluster", - ha_cluster_transport=dict( - type="knet", - links=[ - [ - dict(name="linknumber", value="0"), - dict(name="name0", value="value0"), - ], - [ - dict(name="linknumber", value="7"), - dict(name="name7", value="value7"), - ], - [ - dict(name="linknumber", value="3"), - dict(name="name3", value="value3"), - ], - ], - ), + options=[ + dict(name="quorum1", value="i"), + dict(name="quorum2", value="j"), + ], ), ) @@ -242,49 +300,77 @@ def test_more_links(self) -> None: class ExportClusterNodes(TestCase): maxDiff = None - def assert_missing_key( - self, data: List[Dict[str, Any]], key: str, index: str = "0" + def assert_missing_node_key( + self, corosync_data: Dict[str, Any], key: str, index: int = 0 ) -> None: with self.assertRaises(exporter.JsonMissingKey) as cm: - exporter.export_cluster_nodes(data, {}) + exporter.export_cluster_nodes(corosync_data, {}) self.assertEqual( cm.exception.kwargs, dict( - data=dict(nodes=data), + data=corosync_data, key=key, data_desc=f"corosync configuration for node on index {index}", ), ) + def test_missing_key(self) -> None: + corosync_data: Dict[str, Any] = dict() + with self.assertRaises(exporter.JsonMissingKey) as cm: + exporter.export_cluster_nodes(corosync_data, {}) + self.assertEqual( + cm.exception.kwargs, + dict( + data=corosync_data, + key="nodes", + data_desc="corosync configuration", + ), + ) + def test_no_nodes(self) -> None: - self.assertEqual(exporter.export_cluster_nodes([], {}), []) + self.assertEqual( + exporter.export_cluster_nodes(dict(nodes=[]), {}), + [], + ) def test_corosync_nodes_missing_keys(self) -> None: - corosync_data: List[Dict[str, Any]] = [dict()] - self.assert_missing_key(corosync_data, "name") + corosync_data: Dict[str, Any] = dict(nodes=[dict()]) + self.assert_missing_node_key(corosync_data, "name") - corosync_data = [dict(name="nodename")] - self.assert_missing_key(corosync_data, "addrs") + corosync_data = dict(nodes=[dict(name="nodename")]) + self.assert_missing_node_key(corosync_data, "addrs") - corosync_data = [dict(name="nodename", addrs=[dict()])] - self.assert_missing_key(corosync_data, "link") + corosync_data = dict(nodes=[dict(name="nodename", addrs=[dict()])]) + self.assert_missing_node_key(corosync_data, "link") - corosync_data = [dict(name="nodename", addrs=[dict(link="0")])] - self.assert_missing_key(corosync_data, "addr") + corosync_data = dict( + nodes=[dict(name="nodename", addrs=[dict(link="0")])] + ) + self.assert_missing_node_key(corosync_data, "addr") + + corosync_data = dict( + nodes=[ + dict(name="nodename", addrs=[dict(link="0", addr="addr1")]), + dict(name="node2"), + ] + ) + self.assert_missing_node_key(corosync_data, "addrs", 1) def test_corosync_nodes_one_link(self) -> None: - corosync_data = [ - dict( - name="node1", - nodeid=1, - addrs=[dict(addr="node1addr", link="0", type="IPv4")], - ), - dict( - name="node2", - nodeid=2, - addrs=[dict(addr="node2addr", link="0", type="FQDN")], - ), - ] + corosync_data: Dict[str, Any] = dict( + nodes=[ + dict( + name="node1", + nodeid=1, + addrs=[dict(addr="node1addr", link="0", type="IPv4")], + ), + dict( + name="node2", + nodeid=2, + addrs=[dict(addr="node2addr", link="0", type="FQDN")], + ), + ] + ) role_data = exporter.export_cluster_nodes(corosync_data, {}) self.assertEqual( role_data, @@ -295,24 +381,26 @@ def test_corosync_nodes_one_link(self) -> None: ) def test_corosync_nodes_multiple_links(self) -> None: - corosync_data = [ - dict( - name="node1", - nodeid=1, - addrs=[ - dict(addr="node1addr1", link="0", type="IPv4"), - dict(addr="node1addr2", link="1", type="IPv6"), - ], - ), - dict( - name="node2", - nodeid=2, - addrs=[ - dict(addr="node2addr1", link="0", type="IPv4"), - dict(addr="node2addr2", link="1", type="IPv6"), - ], - ), - ] + corosync_data: Dict[str, Any] = dict( + nodes=[ + dict( + name="node1", + nodeid=1, + addrs=[ + dict(addr="node1addr1", link="0", type="IPv4"), + dict(addr="node1addr2", link="1", type="IPv6"), + ], + ), + dict( + name="node2", + nodeid=2, + addrs=[ + dict(addr="node2addr1", link="0", type="IPv4"), + dict(addr="node2addr2", link="1", type="IPv6"), + ], + ), + ] + ) role_data = exporter.export_cluster_nodes(corosync_data, {}) self.assertEqual( role_data, @@ -329,13 +417,15 @@ def test_corosync_nodes_multiple_links(self) -> None: ) def test_corosync_nodes_no_address(self) -> None: - corosync_data = [ - dict( - name="node1", - nodeid=1, - addrs=[], - ), - ] + corosync_data: Dict[str, Any] = dict( + nodes=[ + dict( + name="node1", + nodeid=1, + addrs=[], + ), + ] + ) role_data = exporter.export_cluster_nodes(corosync_data, {}) self.assertEqual( role_data, @@ -345,7 +435,7 @@ def test_corosync_nodes_no_address(self) -> None: ) def test_pcs_nodes_no_cluster_nodes(self) -> None: - corosync_data: List[Dict[str, Any]] = [] + corosync_data: Dict[str, Any] = dict(nodes=[]) pcs_data = dict(node1="node1A") role_data = exporter.export_cluster_nodes(corosync_data, pcs_data) self.assertEqual( @@ -354,18 +444,20 @@ def test_pcs_nodes_no_cluster_nodes(self) -> None: ) def test_pcs_nodes(self) -> None: - corosync_data = [ - dict( - name="node1", - nodeid=1, - addrs=[dict(addr="node1addr", link="0", type="FQDN")], - ), - dict( - name="node2", - nodeid=2, - addrs=[dict(addr="node2addr", link="0", type="FQDN")], - ), - ] + corosync_data: Dict[str, Any] = dict( + nodes=[ + dict( + name="node1", + nodeid=1, + addrs=[dict(addr="node1addr", link="0", type="FQDN")], + ), + dict( + name="node2", + nodeid=2, + addrs=[dict(addr="node2addr", link="0", type="FQDN")], + ), + ] + ) pcs_data = dict(node1="node1A", node3="node3A") role_data = exporter.export_cluster_nodes(corosync_data, pcs_data) self.assertEqual( diff --git a/tests/unit/test_info_loader.py b/tests/unit/test_info_loader.py index 05268df..4316b17 100644 --- a/tests/unit/test_info_loader.py +++ b/tests/unit/test_info_loader.py @@ -20,15 +20,12 @@ class IsServiceEnabled(TestCase): - # pylint: disable=protected-access def setUp(self) -> None: self.runner_mock = mock.Mock() def test_is_enabled(self) -> None: self.runner_mock.return_value = (0, "enabled", "") - self.assertTrue( - loader._is_service_enabled(self.runner_mock, "corosync") - ) + self.assertTrue(loader.is_service_enabled(self.runner_mock, "corosync")) self.runner_mock.assert_called_once_with( ["systemctl", "is-enabled", "corosync.service"], {"LC_ALL": "C"}, @@ -37,7 +34,7 @@ def test_is_enabled(self) -> None: def test_is_disabled(self) -> None: self.runner_mock.return_value = (1, "disabled", "") self.assertFalse( - loader._is_service_enabled(self.runner_mock, "pacemaker") + loader.is_service_enabled(self.runner_mock, "pacemaker") ) self.runner_mock.assert_called_once_with( ["systemctl", "is-enabled", "pacemaker.service"], @@ -46,30 +43,13 @@ def test_is_disabled(self) -> None: def test_unexpected_output(self) -> None: self.runner_mock.return_value = (4, "not-found", "") - self.assertFalse(loader._is_service_enabled(self.runner_mock, "pcmk")) + self.assertFalse(loader.is_service_enabled(self.runner_mock, "pcmk")) self.runner_mock.assert_called_once_with( ["systemctl", "is-enabled", "pcmk.service"], {"LC_ALL": "C"}, ) -class GetStartOnBoot(TestCase): - @mock.patch("ha_cluster_lsr.info.loader._is_service_enabled") - def test_main(self, mock_is_enabled: mock.Mock) -> None: - runner_mock = mock.Mock() - mock_is_enabled.side_effect = [False, False] - self.assertFalse(loader.get_start_on_boot(runner_mock)) - - mock_is_enabled.side_effect = [True, False] - self.assertTrue(loader.get_start_on_boot(runner_mock)) - - mock_is_enabled.side_effect = [False, True] - self.assertTrue(loader.get_start_on_boot(runner_mock)) - - mock_is_enabled.side_effect = [True, True] - self.assertTrue(loader.get_start_on_boot(runner_mock)) - - class CallPcsCli(TestCase): # pylint: disable=protected-access def test_success(self) -> None: