From fd1a3744c538ced388dc8b4446dd3f8dc9c7af9d Mon Sep 17 00:00:00 2001
From: Luca Carrogu
Date: Thu, 14 Sep 2023 11:26:19 +0200
Subject: [PATCH] Add best-effort launch strategy for job-level scaling

Add a best-effort launch strategy for job-level scaling. All-or-nothing is now the default; when all_or_nothing_batch is set to "False", a best-effort launch is performed instead. Small refactoring of log string messages.

Tests done:
given the following submission commands:
```
sbatch --wrap "sleep 10" -N 4 --constraint="[(c5.4xlarge)*3&(p4d.24xlarge)*1]" -p q4 --exclusive;
sbatch --wrap "sleep 10" -N 2 --constraint="[(c5.4xlarge)*1&(p4d.24xlarge)*1]" -p q4 --exclusive;
sbatch --wrap "sleep 10" -N 3 --constraint="[(c5.4xlarge)*3]" -p q4 --exclusive
```
where there is capacity for c5.4xlarge but not for p4d.24xlarge, the two scaling strategies were tested:

all_or_nothing_batch = true
expected nodes running at the end of the resume call: (x3) q4-dy-c4-1-*
resume log:
```
2023-09-14 09:03:09,530 - [slurm_plugin.resume:main] - INFO - ResumeProgram startup. 2023-09-14 09:03:09,531 - [slurm_plugin.resume:_get_config] - INFO - Reading /etc/parallelcluster/slurm_plugin/parallelcluster_slurm_resume.conf 2023-09-14 09:03:09,533 - [slurm_plugin.resume:main] - INFO - ResumeProgram config: SlurmResumeConfig(region='us-east-1', cluster_name='bootstrap', dynamodb_table='parallelcluster-slurm-bootstrap', hosted_zone='Z09815256PBUS3QRIMRV', dns_domain='bootstrap.pcluster.', use_private_hostname=False, head_node_private_ip='192.168.24.99', head_node_hostname='ip-192-168-24-99.ec2.internal', launch_max_batch_size=500, assign_node_max_batch_size=500, terminate_max_batch_size=1000, update_node_address=True, all_or_nothing_batch=True, job_level_scaling=True, temp_jls_for_node_sharing=False, fleet_config={'q1': {'c1': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}, 'q2': {'c2': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.2xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}, 'q3': {'c3': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.4xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}, 'q4': {'c4-1': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.4xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}, 'c4-2': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'p4d.24xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}}, run_instances_overrides={}, create_fleet_overrides={}, clustermgtd_timeout=300, clustermgtd_heartbeat_file_path='/opt/slurm/etc/pcluster/.slurm_plugin/clustermgtd_heartbeat', _boto3_retry=1, _boto3_config={'retries': {'max_attempts': 1, 'mode': 'standard'}}, boto3_config=, logging_config='/opt/parallelcluster/pyenv/versions/3.9.16/envs/node_virtualenv/lib/python3.9/site-packages/slurm_plugin/logging/parallelcluster_resume_logging.conf', head_node_instance_id='i-0145afe796a5e375a') 2023-09-14 09:03:09,533 - [slurm_plugin.resume:_get_slurm_resume] - INFO - Slurm Resume File content: {'jobs': [{'extra': None, 'job_id': 185, 'features': '[(c5.4xlarge)*3&(p4d.24xlarge)*1]', 'nodes_alloc':
'q4-dy-c4-1-[1-3],q4-dy-c4-2-1', 'nodes_resume': 'q4-dy-c4-1-[1-3],q4-dy-c4-2-1', 'oversubscribe': 'NO', 'partition': 'q4', 'reservation': None}, {'extra': None, 'job_id': 186, 'features': '[(c5.4xlarge)*1&(p4d.24xlarge)*1]', 'nodes_alloc': 'q4-dy-c4-1-4,q4-dy-c4-2-2', 'nodes_resume': 'q4-dy-c4-1-4,q4-dy-c4-2-2', 'oversubscribe': 'NO', 'partition': 'q4', 'reservation': None}, {'extra': None, 'job_id': 187, 'features': '[(c5.4xlarge)*3]', 'nodes_alloc': 'q4-dy-c4-1-[5-7]', 'nodes_resume': 'q4-dy-c4-1-[5-7]', 'oversubscribe': 'NO', 'partition': 'q4', 'reservation': None}], 'all_nodes_resume': 'q4-dy-c4-1-[1-7],q4-dy-c4-2-[1-2]'} 2023-09-14 09:03:09,537 - [slurm_plugin.common:is_clustermgtd_heartbeat_valid] - INFO - Latest heartbeat from clustermgtd: 2023-09-14 09:02:27.308559+00:00 2023-09-14 09:03:09,538 - [slurm_plugin.resume:_resume] - INFO - Launching EC2 instances for the following Slurm nodes: q4-dy-c4-1-[1-7],q4-dy-c4-2-[1-2] 2023-09-14 09:03:09,594 - [slurm_plugin.resume:_resume] - INFO - Current state of Slurm nodes to resume: [('q4-dy-c4-1-1', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-2', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-3', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-4', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-5', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-6', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-7', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-2-1', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-2-2', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP')] 2023-09-14 09:03:09,620 - [botocore.credentials:load] - INFO - Found credentials from IAM Role: bootstrap-RoleHeadNode-NKATKTSA4IIU 2023-09-14 09:03:09,660 - [slurm_plugin.instance_manager:_launch_instances] - INFO - Launching all-or-nothing instances for nodes (x7) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3', 'q4-dy-c4-1-4', 'q4-dy-c4-1-5', 'q4-dy-c4-1-6', 'q4-dy-c4-1-7'] 2023-09-14 09:03:09,661 - [slurm_plugin.fleet_manager:create_fleet] - INFO - Launching instances with create_fleet API. Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-1', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'c5.4xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 7, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 7, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:03:12,930 - [slurm_plugin.fleet_manager:launch_ec2_instances] - INFO - Launched the following instances (x7) ['i-09ba3d3b0753ddc33', 'i-095c89ec9f1e389d8', 'i-0414b54e1cfb7f5b8', 'i-01ac20db646a75ffa', 'i-03bdd4851aa584786', 'i-0b5adaef26df1187d', 'i-08584b017f57195b0'] 2023-09-14 09:03:12,931 - [slurm_plugin.instance_manager:_launch_instances] - INFO - Launching all-or-nothing instances for nodes (x2) ['q4-dy-c4-2-1', 'q4-dy-c4-2-2'] 2023-09-14 09:03:12,931 - [slurm_plugin.fleet_manager:create_fleet] - INFO - Launching instances with create_fleet API. 
Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-2', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'p4d.24xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 2, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 2, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:03:13,971 - [slurm_plugin.fleet_manager:_launch_instances] - ERROR - Error in CreateFleet request (7e76aa68-8d69-42a8-bead-7de1a50f9037): InsufficientInstanceCapacity - We currently do not have sufficient p4d.24xlarge capacity in the Availability Zone you requested (us-east-1d). Our system will be working on provisioning additional capacity. You can currently get p4d.24xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1a, us-east-1b. 2023-09-14 09:03:14,072 - [slurm_plugin.instance_manager:_add_instances_for_job] - INFO - JobID 185 - The job nodes_resume list is (x4) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3', 'q4-dy-c4-2-1'] 2023-09-14 09:03:14,072 - [slurm_plugin.instance_manager:_resize_slurm_node_list] - INFO - JobID 185 - Booking already launched instances for nodes (x3) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3']: 2023-09-14 09:03:14,072 - [slurm_plugin.instance_manager:_launch_instances] - INFO - JobID 185 - Launching all-or-nothing instances for nodes (x1) ['q4-dy-c4-2-1'] 2023-09-14 09:03:14,072 - [slurm_plugin.fleet_manager:create_fleet] - INFO - JobID 185 - Launching instances with create_fleet API. Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-2', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'p4d.24xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 1, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 1, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:03:15,050 - [slurm_plugin.fleet_manager:_launch_instances] - ERROR - JobID 185 - Error in CreateFleet request (044cbd43-2925-4874-af52-40ca1240e179): InsufficientInstanceCapacity - We currently do not have sufficient p4d.24xlarge capacity in the Availability Zone you requested (us-east-1d). Our system will be working on provisioning additional capacity. You can currently get p4d.24xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1a, us-east-1b. 
2023-09-14 09:03:15,151 - [slurm_plugin.instance_manager:all_or_nothing_node_assignment] - INFO - JobID 185 - Releasing booked instances (x3) ["('q4', 'c4-1', EC2Instance(id='i-09ba3d3b0753ddc33', private_ip='192.168.109.64', hostname='ip-192-168-109-64', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4', 'c4-1', EC2Instance(id='i-095c89ec9f1e389d8', private_ip='192.168.107.253', hostname='ip-192-168-107-253', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4', 'c4-1', EC2Instance(id='i-0414b54e1cfb7f5b8', private_ip='192.168.111.135', hostname='ip-192-168-111-135', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None))"] 2023-09-14 09:03:15,151 - [slurm_plugin.instance_manager:_add_instances_for_job] - INFO - JobID 186 - The job nodes_resume list is (x2) ['q4-dy-c4-1-4', 'q4-dy-c4-2-2'] 2023-09-14 09:03:15,151 - [slurm_plugin.instance_manager:_resize_slurm_node_list] - INFO - JobID 186 - Booking already launched instances for nodes (x1) ['q4-dy-c4-1-4']: 2023-09-14 09:03:15,151 - [slurm_plugin.instance_manager:_launch_instances] - INFO - JobID 186 - Launching all-or-nothing instances for nodes (x1) ['q4-dy-c4-2-2'] 2023-09-14 09:03:15,152 - [slurm_plugin.fleet_manager:create_fleet] - INFO - JobID 186 - Launching instances with create_fleet API. Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-2', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'p4d.24xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 1, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 1, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:03:16,162 - [slurm_plugin.fleet_manager:_launch_instances] - ERROR - JobID 186 - Error in CreateFleet request (f1829b1d-4426-4dfa-8f27-3cf306b784e1): InsufficientInstanceCapacity - We currently do not have sufficient p4d.24xlarge capacity in the Availability Zone you requested (us-east-1d). Our system will be working on provisioning additional capacity. You can currently get p4d.24xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1a, us-east-1b. 
2023-09-14 09:03:16,262 - [slurm_plugin.instance_manager:all_or_nothing_node_assignment] - INFO - JobID 186 - Releasing booked instances (x1) ["('q4', 'c4-1', EC2Instance(id='i-01ac20db646a75ffa', private_ip='192.168.108.115', hostname='ip-192-168-108-115', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None))"] 2023-09-14 09:03:16,262 - [slurm_plugin.instance_manager:_add_instances_for_job] - INFO - JobID 187 - The job nodes_resume list is (x3) ['q4-dy-c4-1-5', 'q4-dy-c4-1-6', 'q4-dy-c4-1-7'] 2023-09-14 09:03:16,262 - [slurm_plugin.instance_manager:_resize_slurm_node_list] - INFO - JobID 187 - Booking already launched instances for nodes (x3) ['q4-dy-c4-1-5', 'q4-dy-c4-1-6', 'q4-dy-c4-1-7']: 2023-09-14 09:03:16,280 - [slurm_plugin.instance_manager:_update_slurm_node_addrs] - INFO - JobID 187 - Nodes are now configured with instances: (x3) ["('q4-dy-c4-1-5', EC2Instance(id='i-03bdd4851aa584786', private_ip='192.168.107.163', hostname='ip-192-168-107-163', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4-dy-c4-1-6', EC2Instance(id='i-0b5adaef26df1187d', private_ip='192.168.106.37', hostname='ip-192-168-106-37', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4-dy-c4-1-7', EC2Instance(id='i-08584b017f57195b0', private_ip='192.168.110.115', hostname='ip-192-168-110-115', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None))"] 2023-09-14 09:03:16,281 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 187 - Saving assigned hostnames in DynamoDB 2023-09-14 09:03:16,327 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 187 - Database update: COMPLETED 2023-09-14 09:03:16,327 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 187 - Updating DNS records for Z09815256PBUS3QRIMRV - bootstrap.pcluster. 
2023-09-14 09:03:16,652 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 187 - DNS records update: COMPLETED 2023-09-14 09:03:16,653 - [slurm_plugin.instance_manager:all_or_nothing_node_assignment] - INFO - JobID 187 - Successful launched all instances for nodes (x3) ['q4-dy-c4-1-5', 'q4-dy-c4-1-6', 'q4-dy-c4-1-7'] 2023-09-14 09:03:16,653 - [slurm_plugin.instance_manager:_terminate_unassigned_launched_instances] - INFO - Terminating unassigned launched instances: {'q4': {'c4-1': [EC2Instance(id='i-09ba3d3b0753ddc33', private_ip='192.168.109.64', hostname='ip-192-168-109-64', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None), EC2Instance(id='i-095c89ec9f1e389d8', private_ip='192.168.107.253', hostname='ip-192-168-107-253', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None), EC2Instance(id='i-0414b54e1cfb7f5b8', private_ip='192.168.111.135', hostname='ip-192-168-111-135', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None), EC2Instance(id='i-01ac20db646a75ffa', private_ip='192.168.108.115', hostname='ip-192-168-108-115', launch_time=datetime.datetime(2023, 9, 14, 9, 3, 11, tzinfo=tzlocal()), slurm_node=None)]}} 2023-09-14 09:03:16,662 - [slurm_plugin.instance_manager:delete_instances] - INFO - Terminating instances (x4) ['i-09ba3d3b0753ddc33', 'i-095c89ec9f1e389d8', 'i-0414b54e1cfb7f5b8', 'i-01ac20db646a75ffa'] 2023-09-14 09:03:17,131 - [slurm_plugin.resume:_resume] - INFO - Successfully launched nodes (x3) ['q4-dy-c4-1-5', 'q4-dy-c4-1-6', 'q4-dy-c4-1-7'] 2023-09-14 09:03:17,131 - [slurm_plugin.resume:_resume] - ERROR - Failed to launch following nodes, setting nodes to DOWN: (x6) ['q4-dy-c4-1-1', 'q4-dy-c4-1-3', 'q4-dy-c4-1-4', 'q4-dy-c4-2-2', 'q4-dy-c4-2-1', 'q4-dy-c4-1-2'] 2023-09-14 09:03:17,131 - [slurm_plugin.resume:_handle_failed_nodes] - INFO - Setting following failed nodes into DOWN state (x2) ['q4-dy-c4-2-2', 'q4-dy-c4-2-1'] with reason: (Code:InsufficientInstanceCapacity)Failure when resuming nodes 2023-09-14 09:03:17,149 - [slurm_plugin.resume:_handle_failed_nodes] - INFO - Setting following failed nodes into DOWN state (x4) ['q4-dy-c4-1-1', 'q4-dy-c4-1-3', 'q4-dy-c4-1-4', 'q4-dy-c4-1-2'] with reason: (Code:LimitedInstanceCapacity)Failure when resuming nodes 2023-09-14 09:03:17,169 - [slurm_plugin.resume:main] - INFO - ResumeProgram finished. ``` all_or_nothing_batch = false expected nodes running at the end of the resume call: (x7) q4-dy-c4-1-* resume log: ``` 2023-09-14 09:08:09,554 - [slurm_plugin.resume:main] - INFO - ResumeProgram startup. 
2023-09-14 09:08:09,555 - [slurm_plugin.resume:_get_config] - INFO - Reading /etc/parallelcluster/slurm_plugin/parallelcluster_slurm_resume.conf 2023-09-14 09:08:09,556 - [slurm_plugin.resume:main] - INFO - ResumeProgram config: SlurmResumeConfig(region='us-east-1', cluster_name='bootstrap', dynamodb_table='parallelcluster-slurm-bootstrap', hosted_zone='Z09815256PBUS3QRIMRV', dns_domain='bootstrap.pcluster.', use_private_hostname=False, head_node_private_ip='192.168.24.99', head_node_hostname='ip-192-168-24-99.ec2.internal', launch_max_batch_size=500, assign_node_max_batch_size=500, terminate_max_batch_size=1000, update_node_address=True, all_or_nothing_batch=False, job_level_scaling=True, temp_jls_for_node_sharing=False, fleet_config={'q1': {'c1': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}, 'q2': {'c2': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.2xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}, 'q3': {'c3': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.4xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}, 'q4': {'c4-1': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'c5.4xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}, 'c4-2': {'Api': 'create-fleet', 'CapacityType': 'on-demand', 'AllocationStrategy': 'lowest-price', 'Instances': [{'InstanceType': 'p4d.24xlarge'}], 'Networking': {'SubnetIds': ['subnet-0b48ed99988e56110']}}}}, run_instances_overrides={}, create_fleet_overrides={}, clustermgtd_timeout=300, clustermgtd_heartbeat_file_path='/opt/slurm/etc/pcluster/.slurm_plugin/clustermgtd_heartbeat', _boto3_retry=1, _boto3_config={'retries': {'max_attempts': 1, 'mode': 'standard'}}, boto3_config=, logging_config='/opt/parallelcluster/pyenv/versions/3.9.16/envs/node_virtualenv/lib/python3.9/site-packages/slurm_plugin/logging/parallelcluster_resume_logging.conf', head_node_instance_id='i-0145afe796a5e375a') 2023-09-14 09:08:09,557 - [slurm_plugin.resume:_get_slurm_resume] - INFO - Slurm Resume File content: {'jobs': [{'extra': None, 'job_id': 188, 'features': '[(c5.4xlarge)*3&(p4d.24xlarge)*1]', 'nodes_alloc': 'q4-dy-c4-1-[1-3],q4-dy-c4-2-1', 'nodes_resume': 'q4-dy-c4-1-[1-3],q4-dy-c4-2-1', 'oversubscribe': 'NO', 'partition': 'q4', 'reservation': None}, {'extra': None, 'job_id': 189, 'features': '[(c5.4xlarge)*1&(p4d.24xlarge)*1]', 'nodes_alloc': 'q4-dy-c4-1-4,q4-dy-c4-2-2', 'nodes_resume': 'q4-dy-c4-1-4,q4-dy-c4-2-2', 'oversubscribe': 'NO', 'partition': 'q4', 'reservation': None}, {'extra': None, 'job_id': 190, 'features': '[(c5.4xlarge)*3]', 'nodes_alloc': 'q4-dy-c4-1-[8-10]', 'nodes_resume': 'q4-dy-c4-1-[8-10]', 'oversubscribe': 'NO', 'partition': 'q4', 'reservation': None}], 'all_nodes_resume': 'q4-dy-c4-1-[1-4,8-10],q4-dy-c4-2-[1-2]'} 2023-09-14 09:08:09,561 - [slurm_plugin.common:is_clustermgtd_heartbeat_valid] - INFO - Latest heartbeat from clustermgtd: 2023-09-14 09:07:27.471205+00:00 2023-09-14 09:08:09,561 - [slurm_plugin.resume:_resume] - INFO - Launching EC2 instances for the following Slurm nodes: q4-dy-c4-1-[1-4,8-10],q4-dy-c4-2-[1-2] 2023-09-14 09:08:09,616 - [slurm_plugin.resume:_resume] - INFO - Current state of Slurm nodes to 
resume: [('q4-dy-c4-1-1', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-2', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-3', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-4', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-8', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-9', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-1-10', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-2-1', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP'), ('q4-dy-c4-2-2', 'ALLOCATED+CLOUD+NOT_RESPONDING+POWERING_UP')] 2023-09-14 09:08:09,643 - [botocore.credentials:load] - INFO - Found credentials from IAM Role: bootstrap-RoleHeadNode-NKATKTSA4IIU 2023-09-14 09:08:09,683 - [slurm_plugin.instance_manager:_launch_instances] - INFO - Launching best-effort instances for nodes (x7) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3', 'q4-dy-c4-1-4', 'q4-dy-c4-1-8', 'q4-dy-c4-1-9', 'q4-dy-c4-1-10'] 2023-09-14 09:08:09,683 - [slurm_plugin.fleet_manager:create_fleet] - INFO - Launching instances with create_fleet API. Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-1', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'c5.4xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 7, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 1, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:08:12,914 - [slurm_plugin.fleet_manager:launch_ec2_instances] - INFO - Launched the following instances (x7) ['i-0250d4e661b9d86eb', 'i-0d23930fc5b09fd33', 'i-07dad6e5f1eed664d', 'i-0ad5528556d13495b', 'i-0365529c953588fab', 'i-03a19e86c0d73e84b', 'i-05b6109e7c0940a9c'] 2023-09-14 09:08:12,915 - [slurm_plugin.instance_manager:_launch_instances] - INFO - Launching best-effort instances for nodes (x2) ['q4-dy-c4-2-1', 'q4-dy-c4-2-2'] 2023-09-14 09:08:12,915 - [slurm_plugin.fleet_manager:create_fleet] - INFO - Launching instances with create_fleet API. Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-2', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'p4d.24xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 2, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 1, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:08:14,152 - [slurm_plugin.fleet_manager:_launch_instances] - ERROR - Error in CreateFleet request (af6b0eb4-086f-46ad-b58b-6c5f811d8280): InsufficientInstanceCapacity - We currently do not have sufficient p4d.24xlarge capacity in the Availability Zone you requested (us-east-1d). Our system will be working on provisioning additional capacity. You can currently get p4d.24xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1a, us-east-1b. 
2023-09-14 09:08:14,253 - [slurm_plugin.instance_manager:_add_instances_for_job] - INFO - JobID 188 - The job nodes_resume list is (x4) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3', 'q4-dy-c4-2-1'] 2023-09-14 09:08:14,253 - [slurm_plugin.instance_manager:_resize_slurm_node_list] - INFO - JobID 188 - Booking already launched instances for nodes (x3) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3']: 2023-09-14 09:08:14,253 - [slurm_plugin.instance_manager:_launch_instances] - INFO - JobID 188 - Launching best-effort instances for nodes (x1) ['q4-dy-c4-2-1'] 2023-09-14 09:08:14,254 - [slurm_plugin.fleet_manager:create_fleet] - INFO - JobID 188 - Launching instances with create_fleet API. Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-2', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'p4d.24xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 1, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 1, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:08:15,274 - [slurm_plugin.fleet_manager:_launch_instances] - ERROR - JobID 188 - Error in CreateFleet request (ff2ac807-49a8-41b4-8af9-2dcea2ed6dfb): InsufficientInstanceCapacity - We currently do not have sufficient p4d.24xlarge capacity in the Availability Zone you requested (us-east-1d). Our system will be working on provisioning additional capacity. You can currently get p4d.24xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1a, us-east-1b. 2023-09-14 09:08:15,409 - [slurm_plugin.instance_manager:_update_slurm_node_addrs] - INFO - JobID 188 - Nodes are now configured with instances: (x3) ["('q4-dy-c4-1-1', EC2Instance(id='i-0250d4e661b9d86eb', private_ip='192.168.111.231', hostname='ip-192-168-111-231', launch_time=datetime.datetime(2023, 9, 14, 9, 8, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4-dy-c4-1-2', EC2Instance(id='i-0d23930fc5b09fd33', private_ip='192.168.110.38', hostname='ip-192-168-110-38', launch_time=datetime.datetime(2023, 9, 14, 9, 8, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4-dy-c4-1-3', EC2Instance(id='i-07dad6e5f1eed664d', private_ip='192.168.104.249', hostname='ip-192-168-104-249', launch_time=datetime.datetime(2023, 9, 14, 9, 8, 11, tzinfo=tzlocal()), slurm_node=None))"] 2023-09-14 09:08:15,409 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 188 - Saving assigned hostnames in DynamoDB 2023-09-14 09:08:15,447 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 188 - Database update: COMPLETED 2023-09-14 09:08:15,447 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 188 - Updating DNS records for Z09815256PBUS3QRIMRV - bootstrap.pcluster. 
2023-09-14 09:08:15,743 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 188 - DNS records update: COMPLETED 2023-09-14 09:08:15,744 - [slurm_plugin.instance_manager:best_effort_node_assignment] - INFO - JobID 188 - Successful launched partial instances for nodes (x3) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3'] 2023-09-14 09:08:15,744 - [slurm_plugin.instance_manager:_add_instances_for_job] - INFO - JobID 189 - The job nodes_resume list is (x2) ['q4-dy-c4-1-4', 'q4-dy-c4-2-2'] 2023-09-14 09:08:15,744 - [slurm_plugin.instance_manager:_resize_slurm_node_list] - INFO - JobID 189 - Booking already launched instances for nodes (x1) ['q4-dy-c4-1-4']: 2023-09-14 09:08:15,744 - [slurm_plugin.instance_manager:_launch_instances] - INFO - JobID 189 - Launching best-effort instances for nodes (x1) ['q4-dy-c4-2-2'] 2023-09-14 09:08:15,744 - [slurm_plugin.fleet_manager:create_fleet] - INFO - JobID 189 - Launching instances with create_fleet API. Parameters: {'LaunchTemplateConfigs': [{'LaunchTemplateSpecification': {'LaunchTemplateName': 'bootstrap-q4-c4-2', 'Version': '$Latest'}, 'Overrides': [{'InstanceType': 'p4d.24xlarge', 'SubnetId': 'subnet-0b48ed99988e56110'}]}], 'TargetCapacitySpecification': {'TotalTargetCapacity': 1, 'DefaultTargetCapacityType': 'on-demand'}, 'Type': 'instant', 'OnDemandOptions': {'AllocationStrategy': 'lowest-price', 'SingleInstanceType': True, 'SingleAvailabilityZone': True, 'MinTargetCapacity': 1, 'CapacityReservationOptions': {'UsageStrategy': 'use-capacity-reservations-first'}}} 2023-09-14 09:08:16,696 - [slurm_plugin.fleet_manager:_launch_instances] - ERROR - JobID 189 - Error in CreateFleet request (63180bc8-cad1-4754-a1c6-0b93fdd36461): InsufficientInstanceCapacity - We currently do not have sufficient p4d.24xlarge capacity in the Availability Zone you requested (us-east-1d). Our system will be working on provisioning additional capacity. You can currently get p4d.24xlarge capacity by not specifying an Availability Zone in your request or choosing us-east-1a, us-east-1b. 2023-09-14 09:08:16,814 - [slurm_plugin.instance_manager:_update_slurm_node_addrs] - INFO - JobID 189 - Nodes are now configured with instances: (x1) ["('q4-dy-c4-1-4', EC2Instance(id='i-0ad5528556d13495b', private_ip='192.168.104.152', hostname='ip-192-168-104-152', launch_time=datetime.datetime(2023, 9, 14, 9, 8, 11, tzinfo=tzlocal()), slurm_node=None))"] 2023-09-14 09:08:16,814 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 189 - Saving assigned hostnames in DynamoDB 2023-09-14 09:08:16,821 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 189 - Database update: COMPLETED 2023-09-14 09:08:16,822 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 189 - Updating DNS records for Z09815256PBUS3QRIMRV - bootstrap.pcluster. 
2023-09-14 09:08:16,950 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 189 - DNS records update: COMPLETED 2023-09-14 09:08:16,951 - [slurm_plugin.instance_manager:best_effort_node_assignment] - INFO - JobID 189 - Successful launched partial instances for nodes (x1) ['q4-dy-c4-1-4'] 2023-09-14 09:08:16,952 - [slurm_plugin.instance_manager:_add_instances_for_job] - INFO - JobID 190 - The job nodes_resume list is (x3) ['q4-dy-c4-1-8', 'q4-dy-c4-1-9', 'q4-dy-c4-1-10'] 2023-09-14 09:08:16,952 - [slurm_plugin.instance_manager:_resize_slurm_node_list] - INFO - JobID 190 - Booking already launched instances for nodes (x3) ['q4-dy-c4-1-8', 'q4-dy-c4-1-9', 'q4-dy-c4-1-10']: 2023-09-14 09:08:16,969 - [slurm_plugin.instance_manager:_update_slurm_node_addrs] - INFO - JobID 190 - Nodes are now configured with instances: (x3) ["('q4-dy-c4-1-8', EC2Instance(id='i-0365529c953588fab', private_ip='192.168.108.102', hostname='ip-192-168-108-102', launch_time=datetime.datetime(2023, 9, 14, 9, 8, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4-dy-c4-1-9', EC2Instance(id='i-03a19e86c0d73e84b', private_ip='192.168.105.222', hostname='ip-192-168-105-222', launch_time=datetime.datetime(2023, 9, 14, 9, 8, 11, tzinfo=tzlocal()), slurm_node=None))", "('q4-dy-c4-1-10', EC2Instance(id='i-05b6109e7c0940a9c', private_ip='192.168.111.72', hostname='ip-192-168-111-72', launch_time=datetime.datetime(2023, 9, 14, 9, 8, 11, tzinfo=tzlocal()), slurm_node=None))"] 2023-09-14 09:08:16,970 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 190 - Saving assigned hostnames in DynamoDB 2023-09-14 09:08:16,980 - [slurm_plugin.instance_manager:_store_assigned_hostnames] - INFO - JobID 190 - Database update: COMPLETED 2023-09-14 09:08:16,980 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 190 - Updating DNS records for Z09815256PBUS3QRIMRV - bootstrap.pcluster. 2023-09-14 09:08:17,141 - [slurm_plugin.instance_manager:_update_dns_hostnames] - INFO - JobID 190 - DNS records update: COMPLETED 2023-09-14 09:08:17,142 - [slurm_plugin.instance_manager:best_effort_node_assignment] - INFO - JobID 190 - Successful launched all instances for nodes (x3) ['q4-dy-c4-1-8', 'q4-dy-c4-1-9', 'q4-dy-c4-1-10'] 2023-09-14 09:08:17,142 - [slurm_plugin.resume:_resume] - INFO - Successfully launched nodes (x7) ['q4-dy-c4-1-1', 'q4-dy-c4-1-2', 'q4-dy-c4-1-3', 'q4-dy-c4-1-4', 'q4-dy-c4-1-8', 'q4-dy-c4-1-9', 'q4-dy-c4-1-10'] 2023-09-14 09:08:17,143 - [slurm_plugin.resume:_resume] - ERROR - Failed to launch following nodes, setting nodes to DOWN: (x2) ['q4-dy-c4-2-1', 'q4-dy-c4-2-2'] 2023-09-14 09:08:17,143 - [slurm_plugin.resume:_handle_failed_nodes] - INFO - Setting following failed nodes into DOWN state (x2) ['q4-dy-c4-2-1', 'q4-dy-c4-2-2'] with reason: (Code:InsufficientInstanceCapacity)Failure when resuming nodes 2023-09-14 09:08:17,162 - [slurm_plugin.resume:main] - INFO - ResumeProgram finished. 
``` Signed-off-by: Luca Carrogu --- src/slurm_plugin/fleet_manager.py | 8 +- src/slurm_plugin/instance_manager.py | 272 ++++-- src/slurm_plugin/resume.py | 15 +- tests/slurm_plugin/test_clustermgtd.py | 2 +- tests/slurm_plugin/test_instance_manager.py | 921 ++++++++++++++++++-- tests/slurm_plugin/test_resume.py | 2 +- 6 files changed, 1070 insertions(+), 150 deletions(-) diff --git a/src/slurm_plugin/fleet_manager.py b/src/slurm_plugin/fleet_manager.py index 28baf0db8..2e527fb06 100644 --- a/src/slurm_plugin/fleet_manager.py +++ b/src/slurm_plugin/fleet_manager.py @@ -19,6 +19,7 @@ from botocore.exceptions import ClientError from common.ec2_utils import get_private_ip_address_and_dns_name from common.utils import setup_logging_filter +from slurm_plugin.common import print_with_count logger = logging.getLogger(__name__) @@ -172,7 +173,12 @@ def launch_ec2_instances(self, count, job_id=None): launch_params = self._evaluate_launch_params(count) assigned_nodes = self._launch_instances(launch_params) - logger.debug("Launched the following instances: %s", assigned_nodes.get("Instances")) + if len(assigned_nodes.get("Instances")) > 0: + logger.info( + "Launched the following instances %s", + print_with_count([instance.get("InstanceId", "") for instance in assigned_nodes.get("Instances")]), + ) + logger.debug("Full launched instances information: %s", assigned_nodes.get("Instances")) return [EC2Instance.from_describe_instance_data(instance_info) for instance_info in assigned_nodes["Instances"]] diff --git a/src/slurm_plugin/instance_manager.py b/src/slurm_plugin/instance_manager.py index 26846b3bc..7d1547274 100644 --- a/src/slurm_plugin/instance_manager.py +++ b/src/slurm_plugin/instance_manager.py @@ -55,6 +55,14 @@ class HostnameDnsStoreError(Exception): """Raised when error occurs while writing into hostname DNS.""" +class InstanceToNodeAssignmentError(Exception): + """Raised when error occurs while assigning EC2 instance to Slurm node.""" + + +class NodeAddrUpdateError(Exception): + """Raised when error occurs while updating NodeAddrs in Slurm node.""" + + class InstanceManagerFactory: @staticmethod def get_manager( @@ -173,7 +181,7 @@ def _add_instances_for_nodes( all_or_nothing_batch: bool = False, ): """Launch requested EC2 instances for nodes.""" - nodes_to_launch = self._parse_requested_nodes(node_list) + nodes_to_launch = self._parse_nodes_resume_list(node_list) for queue, compute_resources in nodes_to_launch.items(): for compute_resource, slurm_node_list in compute_resources.items(): logger.info("Launching instances for Slurm nodes %s", print_with_count(slurm_node_list)) @@ -242,7 +250,7 @@ def _update_slurm_node_addrs_and_failed_nodes(self, slurm_nodes: List[str], laun nodehostnames=node_hostnames, ) logger.info( - "Nodes are now configured with instances: %s", + "Nodes are now configured with instances %s", print_with_count(zip(launched_nodes, launched_instances)), ) if fail_launch_nodes: @@ -341,7 +349,7 @@ def _update_dns_hostnames(self, nodes, update_dns_batch_size=500): ) logger.info("DNS records update: COMPLETED") - def _parse_requested_nodes(self, node_list: List[str]) -> defaultdict[str, defaultdict[str, List[str]]]: + def _parse_nodes_resume_list(self, node_list: List[str]) -> defaultdict[str, defaultdict[str, List[str]]]: """ Parse out which launch configurations (queue/compute resource) are requested by slurm nodes from NodeName. 
@@ -669,6 +677,7 @@ def _scaling_for_jobs( assign_node_batch_size: int, terminate_batch_size: int, update_node_address: bool, + all_or_nothing_batch: bool, ) -> None: """Scaling for job list.""" # Setup custom logging filter @@ -683,7 +692,7 @@ def _scaling_for_jobs( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=True, + all_or_nothing_batch=all_or_nothing_batch, ) self._terminate_unassigned_launched_instances(terminate_batch_size) @@ -711,6 +720,7 @@ def _scaling_for_jobs_single_node( assign_node_batch_size: int, terminate_batch_size: int, update_node_address: bool, + all_or_nothing_batch: bool, ) -> None: """Scaling for job single node list.""" if job_list: @@ -722,6 +732,7 @@ def _scaling_for_jobs_single_node( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) else: # Batch all single node no oversubscribe jobs in a single best-effort EC2 launch request @@ -768,6 +779,7 @@ def _add_instances_for_resume_file( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) self._scaling_for_jobs_multi_node( @@ -777,6 +789,7 @@ def _add_instances_for_resume_file( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) if not self.temp_jls_for_node_sharing: @@ -793,15 +806,22 @@ def _add_instances_for_resume_file( ) def _scaling_for_jobs_multi_node( - self, job_list, node_list, launch_batch_size, assign_node_batch_size, terminate_batch_size, update_node_address + self, + job_list, + node_list, + launch_batch_size, + assign_node_batch_size, + terminate_batch_size, + update_node_address, + all_or_nothing_batch: bool, ): # Optimize job level scaling with preliminary scale-all nodes attempt self._update_dict( self.unused_launched_instances, self._launch_instances( - nodes_to_launch=self._parse_requested_nodes(node_list), + nodes_to_launch=self._parse_nodes_resume_list(node_list), launch_batch_size=launch_batch_size, - all_or_nothing_batch=True, + all_or_nothing_batch=all_or_nothing_batch, ), ) @@ -811,6 +831,7 @@ def _scaling_for_jobs_multi_node( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) def _get_slurm_resume_data(self, slurm_resume: Dict[str, any], node_list: List[str]) -> SlurmResumeData: @@ -948,83 +969,147 @@ def _add_instances_for_job( update_node_address: bool = True, all_or_nothing_batch: bool = True, ): + """Launch requested EC2 instances for nodes.""" + logger.info("The nodes_resume list from Slurm Resume File is %s", print_with_count(job.nodes_resume)) + + nodes_resume_mapping = self._parse_nodes_resume_list(node_list=job.nodes_resume) + # nodes in the resume list, mapped for queues and compute resources, e.g. + # { + # queue_1: {cr_1: [nodes_1, nodes_2, nodes_3], cr_2: [nodes_4]}, + # queue_2: {cr_3: [nodes_5]} + # } + + nodes_resume_list = [] + for compute_resources in nodes_resume_mapping.values(): + for node_list in compute_resources.values(): + nodes_resume_list.extend(node_list) + # nodes in the resume flattened list, e.g. 
+ # [nodes_1, nodes_2, nodes_3, nodes_4, nodes_5] + + instances_launched = self._launch_instances( + job=job, + nodes_to_launch=nodes_resume_mapping, + launch_batch_size=launch_batch_size, + all_or_nothing_batch=all_or_nothing_batch, + ) + # instances launched, e.g. + # { + # queue_1: {cr_1: list[EC2Instance], cr_2: list[EC2Instance], + # queue_2: {cr_3: list[EC2Instance]} + # } + + successful_launched_nodes = [] + failed_launch_nodes = [] + for queue, compute_resources in nodes_resume_mapping.items(): + for compute_resource, slurm_node_list in compute_resources.items(): + q_cr_instances_launched_length = len(instances_launched.get(queue, {}).get(compute_resource, [])) + successful_launched_nodes += slurm_node_list[:q_cr_instances_launched_length] + failed_launch_nodes += slurm_node_list[q_cr_instances_launched_length:] if all_or_nothing_batch: - """Launch requested EC2 instances for nodes.""" - logger.info("Adding instances for nodes %s", print_with_count(job.nodes_resume)) - - nodes_to_launch = self._parse_requested_nodes(node_list=job.nodes_resume) - # nodes to launch, e.g. - # { - # queue_1: {cr_1: [nodes_1, nodes_2, nodes_3], cr_2: [nodes_4]}, - # queue_2: {cr_3: [nodes_5]} - # } - - parsed_requested_node = [] - for compute_resources in nodes_to_launch.values(): - for node_list in compute_resources.values(): - parsed_requested_node.extend(node_list) - # parsed requested node, e.g. - # [nodes_1, nodes_2, nodes_3, nodes_4, nodes_5] + self.all_or_nothing_node_assignment( + assign_node_batch_size=assign_node_batch_size, + instances_launched=instances_launched, + nodes_resume_list=nodes_resume_list, + nodes_resume_mapping=nodes_resume_mapping, + successful_launched_nodes=successful_launched_nodes, + update_node_address=update_node_address, + ) + else: + self.best_effort_node_assignment( + assign_node_batch_size=assign_node_batch_size, + failed_launch_nodes=failed_launch_nodes, + instances_launched=instances_launched, + nodes_resume_list=nodes_resume_list, + nodes_resume_mapping=nodes_resume_mapping, + successful_launched_nodes=successful_launched_nodes, + update_node_address=update_node_address, + ) - instances_launched = self._launch_instances( - job=job, - nodes_to_launch=nodes_to_launch, - launch_batch_size=launch_batch_size, - all_or_nothing_batch=all_or_nothing_batch, + def best_effort_node_assignment( + self, + assign_node_batch_size, + failed_launch_nodes, + instances_launched, + nodes_resume_list, + nodes_resume_mapping, + successful_launched_nodes, + update_node_address, + ): + # best-effort job level scaling + if 0 < len(successful_launched_nodes) <= len(nodes_resume_list): + # All or partial requested EC2 capacity for the Job has been launched + # Assign launched EC2 instances to the requested Slurm nodes + self._assign_instances_to_nodes( + update_node_address=update_node_address, + nodes_to_launch=nodes_resume_mapping, + instances_launched=instances_launched, + assign_node_batch_size=assign_node_batch_size, + raise_on_error=False, ) - # instances launched, e.g. 
- # { - # queue_1: {cr_1: list[EC2Instance], cr_2: list[EC2Instance], - # queue_2: {cr_3: list[EC2Instance]} - # } - number_of_launched_instances = sum( - len(instance_list) - for compute_resources in instances_launched.values() - for instance_list in compute_resources.values() + logger.info( + "Successful launched %s instances for nodes %s", + "all" if len(successful_launched_nodes) == len(nodes_resume_list) else "partial", + print_with_count(successful_launched_nodes), ) - if number_of_launched_instances == len(parsed_requested_node): - # All requested capacity for the Job has been launched - # Assign launched EC2 instances to the requested Slurm nodes - try: - logger.info( - "Successful launched all instances for nodes %s", - print_with_count(parsed_requested_node), - ) - self._assign_instances_to_nodes( - update_node_address=update_node_address, - nodes_to_launch=nodes_to_launch, - instances_launched=instances_launched, - assign_node_batch_size=assign_node_batch_size, - ) - except (HostnameDnsStoreError, HostnameTableStoreError): - # Failed to assign EC2 instances to nodes - # EC2 Instances already assigned, are going to be terminated by - # setting the nodes into DOWN. - # EC2 instances not yet assigned, are going to fail during bootstrap, - # because no entry in the DynamoDB table would be found - self._update_failed_nodes(set(parsed_requested_node)) - elif 0 < number_of_launched_instances < len(parsed_requested_node): - # Try to reuse partial capacity of already launched EC2 instances + + if len(successful_launched_nodes) < len(nodes_resume_list): + # set limited capacity on the failed to launch nodes + self._update_failed_nodes(set(failed_launch_nodes), "LimitedInstanceCapacity", override=False) + else: + # No instances launched at all, e.g. CreateFleet API returns no EC2 instances + self._update_failed_nodes(set(nodes_resume_list), "InsufficientInstanceCapacity", override=False) + + def all_or_nothing_node_assignment( + self, + assign_node_batch_size, + instances_launched, + nodes_resume_list, + nodes_resume_mapping, + successful_launched_nodes, + update_node_address, + ): + # all-or-nothing job level scaling + if len(successful_launched_nodes) == len(nodes_resume_list): + # All requested EC2 capacity for the Job has been launched + # Assign launched EC2 instances to the requested Slurm nodes + try: + self._assign_instances_to_nodes( + update_node_address=update_node_address, + nodes_to_launch=nodes_resume_mapping, + instances_launched=instances_launched, + assign_node_batch_size=assign_node_batch_size, + raise_on_error=True, + ) logger.info( - "Launched instances that can be reused %s", - print_with_count( - [ - (queue, compute_resource, instance) - for queue, compute_resources in instances_launched.items() - for compute_resource, instances in compute_resources.items() - for instance in instances - ] - ), + "Successful launched all instances for nodes %s", + print_with_count(nodes_resume_list), ) - self._update_dict(self.unused_launched_instances, instances_launched) - self._update_failed_nodes(set(parsed_requested_node), "LimitedInstanceCapacity", override=False) - else: - # No instances launched at all, e.g. CreateFleet API returns no EC2 instances - self._update_failed_nodes(set(parsed_requested_node), "InsufficientInstanceCapacity", override=False) + except InstanceToNodeAssignmentError: + # Failed to assign EC2 instances to nodes + # EC2 Instances already assigned, are going to be terminated by + # setting the nodes into DOWN. 
+ # EC2 instances not yet assigned, are going to fail during bootstrap, + # because no entry in the DynamoDB table would be found + self._update_failed_nodes(set(nodes_resume_list)) + elif 0 < len(successful_launched_nodes) < len(nodes_resume_list): + # Try to reuse partial capacity of already launched EC2 instances + logger.info( + "Releasing booked instances %s", + print_with_count( + [ + (queue, compute_resource, instance) + for queue, compute_resources in instances_launched.items() + for compute_resource, instances in compute_resources.items() + for instance in instances + ] + ), + ) + self._update_dict(self.unused_launched_instances, instances_launched) + self._update_failed_nodes(set(nodes_resume_list), "LimitedInstanceCapacity", override=False) else: - # Not implemented, never goes here - logger.error("Best effort job level scaling not implemented") + # No instances launched at all, e.g. CreateFleet API returns no EC2 instances + self._update_failed_nodes(set(nodes_resume_list), "InsufficientInstanceCapacity", override=False) def _launch_instances( self, @@ -1044,7 +1129,11 @@ def _launch_instances( ) if slurm_node_list: - logger.info("Launching instances for nodes %s", print_with_count(slurm_node_list)) + logger.info( + "Launching %s instances for nodes %s", + "all-or-nothing" if all_or_nothing_batch else "best-effort", + print_with_count(slurm_node_list), + ) fleet_manager = self._get_fleet_manager(all_or_nothing_batch, compute_resource, queue) for batch_nodes in grouper(slurm_node_list, launch_batch_size): @@ -1070,6 +1159,7 @@ def _launch_instances( if isinstance(e, ClientError): update_failed_nodes_parameters["error_code"] = e.response.get("Error", {}).get("Code") self._update_failed_nodes(**update_failed_nodes_parameters) + # TODO pay attention to this in the best-effort case if job and all_or_nothing_batch: # When launching instances for a specific Job, @@ -1128,7 +1218,7 @@ def _resize_slurm_node_list( # Reuse already launched capacity # fmt: off logger.info( - "Reusing already launched instances for nodes %s:", + "Booking already launched instances for nodes %s:", print_with_count(slurm_node_list[:len(reusable_instances)]), ) instances_launched[queue][compute_resource].extend(reusable_instances[:len(slurm_node_list)]) @@ -1145,6 +1235,7 @@ def _assign_instances_to_nodes( nodes_to_launch: Dict[str, any], instances_launched: Dict[str, any], assign_node_batch_size: int, + raise_on_error: bool, ): if update_node_address: for queue, compute_resources in nodes_to_launch.items(): @@ -1152,10 +1243,22 @@ def _assign_instances_to_nodes( launched_ec2_instances = instances_launched.get(queue, {}).get(compute_resource, []) for batch in grouper(list(zip(slurm_node_list, launched_ec2_instances)), assign_node_batch_size): - batch_nodes, batch_launched_ec2_instances = zip(*batch) - assigned_nodes = self._update_slurm_node_addrs(list(batch_nodes), batch_launched_ec2_instances) - self._store_assigned_hostnames(assigned_nodes) - self._update_dns_hostnames(assigned_nodes, assign_node_batch_size) + batch_nodes = [] + try: + batch_nodes, batch_launched_ec2_instances = zip(*batch) + assigned_nodes = self._update_slurm_node_addrs( + slurm_nodes=list(batch_nodes), launched_instances=batch_launched_ec2_instances + ) + self._store_assigned_hostnames(nodes=assigned_nodes) + self._update_dns_hostnames( + nodes=assigned_nodes, update_dns_batch_size=assign_node_batch_size + ) + except (NodeAddrUpdateError, HostnameTableStoreError, HostnameDnsStoreError): + if raise_on_error: + raise 
InstanceToNodeAssignmentError + + # Update the batch of failed node and continue + self._update_failed_nodes(set(batch_nodes)) def _update_slurm_node_addrs(self, slurm_nodes: List[str], launched_instances: List[EC2Instance]): """Update node information in slurm with info from launched EC2 instance.""" @@ -1173,7 +1276,7 @@ def _update_slurm_node_addrs(self, slurm_nodes: List[str], launched_instances: L nodehostnames=node_hostnames, ) logger.info( - "Nodes are now configured with instances: %s", + "Nodes are now configured with instances %s", print_with_count(zip(slurm_nodes, launched_instances)), ) @@ -1185,8 +1288,7 @@ def _update_slurm_node_addrs(self, slurm_nodes: List[str], launched_instances: L print_with_count(slurm_nodes), print_with_count(launched_instances), ) - self._update_failed_nodes(set(slurm_nodes)) - return {} + raise NodeAddrUpdateError class NodeListScalingInstanceManager(InstanceManager): diff --git a/src/slurm_plugin/resume.py b/src/slurm_plugin/resume.py index 6300bd572..e62a8f249 100644 --- a/src/slurm_plugin/resume.py +++ b/src/slurm_plugin/resume.py @@ -45,7 +45,7 @@ class SlurmResumeConfig: "run_instances_overrides": "/opt/slurm/etc/pcluster/run_instances_overrides.json", "create_fleet_overrides": "/opt/slurm/etc/pcluster/create_fleet_overrides.json", "fleet_config_file": "/etc/parallelcluster/slurm_plugin/fleet-config.json", - "all_or_nothing_batch": False, + "all_or_nothing_batch": True, "job_level_scaling": True, "temp_jls_for_node_sharing": False, } @@ -158,10 +158,17 @@ def _handle_failed_nodes(node_list, reason="Failure when resuming nodes"): Clustermgtd will be responsible for running full DOWN -> POWER_DOWN process. """ try: - log.info("Setting following failed nodes into DOWN state: %s", print_with_count(node_list)) + log.info( + "Setting following failed nodes into DOWN state %s with reason: %s", print_with_count(node_list), reason + ) set_nodes_down(node_list, reason=reason) except Exception as e: - log.error("Failed to place nodes %s into down with exception: %s", print_with_count(node_list), e) + log.error( + "Failed to place nodes %s into DOWN for reason %s with exception: %s", + print_with_count(node_list), + reason, + e, + ) def _resume(arg_nodes, resume_config, slurm_resume): @@ -218,7 +225,7 @@ def _resume(arg_nodes, resume_config, slurm_resume): if failed_nodes: log.error( - "Failed to launch following nodes, setting nodes to down: %s", + "Failed to launch following nodes, setting nodes to DOWN: %s", print_with_count(failed_nodes), ) for error_code, node_list in instance_manager.failed_nodes.items(): diff --git a/tests/slurm_plugin/test_clustermgtd.py b/tests/slurm_plugin/test_clustermgtd.py index b582090de..8716a6194 100644 --- a/tests/slurm_plugin/test_clustermgtd.py +++ b/tests/slurm_plugin/test_clustermgtd.py @@ -1096,7 +1096,7 @@ def test_handle_unhealthy_static_nodes( # Mock associated function cluster_manager._instance_manager.delete_instances = mocker.MagicMock() - cluster_manager._instance_manager._parse_requested_nodes = mocker.MagicMock( + cluster_manager._instance_manager._parse_nodes_resume_list = mocker.MagicMock( return_value={ "queue1": { "c5xlarge": [ diff --git a/tests/slurm_plugin/test_instance_manager.py b/tests/slurm_plugin/test_instance_manager.py index 075476d99..8cb2e7a14 100644 --- a/tests/slurm_plugin/test_instance_manager.py +++ b/tests/slurm_plugin/test_instance_manager.py @@ -23,7 +23,13 @@ import slurm_plugin from assertpy import assert_that from slurm_plugin.fleet_manager import EC2Instance -from 
slurm_plugin.instance_manager import HostnameDnsStoreError, InstanceManager, InstanceManagerFactory +from slurm_plugin.instance_manager import ( + HostnameDnsStoreError, + InstanceManager, + InstanceManagerFactory, + InstanceToNodeAssignmentError, + NodeAddrUpdateError, +) from slurm_plugin.slurm_resources import ( EC2_HEALTH_STATUS_UNHEALTHY_STATES, EC2_INSTANCE_ALIVE_STATES, @@ -640,7 +646,7 @@ def test_update_dns_hostnames( def test_parse_requested_instances( self, node_list, expected_results, expected_failed_nodes, instance_manager, job_level_scaling ): - assert_that(instance_manager._parse_requested_nodes(node_list)).is_equal_to(expected_results) + assert_that(instance_manager._parse_nodes_resume_list(node_list)).is_equal_to(expected_results) assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) @pytest.mark.parametrize( @@ -1818,6 +1824,7 @@ def test_add_instances_for_resume_file( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) instance_manager._scaling_for_jobs_multi_node.assert_any_call( job_list=expected_jobs_multi_node_no_oversubscribe, @@ -1826,6 +1833,7 @@ def test_add_instances_for_resume_file( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) instance_manager._scaling_for_nodes.assert_any_call( node_list=expected_nodes_oversubscribe, @@ -2187,14 +2195,30 @@ def test_parse_slurm_resume( @pytest.mark.parametrize( "update_node_address, nodes_to_launch, instances_launched, assign_node_batch_size, " - "expected_update_slurm_node_addrs_calls", + "raise_on_error, expected_update_slurm_node_addrs_calls, expected_exception, " + "failed_nodes, expected_failed_nodes", [ ( False, {}, {}, 0, + False, + None, + None, + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}}, + ), + ( + False, + {}, + {}, + 0, + True, + None, None, + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}}, ), ( True, @@ -2213,16 +2237,20 @@ def test_parse_slurm_resume( }, }, 1, + False, [ call( - ["q1-q1c1-st-large-1"], - ( + slurm_nodes=["q1-q1c1-st-large-1"], + launched_instances=( EC2Instance( "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) ), ), ), ], + None, + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}}, ), ( True, @@ -2266,10 +2294,11 @@ def test_parse_slurm_resume( }, }, 10, + False, [ call( - ["q1-q1c1-st-large-1", "q1-q1c1-st-large-2"], - ( + slurm_nodes=["q1-q1c1-st-large-1", "q1-q1c1-st-large-2"], + launched_instances=( EC2Instance( "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) ), @@ -2279,16 +2308,16 @@ def test_parse_slurm_resume( ), ), call( - ["q1-q1c2-st-small-1"], - ( + slurm_nodes=["q1-q1c2-st-small-1"], + launched_instances=( EC2Instance( "i-123456", "ip.1.0.0.3", "ip-1-0-0-3", datetime(2020, 1, 1, tzinfo=timezone.utc) ), ), ), call( - ["q2-q2c1-st-large-1", "q2-q2c1-st-large-2", "q2-q2c1-st-large-3"], - ( + slurm_nodes=["q2-q2c1-st-large-1", "q2-q2c1-st-large-2", "q2-q2c1-st-large-3"], + launched_instances=( EC2Instance( "i-12347", "ip.1.0.0.4", "ip-1-0-0-4", datetime(2020, 1, 1, tzinfo=timezone.utc) ), @@ -2301,10 +2330,241 @@ def test_parse_slurm_resume( ), ), ], + None, + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}}, + ), + ( + True, + { + "q1": { + 
"q1c1": ["q1-q1c1-st-large-1"], + }, + }, + { + "q1": { + "q1c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ], + }, + }, + 1, + False, + [ + call( + slurm_nodes=["q1-q1c1-st-large-1"], + launched_instances=( + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + ], + [HostnameDnsStoreError()], + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}, "Exception": {"q1-q1c1-st-large-1"}}, + ), + ( + True, + { + "q1": { + "q1c1": ["q1-q1c1-st-large-1", "q1-q1c1-st-large-2"], + }, + }, + { + "q1": { + "q1c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ], + }, + }, + 1, + False, + [ + call( + slurm_nodes=["q1-q1c1-st-large-1"], + launched_instances=( + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + call( + slurm_nodes=["q1-q1c1-st-large-2"], + launched_instances=( + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + ], + [None, HostnameDnsStoreError()], + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}, "Exception": {"q1-q1c1-st-large-2"}}, + ), + ( + True, + { + "q1": { + "q1c1": ["q1-q1c1-st-large-1", "q1-q1c1-st-large-2"], + }, + }, + { + "q1": { + "q1c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ], + }, + }, + 1, + False, + [ + call( + slurm_nodes=["q1-q1c1-st-large-1"], + launched_instances=( + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + call( + slurm_nodes=["q1-q1c1-st-large-2"], + launched_instances=( + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + ], + [HostnameDnsStoreError(), None], + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}, "Exception": {"q1-q1c1-st-large-1"}}, + ), + ( + True, + { + "q1": { + "q1c1": ["q1-q1c1-st-large-1"], + }, + }, + { + "q1": { + "q1c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ], + }, + }, + 1, + True, + [ + call( + slurm_nodes=["q1-q1c1-st-large-1"], + launched_instances=( + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + ], + HostnameDnsStoreError(), + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}}, + ), + ( + True, + { + "q1": { + "q1c1": ["q1-q1c1-st-large-1", "q1-q1c1-st-large-2"], + }, + }, + { + "q1": { + "q1c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ], + }, + }, + 1, + True, + [ + call( + slurm_nodes=["q1-q1c1-st-large-1"], + launched_instances=( + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + call( + slurm_nodes=["q1-q1c1-st-large-2"], + launched_instances=( + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + ], + 
[None, HostnameDnsStoreError()], + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}}, + ), + ( + True, + { + "q1": { + "q1c1": ["q1-q1c1-st-large-1", "q1-q1c1-st-large-2"], + }, + }, + { + "q1": { + "q1c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ], + }, + }, + 1, + True, + [ + call( + slurm_nodes=["q1-q1c1-st-large-1"], + launched_instances=( + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ), + ), + ], + [HostnameDnsStoreError(), None], + {"Error": {"queueX-st-cxlarge-1"}}, + {"Error": {"queueX-st-cxlarge-1"}}, ), ], ) - def test_assign_instances_to_nodes( + def test_assign_instances_to_nodes( # TODO test the except path for multiple batches e.g. =2 self, mocker, instance_manager, @@ -2312,19 +2572,35 @@ def test_assign_instances_to_nodes( nodes_to_launch, instances_launched, assign_node_batch_size, + raise_on_error, expected_update_slurm_node_addrs_calls, + expected_exception, + failed_nodes, + expected_failed_nodes, ): # patch internal functions instance_manager._update_slurm_node_addrs = mocker.MagicMock() instance_manager._store_assigned_hostnames = mocker.MagicMock() - instance_manager._update_dns_hostnames = mocker.MagicMock() + instance_manager._update_dns_hostnames = mocker.MagicMock(side_effect=expected_exception) + instance_manager.failed_nodes = failed_nodes - instance_manager._assign_instances_to_nodes( - update_node_address=update_node_address, - nodes_to_launch=nodes_to_launch, - instances_launched=instances_launched, - assign_node_batch_size=assign_node_batch_size, - ) + if update_node_address and raise_on_error: + with pytest.raises(InstanceToNodeAssignmentError): + instance_manager._assign_instances_to_nodes( + update_node_address=update_node_address, + nodes_to_launch=nodes_to_launch, + instances_launched=instances_launched, + assign_node_batch_size=assign_node_batch_size, + raise_on_error=raise_on_error, + ) + else: + instance_manager._assign_instances_to_nodes( + update_node_address=update_node_address, + nodes_to_launch=nodes_to_launch, + instances_launched=instances_launched, + assign_node_batch_size=assign_node_batch_size, + raise_on_error=raise_on_error, + ) if not update_node_address: instance_manager._update_slurm_node_addrs.assert_not_called() @@ -2339,9 +2615,11 @@ def test_assign_instances_to_nodes( len(expected_update_slurm_node_addrs_calls) ) + assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) + @pytest.mark.parametrize( "node_list, launched_instances, use_private_hostname, expected_update_nodes, " - "expected_update_nodes_call, expected_failed_nodes, expected_return", + "expected_update_nodes_call, expected_return", [ ( ["queue1-st-c5xlarge-1"], @@ -2350,7 +2628,6 @@ def test_assign_instances_to_nodes( None, call(["queue1-st-c5xlarge-1"], nodeaddrs=[], nodehostnames=None), {}, - {}, ), ( ["queue1-st-c5xlarge-1"], @@ -2358,7 +2635,6 @@ def test_assign_instances_to_nodes( False, None, call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=None), - {}, { "queue1-st-c5xlarge-1": EC2Instance( id="id-1", private_ip="ip-1", hostname="hostname-1", launch_time="some_launch_time" @@ -2371,7 +2647,6 @@ def test_assign_instances_to_nodes( True, None, call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]), - {}, { "queue1-st-c5xlarge-1": EC2Instance( id="id-1", 
private_ip="ip-1", hostname="hostname-1", launch_time="some_launch_time" @@ -2384,8 +2659,7 @@ def test_assign_instances_to_nodes( True, subprocess.CalledProcessError(1, "command"), call(["queue1-st-c5xlarge-1"], nodeaddrs=["ip-1"], nodehostnames=["hostname-1"]), - {"Exception": {"queue1-st-c5xlarge-1"}}, - {}, + NodeAddrUpdateError(), ), ( ["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"], @@ -2396,7 +2670,6 @@ def test_assign_instances_to_nodes( False, None, call(["queue1-st-c5xlarge-1", "queue1-st-c5xlarge-2"], nodeaddrs=["ip-1", "ip-2"], nodehostnames=None), - {}, { "queue1-st-c5xlarge-1": EC2Instance( id="id-1", private_ip="ip-1", hostname="hostname-1", launch_time="some_launch_time" @@ -2415,7 +2688,6 @@ def test_update_slurm_node_addrs( use_private_hostname, expected_update_nodes, expected_update_nodes_call, - expected_failed_nodes, expected_return, instance_manager, mocker, @@ -2425,7 +2697,12 @@ def test_update_slurm_node_addrs( ) instance_manager._use_private_hostname = use_private_hostname - function_return = instance_manager._update_slurm_node_addrs(node_list, launched_instances) + if isinstance(expected_return, Exception): + with pytest.raises(expected_return.__class__): + instance_manager._update_slurm_node_addrs(node_list, launched_instances) + else: + function_return = instance_manager._update_slurm_node_addrs(node_list, launched_instances) + assert_that(function_return).is_equal_to(expected_return) if expected_update_nodes_call: mock_update_nodes.assert_called_once() @@ -2433,8 +2710,7 @@ def test_update_slurm_node_addrs( else: mock_update_nodes.assert_not_called() - assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) - assert_that(function_return).is_equal_to(expected_return) + assert_that(instance_manager.failed_nodes).is_empty() @pytest.mark.parametrize( "job, launch_batch_size, assign_node_batch_size, update_node_address, all_or_nothing_batch, " @@ -2448,13 +2724,13 @@ def test_update_slurm_node_addrs( 2, False, False, - {}, + {"queue4": {"c5xlarge": ["queue4-st-c5xlarge-1"]}}, {}, {}, {}, False, None, - {}, + {"InsufficientInstanceCapacity": {"queue4-st-c5xlarge-1"}}, ), ( SlurmResumeJob(140819, "queue4-st-c5xlarge-1", "queue4-st-c5xlarge-1", "NO"), @@ -2511,7 +2787,7 @@ def test_update_slurm_node_addrs( {}, {}, True, - HostnameDnsStoreError(), + InstanceToNodeAssignmentError(), {"Exception": {"queue4-st-c5xlarge-1"}}, ), ( @@ -2740,27 +3016,23 @@ def test_add_instances_for_job( job, launch_batch_size, assign_node_batch_size, update_node_address, all_or_nothing_batch ) - if not all_or_nothing_batch: - instance_manager._launch_instances.assert_not_called() - instance_manager._assign_instances_to_nodes.assert_not_called() - else: - instance_manager._launch_instances.assert_called_once_with( - job=job, - nodes_to_launch=expected_nodes_to_launch, - launch_batch_size=launch_batch_size, - all_or_nothing_batch=all_or_nothing_batch, - ) + instance_manager._launch_instances.assert_called_once_with( + job=job, + nodes_to_launch=expected_nodes_to_launch, + launch_batch_size=launch_batch_size, + all_or_nothing_batch=all_or_nothing_batch, + ) - assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances) - if expect_assign_instances_to_nodes_called: - instance_manager._assign_instances_to_nodes.assert_called_once() - if expect_assign_instances_to_nodes_failure: - assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) - else: - assert_that(instance_manager.failed_nodes).is_empty() - else: - 
instance_manager._assign_instances_to_nodes.assert_not_called() + assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances) + if expect_assign_instances_to_nodes_called: + instance_manager._assign_instances_to_nodes.assert_called_once() + if expect_assign_instances_to_nodes_failure: assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) + else: + assert_that(instance_manager.failed_nodes).is_empty() + else: + instance_manager._assign_instances_to_nodes.assert_not_called() + assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) @pytest.mark.parametrize( "job, nodes_to_launch, launch_batch_size, unused_launched_instances, launched_instances, " @@ -3074,7 +3346,7 @@ def test_launch_instances( @pytest.mark.parametrize( "job_list, launch_batch_size, assign_node_batch_size, terminate_batch_size, update_node_address, " - "expected_single_nodes_no_oversubscribe", + "expected_single_nodes_no_oversubscribe, all_or_nothing_batch", [ ( [], @@ -3083,6 +3355,7 @@ def test_launch_instances( 3, True, [], + True, ), ( [ @@ -3093,6 +3366,7 @@ def test_launch_instances( 3, True, [], + False, ), ( [ @@ -3104,6 +3378,19 @@ def test_launch_instances( 3, True, ["queue4-st-c5xlarge-1", "queue4-st-c5xlarge-2"], + False, + ), + ( + [ + SlurmResumeJob(140819, "queue4-st-c5xlarge-1", "queue4-st-c5xlarge-1", "NO"), + SlurmResumeJob(140820, "queue4-st-c5xlarge-2", "queue4-st-c5xlarge-2", "NO"), + ], + 1, + 2, + 3, + True, + ["queue4-st-c5xlarge-1", "queue4-st-c5xlarge-2"], + True, ), ], ) @@ -3117,6 +3404,7 @@ def test_scaling_for_jobs_single_node( terminate_batch_size, update_node_address, expected_single_nodes_no_oversubscribe, + all_or_nothing_batch, ): # patch internal functions instance_manager._scaling_for_jobs = mocker.MagicMock() @@ -3128,6 +3416,7 @@ def test_scaling_for_jobs_single_node( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) if not job_list: instance_manager._scaling_for_jobs.assert_not_called() @@ -3139,6 +3428,7 @@ def test_scaling_for_jobs_single_node( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) instance_manager._scaling_for_nodes.assert_not_called() if len(job_list) > 1: @@ -3151,9 +3441,10 @@ def test_scaling_for_jobs_single_node( ) @pytest.mark.parametrize( - "job_list, launch_batch_size, assign_node_batch_size, terminate_batch_size, " "update_node_address", + "job_list, launch_batch_size, assign_node_batch_size, terminate_batch_size, update_node_address, " + "all_or_nothing_batch", [ - ([], 1, 2, 3, True), + ([], 1, 2, 3, True, False), ( [ SlurmResumeJob( @@ -3167,6 +3458,7 @@ def test_scaling_for_jobs_single_node( 2, 1, True, + True, ), ( [ @@ -3187,6 +3479,7 @@ def test_scaling_for_jobs_single_node( 1, 3, False, + True, ), ], ) @@ -3199,6 +3492,7 @@ def test_scaling_for_jobs( assign_node_batch_size, terminate_batch_size, update_node_address, + all_or_nothing_batch, ): # patch internal functions instance_manager._terminate_unassigned_launched_instances = mocker.MagicMock() @@ -3213,6 +3507,7 @@ def test_scaling_for_jobs( assign_node_batch_size=assign_node_batch_size, terminate_batch_size=terminate_batch_size, update_node_address=update_node_address, + all_or_nothing_batch=all_or_nothing_batch, ) if not job_list: @@ -3224,7 +3519,7 @@ def 
test_scaling_for_jobs( launch_batch_size=launch_batch_size, assign_node_batch_size=assign_node_batch_size, update_node_address=update_node_address, - all_or_nothing_batch=True, + all_or_nothing_batch=all_or_nothing_batch, ) setup_logging_filter.return_value.__enter__.return_value.set_custom_value.assert_any_call(job.job_id) assert_that( @@ -3366,7 +3661,7 @@ def test_scaling_for_nodes( ) if not node_list: - instance_manager._add_instances_for_nodes.assert_not_called() + instance_manager._add_instances_for_nodes = mocker.MagicMock() else: instance_manager._add_instances_for_nodes.assert_called_once_with( node_list=node_list, @@ -3626,6 +3921,516 @@ def test_update_dict(self, instance_manager, target_dict, update, expected_dict) actual_dict = instance_manager._update_dict(target_dict, update) assert_that(actual_dict).is_equal_to(expected_dict) + @pytest.mark.parametrize( + "assign_node_batch_size, failed_launch_nodes, instances_launched, nodes_resume_list, nodes_resume_mapping, " + "successful_launched_nodes, update_node_address, expected_failed_nodes", + [ + ( + 1, + [], + {}, + [], + {}, + [], + False, + {}, + ), + ( + 2, + [], + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + "c2": [ + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + } + }, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {"q1": {"c1": ["q1-dy-c1-1"], "c2": ["q1-dy-c2-1"]}}, + ["q1-dy-c1-1", "q1-dy-c2-1"], + True, + {}, + ), + ( + 3, + ["q1-dy-c2-1"], + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + } + }, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {"q1": {"c1": ["q1-dy-c1-1"], "c2": ["q1-dy-c2-1"]}}, + ["q1-dy-c1-1"], + True, + {"LimitedInstanceCapacity": {"q1-dy-c2-1"}}, + ), + ( + 4, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {}, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {"q1": {"c1": ["q1-dy-c1-1"], "c2": ["q1-dy-c2-1"]}}, + [], + True, + {"InsufficientInstanceCapacity": {"q1-dy-c1-1", "q1-dy-c2-1"}}, + ), + ], + ) + def test_best_effort_node_assignment( + self, + mocker, + instance_manager, + assign_node_batch_size, + failed_launch_nodes, + instances_launched, + nodes_resume_list, + nodes_resume_mapping, + successful_launched_nodes, + update_node_address, + expected_failed_nodes, + ): + # patch internal functions + instance_manager._assign_instances_to_nodes = mocker.MagicMock() + + instance_manager.best_effort_node_assignment( + assign_node_batch_size=assign_node_batch_size, + failed_launch_nodes=failed_launch_nodes, + instances_launched=instances_launched, + nodes_resume_list=nodes_resume_list, + nodes_resume_mapping=nodes_resume_mapping, + successful_launched_nodes=successful_launched_nodes, + update_node_address=update_node_address, + ) + + if len(successful_launched_nodes) == 0: + instance_manager._assign_instances_to_nodes.assert_not_called() + else: + instance_manager._assign_instances_to_nodes.assert_called_with( + update_node_address=update_node_address, + nodes_to_launch=nodes_resume_mapping, + instances_launched=instances_launched, + assign_node_batch_size=assign_node_batch_size, + raise_on_error=False, + ) + + assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) + + @pytest.mark.parametrize( + "assign_node_batch_size, instances_launched, nodes_resume_list, nodes_resume_mapping, " + "successful_launched_nodes, update_node_address, expected_failed_nodes, " + "expect_assign_instances_to_nodes_failure, 
unused_launched_instances, expected_unused_launched_instances", + [ + ( + 1, + {}, + [], + {}, + [], + False, + {}, + None, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + ), + ( + 2, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + "c2": [ + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + } + }, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {"q1": {"c1": ["q1-dy-c1-1"], "c2": ["q1-dy-c2-1"]}}, + ["q1-dy-c1-1", "q1-dy-c2-1"], + True, + {}, + None, + {}, + {}, + ), + ( + 3, + { + "q1": { + "c1": [ + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + } + }, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {"q1": {"c1": ["q1-dy-c1-1"], "c2": ["q1-dy-c2-1"]}}, + ["q1-dy-c1-1"], + True, + {"LimitedInstanceCapacity": {"q1-dy-c1-1", "q1-dy-c2-1"}}, + None, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, + ), + ( + 4, + {}, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {"q1": {"c1": ["q1-dy-c1-1"], "c2": ["q1-dy-c2-1"]}}, + [], + True, + {"InsufficientInstanceCapacity": {"q1-dy-c1-1", "q1-dy-c2-1"}}, + None, + {}, + {}, + ), + ( + 5, + {}, + [], + {}, + [], + False, + {}, + InstanceToNodeAssignmentError(), + {}, + {}, + ), + ( + 6, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + "c2": [ + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ], + } + }, + ["q1-dy-c1-1", "q1-dy-c2-1"], + {"q1": {"c1": ["q1-dy-c1-1"], "c2": ["q1-dy-c2-1"]}}, + ["q1-dy-c1-1", "q1-dy-c2-1"], + True, + {"Exception": {"q1-dy-c1-1", "q1-dy-c2-1"}}, + InstanceToNodeAssignmentError(), + {}, + {}, + ), + ], + ) + def test_all_or_nothing_node_assignment( + self, + mocker, + instance_manager, + assign_node_batch_size, + instances_launched, + nodes_resume_list, + nodes_resume_mapping, + successful_launched_nodes, + update_node_address, + expected_failed_nodes, + expect_assign_instances_to_nodes_failure, + unused_launched_instances, + expected_unused_launched_instances, + ): + # patch internal functions + instance_manager._assign_instances_to_nodes = mocker.MagicMock( + side_effect=expect_assign_instances_to_nodes_failure + ) + instance_manager.unused_launched_instances = unused_launched_instances + + instance_manager.all_or_nothing_node_assignment( + assign_node_batch_size=assign_node_batch_size, + instances_launched=instances_launched, + nodes_resume_list=nodes_resume_list, + nodes_resume_mapping=nodes_resume_mapping, + successful_launched_nodes=successful_launched_nodes, + update_node_address=update_node_address, + ) + + if len(successful_launched_nodes) < len(nodes_resume_list): + instance_manager._assign_instances_to_nodes.assert_not_called() + else: + instance_manager._assign_instances_to_nodes.assert_called_with( + update_node_address=update_node_address, + 
nodes_to_launch=nodes_resume_mapping, + instances_launched=instances_launched, + assign_node_batch_size=assign_node_batch_size, + raise_on_error=True, + ) + + assert_that(instance_manager.failed_nodes).is_equal_to(expected_failed_nodes) + assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances) + + @pytest.mark.parametrize( + "job_list, " + "node_list, " + "launch_batch_size, " + "assign_node_batch_size, " + "terminate_batch_size, " + "update_node_address, " + "all_or_nothing_batch, " + "unused_launched_instances, " + "mock_launch_instances, " + "expected_unused_launched_instances", + [ + ( + [], + [], + 1, + 2, + 3, + False, + False, + {}, + {}, + {}, + ), + ( + [], + [], + 1, + 2, + 3, + True, + False, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + {}, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + ), + ( + [], + [], + 1, + 2, + 3, + False, + True, + {}, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + ), + ( + [], + [], + 1, + 2, + 3, + True, + True, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + { + "q1": { + "c1": [ + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + } + }, + ), + ( + [ + SlurmResumeJob( + job_id=140816, + nodes_alloc="queue3-st-c5xlarge-[7-10]", + nodes_resume="queue3-st-c5xlarge-[7-9]", + oversubscribe="NO", + ), + ], + ["queue4-st-c5xlarge-1"], + 3, + 2, + 1, + True, + True, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + { + "q2": { + "c2": [ + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ) + ] + } + }, + { + "q1": { + "c1": [ + EC2Instance( + "i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + }, + "q2": { + "c2": [ + EC2Instance( + "i-12346", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc) + ), + ] + }, + }, + ), + ], + ) + def test_scaling_for_jobs_multi_node( + self, + mocker, + instance_manager, + job_list, + node_list, + launch_batch_size, + assign_node_batch_size, + terminate_batch_size, + update_node_address, + all_or_nothing_batch, + unused_launched_instances, + mock_launch_instances, + expected_unused_launched_instances, + ): + # patch internal functions + instance_manager._launch_instances = mocker.MagicMock(return_value=mock_launch_instances) + instance_manager._scaling_for_jobs = mocker.MagicMock() + instance_manager.unused_launched_instances = unused_launched_instances + + instance_manager._scaling_for_jobs_multi_node( + job_list=job_list, + node_list=node_list, + launch_batch_size=launch_batch_size, + assign_node_batch_size=assign_node_batch_size, + terminate_batch_size=terminate_batch_size, + 
update_node_address=update_node_address,
+            all_or_nothing_batch=all_or_nothing_batch,
+        )
+
+        instance_manager._scaling_for_jobs.assert_called_once_with(
+            job_list=job_list,
+            launch_batch_size=launch_batch_size,
+            assign_node_batch_size=assign_node_batch_size,
+            terminate_batch_size=terminate_batch_size,
+            update_node_address=update_node_address,
+            all_or_nothing_batch=all_or_nothing_batch,
+        )
+
+        assert_that(instance_manager.unused_launched_instances).is_equal_to(expected_unused_launched_instances)
+

 class TestNodeListScalingInstanceManager:
     @pytest.fixture
@@ -4513,7 +5318,7 @@ def test_add_instances_for_nodes(
         # Mock _update_slurm_node_addrs_and_failed_nodes but still allow original code to execute
         original_update_func = instance_manager._update_slurm_node_addrs_and_failed_nodes
         instance_manager._update_slurm_node_addrs_and_failed_nodes = mocker.MagicMock(side_effect=original_update_func)
-        instance_manager._parse_requested_nodes = mocker.MagicMock(return_value=nodes_to_launch)
+        instance_manager._parse_nodes_resume_list = mocker.MagicMock(return_value=nodes_to_launch)
         # patch fleet manager calls
         mocker.patch.object(
             slurm_plugin.fleet_manager.Ec2RunInstancesManager,
diff --git a/tests/slurm_plugin/test_resume.py b/tests/slurm_plugin/test_resume.py
index a479699e2..e4d944eb9 100644
--- a/tests/slurm_plugin/test_resume.py
+++ b/tests/slurm_plugin/test_resume.py
@@ -50,7 +50,7 @@ def boto3_stubber_path():
     "logging_config": os.path.join(
         os.path.dirname(slurm_plugin.__file__), "logging", "parallelcluster_resume_logging.conf"
     ),
-    "all_or_nothing_batch": False,
+    "all_or_nothing_batch": True,
     "clustermgtd_timeout": 300,
     "clustermgtd_heartbeat_file_path": "/home/ec2-user/clustermgtd_heartbeat",
     "job_level_scaling": True,