Skip to content

Commit

Permalink
Rename parameters of is_healthy and is_state_healthy methods of Slurm…
Browse files Browse the repository at this point in the history
…Nodes

From a SlurmNode perspective, "terminate_down/drain_nodes" does not make sense.
We can instead pass a flag to the is_healthy function to specify whether we want
to consider down/drain nodes as unhealthy.

Signed-off-by: Enrico Usai <[email protected]>
  • Loading branch information
enrico-usai committed Nov 3, 2023
1 parent b344653 commit 04c8b03
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
5 changes: 4 additions & 1 deletion src/slurm_plugin/clustermgtd.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,10 @@ def _find_unhealthy_slurm_nodes(self, slurm_nodes):
)

for node in slurm_nodes:
if not node.is_healthy(self._config.terminate_drain_nodes, self._config.terminate_down_nodes):
if not node.is_healthy(
consider_drain_as_unhealthy=self._config.terminate_drain_nodes,
consider_down_as_unhealthy=self._config.terminate_down_nodes,
):
if not self._config.disable_capacity_blocks_management and node.name in reserved_nodenames:
# do not consider as unhealthy the nodes reserved for capacity blocks
continue
Expand Down
24 changes: 12 additions & 12 deletions src/slurm_plugin/slurm_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def is_invalid_slurm_registration(self):
return self.SLURM_SCONTROL_INVALID_REGISTRATION_STATE in self.states

@abstractmethod
def is_state_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=True):
def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
"""Check if a slurm node's scheduler state is considered healthy."""
pass

Expand All @@ -394,7 +394,7 @@ def is_bootstrap_timeout(self):
pass

@abstractmethod
def is_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=True):
def is_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
"""Check if a slurm node is considered healthy."""
pass

Expand Down Expand Up @@ -478,23 +478,23 @@ def __init__(
reservation_name=reservation_name,
)

def is_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=True):
def is_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
"""Check if a slurm node is considered healthy."""
return (
self._is_static_node_ip_configuration_valid(log_warn_if_unhealthy=log_warn_if_unhealthy)
and self.is_backing_instance_valid(log_warn_if_unhealthy=log_warn_if_unhealthy)
and self.is_state_healthy(
terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=log_warn_if_unhealthy
consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=log_warn_if_unhealthy
)
)

def is_state_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=True):
def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
"""Check if a slurm node's scheduler state is considered healthy."""
# Check if node is rebooting: if so, the node is healthy
if self.is_rebooting():
return True
# Check to see if node is in DRAINED, ignoring any node currently being replaced or in POWER_DOWN
if self.is_drained() and not self.is_power_down() and terminate_drain_nodes:
if self.is_drained() and not self.is_power_down() and consider_drain_as_unhealthy:
if self.is_being_replaced:
logger.debug(
"Node state check: node %s in DRAINED but is currently being replaced, ignoring, node state: %s",
Expand All @@ -507,7 +507,7 @@ def is_state_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn
logger.warning("Node state check: node %s in DRAINED, node state: %s", self, self.state_string)
return False
# Check to see if node is in DOWN, ignoring any node currently being replaced
elif self.is_down() and terminate_down_nodes:
elif self.is_down() and consider_down_as_unhealthy:
if self.is_being_replaced:
logger.debug(
"Node state check: node %s in DOWN but is currently being replaced, ignoring. Node state: ",
Expand Down Expand Up @@ -597,18 +597,18 @@ def __init__(
reservation_name=reservation_name,
)

def is_state_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=True):
def is_state_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
"""Check if a slurm node's scheduler state is considered healthy."""
# Check if node is rebooting: if so, the node is healthy
if self.is_rebooting():
return True
# Check to see if node is in DRAINED, ignoring any node currently being replaced or in POWER_DOWN
if self.is_drained() and not self.is_power_down() and terminate_drain_nodes:
if self.is_drained() and not self.is_power_down() and consider_drain_as_unhealthy:
if log_warn_if_unhealthy:
logger.warning("Node state check: node %s in DRAINED, node state: %s", self, self.state_string)
return False
# Check to see if node is in DOWN, ignoring any node currently being replaced
elif self.is_down() and terminate_down_nodes:
elif self.is_down() and consider_down_as_unhealthy:
if not self.is_nodeaddr_set():
# Silently handle failed to launch dynamic node to clean up normal logging
logger.debug("Node state check: node %s in DOWN, node state: %s", self, self.state_string)
Expand All @@ -618,10 +618,10 @@ def is_state_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn
return False
return True

def is_healthy(self, terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=True):
def is_healthy(self, consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=True):
"""Check if a slurm node is considered healthy."""
return self.is_backing_instance_valid(log_warn_if_unhealthy=log_warn_if_unhealthy) and self.is_state_healthy(
terminate_drain_nodes, terminate_down_nodes, log_warn_if_unhealthy=log_warn_if_unhealthy
consider_drain_as_unhealthy, consider_down_as_unhealthy, log_warn_if_unhealthy=log_warn_if_unhealthy
)

def is_bootstrap_failure(self):
Expand Down
12 changes: 8 additions & 4 deletions tests/slurm_plugin/slurm_resources/test_slurm_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ def test_partition_is_inactive(nodes, expected_output):


@pytest.mark.parametrize(
"node, terminate_drain_nodes, terminate_down_nodes, mock_is_node_being_replaced, expected_result",
"node, consider_drain_as_unhealthy, consider_down_as_unhealthy, mock_is_node_being_replaced, expected_result",
[
pytest.param(
DynamicNode("queue-dy-c5xlarge-1", "some_ip", "hostname", "MIXED+CLOUD", "queue"),
Expand Down Expand Up @@ -709,10 +709,12 @@ def test_partition_is_inactive(nodes, expected_output):
],
)
def test_slurm_node_is_state_healthy(
node, mock_is_node_being_replaced, terminate_drain_nodes, terminate_down_nodes, expected_result
node, mock_is_node_being_replaced, consider_drain_as_unhealthy, consider_down_as_unhealthy, expected_result
):
node.is_being_replaced = mock_is_node_being_replaced
assert_that(node.is_state_healthy(terminate_drain_nodes, terminate_down_nodes)).is_equal_to(expected_result)
assert_that(node.is_state_healthy(consider_drain_as_unhealthy, consider_down_as_unhealthy)).is_equal_to(
expected_result
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1005,7 +1007,9 @@ def test_slurm_node_is_bootstrap_failure(
)
def test_slurm_node_is_healthy(node, instance, expected_result):
node.instance = instance
assert_that(node.is_healthy(terminate_drain_nodes=True, terminate_down_nodes=True)).is_equal_to(expected_result)
assert_that(node.is_healthy(consider_drain_as_unhealthy=True, consider_down_as_unhealthy=True)).is_equal_to(
expected_result
)


@pytest.mark.parametrize(
Expand Down

0 comments on commit 04c8b03

Please sign in to comment.