Skip to content

Commit

Permalink
refactor: ha_cluster_info: cleanup
Browse files Browse the repository at this point in the history
Implementing changes proposed in code review
  • Loading branch information
tomjelinek committed Nov 28, 2024
1 parent 4562939 commit 5cf96e5
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 304 deletions.
42 changes: 22 additions & 20 deletions library/ha_cluster_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ def export_cluster_configuration(module: AnsibleModule) -> Dict[str, Any]:
result: dict[str, Any] = dict()
cmd_runner = get_cmd_runner(module)

result["ha_cluster_start_on_boot"] = loader.get_start_on_boot(cmd_runner)
corosync_enabled = loader.is_service_enabled(cmd_runner, "corosync")
pacemaker_enabled = loader.is_service_enabled(cmd_runner, "pacemaker")
result["ha_cluster_start_on_boot"] = exporter.export_start_on_boot(
corosync_enabled, pacemaker_enabled
)

# Corosync config is availabe via CLI since pcs-0.10.8, via API v2 since
# pcs-0.12.0 and pcs-0.11.9. For old pcs versions, CLI must be used, and
Expand All @@ -118,25 +122,23 @@ def export_cluster_configuration(module: AnsibleModule) -> Dict[str, Any]:
known_hosts_pcs = loader.get_pcsd_known_hosts()

# Convert corosync config to role format
corosync_conf_role = exporter.export_corosync_options(corosync_conf_pcs)
for key in (
"ha_cluster_cluster_name",
"ha_cluster_transport",
"ha_cluster_totem",
"ha_cluster_quorum",
):
if key in corosync_conf_role:
result[key] = corosync_conf_role[key]

# Convert cluster definition to role format
try:
result["ha_cluster_node_options"] = exporter.export_cluster_nodes(
corosync_conf_pcs["nodes"], known_hosts_pcs
)
except KeyError as e:
raise exporter.JsonMissingKey(
e.args[0], corosync_conf_pcs, "corosync configuration"
) from e
result["ha_cluster_cluster_name"] = exporter.export_corosync_cluster_name(
corosync_conf_pcs
)
result["ha_cluster_transport"] = exporter.export_corosync_transport(
corosync_conf_pcs
)
exported_totem = exporter.export_corosync_totem(corosync_conf_pcs)
if exported_totem:
result["ha_cluster_totem"] = exported_totem
exported_quorum = exporter.export_corosync_quorum(corosync_conf_pcs)
if exported_quorum:
result["ha_cluster_quorum"] = exported_quorum

# Convert nodes definition to role format
result["ha_cluster_node_options"] = exporter.export_cluster_nodes(
corosync_conf_pcs, known_hosts_pcs
)

return result

Expand Down
131 changes: 86 additions & 45 deletions module_utils/ha_cluster_lsr/info/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
# pylint: disable=invalid-name
__metaclass__ = type

from typing import Any, Dict, List
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List


class JsonMissingKey(Exception):
Expand Down Expand Up @@ -39,18 +40,42 @@ def _dict_to_nv_list(input_dict: Dict[str, Any]) -> List[Dict[str, Any]]:
return [dict(name=name, value=value) for name, value in input_dict.items()]


def export_corosync_options(
@contextmanager
def _handle_missing_key(data: Dict[str, Any], data_desc: str) -> Iterator[None]:
try:
yield
except KeyError as e:
raise JsonMissingKey(e.args[0], data, data_desc) from e


def export_start_on_boot(
corosync_enabled: bool, pacemaker_enabled: bool
) -> bool:
"""
Transform cluster servis status to start_on_boot
"""
return corosync_enabled or pacemaker_enabled


def export_corosync_cluster_name(corosync_conf_dict: Dict[str, Any]) -> str:
"""
Extract cluster name form corosync config in pcs format
corosync_conf_dict -- corosync config structure provided by pcs
"""
with _handle_missing_key(corosync_conf_dict, "corosync configuration"):
return corosync_conf_dict["cluster_name"]


def export_corosync_transport(
corosync_conf_dict: Dict[str, Any]
) -> Dict[str, Any]:
"""
Transform corosync config from pcs format to role format excluding nodes
Export transport options in role format from corosync config in pcs format
corosync_conf_dict -- corosync config structure provided by pcs
"""
result: Dict[str, Any] = dict()
try:
result["ha_cluster_cluster_name"] = corosync_conf_dict["cluster_name"]

with _handle_missing_key(corosync_conf_dict, "corosync configuration"):
transport = dict(type=corosync_conf_dict["transport"].lower())
if corosync_conf_dict["transport_options"]:
transport["options"] = _dict_to_nv_list(
Expand All @@ -71,58 +96,74 @@ def export_corosync_options(
transport["crypto"] = _dict_to_nv_list(
corosync_conf_dict["crypto_options"]
)
result["ha_cluster_transport"] = transport
return transport


def export_corosync_totem(corosync_conf_dict: Dict[str, Any]) -> Dict[str, Any]:
"""
Export totem options in role format from corosync config in pcs format
corosync_conf_dict -- corosync config structure provided by pcs
"""
with _handle_missing_key(corosync_conf_dict, "corosync configuration"):
result: Dict[str, Any] = dict()
if corosync_conf_dict["totem_options"]:
result["ha_cluster_totem"] = dict(
options=_dict_to_nv_list(corosync_conf_dict["totem_options"])
result["options"] = _dict_to_nv_list(
corosync_conf_dict["totem_options"]
)
return result


def export_corosync_quorum(
corosync_conf_dict: Dict[str, Any]
) -> Dict[str, Any]:
"""
Export quorum options in role format from corosync config in pcs format
corosync_conf_dict -- corosync config structure provided by pcs
"""
with _handle_missing_key(corosync_conf_dict, "corosync configuration"):
result: Dict[str, Any] = dict()
if corosync_conf_dict["quorum_options"]:
result["ha_cluster_quorum"] = dict(
options=_dict_to_nv_list(corosync_conf_dict["quorum_options"])
result["options"] = _dict_to_nv_list(
corosync_conf_dict["quorum_options"]
)
except KeyError as e:
raise JsonMissingKey(
e.args[0], corosync_conf_dict, "corosync configuration"
) from e
return result
return result


def export_cluster_nodes(
corosync_conf_nodes: List[Dict[str, Any]], pcs_node_addr: Dict[str, str]
corosync_conf_dict: Dict[str, Any], pcs_node_addr: Dict[str, str]
) -> List[Dict[str, Any]]:
"""
Transform node configuration from pcs format to role format
corosync_conf_dict -- corosync config structure provided by pcs
pcs_node_addr -- dict holding pcs address for cluster nodes
"""
node_list: List[Dict[str, Any]] = []
if not corosync_conf_nodes:
return node_list
for index, node_dict in enumerate(corosync_conf_nodes):
# corosync node configuration
try:
one_node = dict(
node_name=node_dict["name"],
corosync_addresses=[
addr_dict["addr"]
for addr_dict in sorted(
node_dict["addrs"],
key=lambda item: item["link"],
)
],
)
except KeyError as e:
raise JsonMissingKey(
e.args[0],
dict(nodes=corosync_conf_nodes),
with _handle_missing_key(corosync_conf_dict, "corosync configuration"):
node_list: List[Dict[str, Any]] = []
corosync_nodes = corosync_conf_dict["nodes"]
if not corosync_nodes:
return node_list
for index, node_dict in enumerate(corosync_nodes):
# corosync node configuration
with _handle_missing_key(
corosync_conf_dict,
f"corosync configuration for node on index {index}",
) from e
# pcs node configuration
if one_node["node_name"] in pcs_node_addr:
one_node["pcs_address"] = pcs_node_addr[one_node["node_name"]]
# finish one node export
node_list.append(one_node)
return node_list
):
one_node = dict(
node_name=node_dict["name"],
corosync_addresses=[
addr_dict["addr"]
for addr_dict in sorted(
node_dict["addrs"],
key=lambda item: item["link"],
)
],
)
# pcs node configuration
if one_node["node_name"] in pcs_node_addr:
one_node["pcs_address"] = pcs_node_addr[one_node["node_name"]]
# finish one node export
node_list.append(one_node)
return node_list
11 changes: 1 addition & 10 deletions module_utils/ha_cluster_lsr/info/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def kwargs(self) -> Dict[str, Any]:
)


def _is_service_enabled(run_command: CommandRunner, service: str) -> bool:
def is_service_enabled(run_command: CommandRunner, service: str) -> bool:
"""
Check whether a specified service is enabled in the OS
Expand All @@ -102,15 +102,6 @@ def _is_service_enabled(run_command: CommandRunner, service: str) -> bool:
return rc == 0


def get_start_on_boot(run_command: CommandRunner) -> bool:
"""
Detect wheter a cluster is configured to start on boot
"""
return _is_service_enabled(run_command, "corosync") or _is_service_enabled(
run_command, "pacemaker"
)


def _call_pcs_cli(
run_command: CommandRunner, command: List[str]
) -> Dict[str, Any]:
Expand Down
Loading

0 comments on commit 5cf96e5

Please sign in to comment.