Skip to content

Commit

Permalink
Build map of nodes assigned to instances
Browse files Browse the repository at this point in the history
Build a map of launched instances associated with nodes, so that when the node_resume list is processed, nodes that already have an associated running instance are discarded.

Signed-off-by: Luca Carrogu <[email protected]>
  • Loading branch information
lukeseawalker committed Sep 18, 2023
1 parent 5c591bb commit e4908ee
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 6 deletions.
11 changes: 9 additions & 2 deletions src/slurm_plugin/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def __init__(
self._boto3_resource_factory = lambda resource_name: boto3.session.Session().resource(
resource_name, region_name=region, config=boto3_config
)
self.nodes_assigned_to_instances = {}

def _clear_failed_nodes(self):
"""Clear and reset failed nodes list."""
Expand Down Expand Up @@ -250,10 +251,15 @@ def _parse_nodes_resume_list(self, node_list: List[str]) -> defaultdict[str, def
Sample NodeName: queue1-st-computeres1-2
"""
nodes_to_launch = defaultdict(lambda: defaultdict(list))
logger.debug("Nodes already assigned to running instances: %s", self.nodes_assigned_to_instances)
for node in node_list:
try:
queue_name, node_type, compute_resource_name = parse_nodename(node)
nodes_to_launch[queue_name][compute_resource_name].append(node)
if node in self.nodes_assigned_to_instances.get(queue_name, {}).get(compute_resource_name, []):
# skip node for which there is already an instance assigned (oversubscribe case)
logger.info("Discarding NodeName already assigned to running instance: %s", node)
else:
nodes_to_launch[queue_name][compute_resource_name].append(node)
except (InvalidNodenameError, KeyError):
logger.warning("Discarding NodeName with invalid format: %s", node)
self._update_failed_nodes({node}, "InvalidNodenameError")
Expand Down Expand Up @@ -928,7 +934,7 @@ def best_effort_node_assignment(
"all" if len(successful_launched_nodes) == len(nodes_resume_list) else "partial",
print_with_count(successful_launched_nodes),
)

self._update_dict(self.nodes_assigned_to_instances, nodes_resume_mapping)
if len(successful_launched_nodes) < len(nodes_resume_list):
# set limited capacity on the failed to launch nodes
self._update_failed_nodes(set(failed_launch_nodes), "LimitedInstanceCapacity", override=False)
Expand Down Expand Up @@ -961,6 +967,7 @@ def all_or_nothing_node_assignment(
"Successful launched all instances for nodes %s",
print_with_count(nodes_resume_list),
)
self._update_dict(self.nodes_assigned_to_instances, nodes_resume_mapping)
except InstanceToNodeAssignmentError:
# Failed to assign EC2 instances to nodes
# EC2 Instances already assigned, are going to be terminated by
Expand Down
67 changes: 63 additions & 4 deletions tests/slurm_plugin/test_instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ def test_update_dns_hostnames(
instance_manager._update_dns_hostnames(assigned_nodes)

@pytest.mark.parametrize(
("node_list", "expected_results", "expected_failed_nodes", "job_level_scaling"),
("node_list", "expected_results", "expected_failed_nodes", "nodes_assigned_to_instances"),
[
(
[
Expand Down Expand Up @@ -502,13 +502,72 @@ def test_update_dns_hostnames(
"in-valid/queue.name-st-c5xlarge-2",
}
},
False,
{},
),
(
[
"queue1-st-c5xlarge-1",
"queue1-st-c5xlarge-2",
"queue1-dy-c5xlarge-201",
"queue2-st-g34xlarge-1",
"in-valid/queue.name-st-c5xlarge-2",
"noBrackets-st-c5xlarge-[1-2]",
"queue2-dy-g38xlarge-1",
"queue2-st-u6tb1metal-1",
"queue2-invalidnodetype-c5xlarge-12",
"queuename-with-dash-and_underscore-st-i3enmetal2tb-1",
],
{},
{
"InvalidNodenameError": {
"queue2-invalidnodetype-c5xlarge-12",
"noBrackets-st-c5xlarge-[1-2]",
"queuename-with-dash-and_underscore-st-i3enmetal2tb-1",
"in-valid/queue.name-st-c5xlarge-2",
}
},
{
"queue1": {
"c5xlarge": [
"queue1-st-c5xlarge-1",
"queue1-st-c5xlarge-2",
"queue1-dy-c5xlarge-201",
]
},
"queue2": {
"g34xlarge": ["queue2-st-g34xlarge-1"],
"g38xlarge": ["queue2-dy-g38xlarge-1"],
"u6tb1metal": ["queue2-st-u6tb1metal-1"],
},
},
),
(
[
"queue1-st-c5xlarge-1",
"queue1-st-c5xlarge-2",
],
{
"queue1": {
"c5xlarge": [
"queue1-st-c5xlarge-1",
]
},
},
{},
{
"queue1": {
"c5xlarge": [
"queue1-st-c5xlarge-2",
]
},
},
),
],
)
def test_parse_requested_instances(
self, node_list, expected_results, expected_failed_nodes, instance_manager, job_level_scaling
def test_parse_nodes_resume_list(
self, node_list, expected_results, expected_failed_nodes, instance_manager, nodes_assigned_to_instances
):
instance_manager.nodes_assigned_to_instances = nodes_assigned_to_instances
assert_that(instance_manager._parse_nodes_resume_list(node_list)).is_equal_to(expected_results)
assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes)

Expand Down

0 comments on commit e4908ee

Please sign in to comment.