Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-3.8] Raise exception when CreateFleet doesn't return any instance #578 #579

Merged
merged 3 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/slurm_plugin/fleet_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ def __init__(self, message: str):
super().__init__(message)


class LaunchInstancesError(Exception):
"""Represent an error during the launch of EC2 instances."""

def __init__(self, code: str, message: str = ""):
self.code = code
super().__init__(message)


class FleetManagerFactory:
@staticmethod
def get_manager(
Expand Down Expand Up @@ -153,11 +161,11 @@ def __init__(

@abstractmethod
def _evaluate_launch_params(self, count):
pass
"""Evaluate parameters to be passed to run_instances call."""

@abstractmethod
def _launch_instances(self, launch_params):
pass
"""Launch a batch of ec2 instances."""

def launch_ec2_instances(self, count, job_id=None):
"""
Expand Down Expand Up @@ -361,7 +369,8 @@ def _launch_instances(self, launch_params):

instances = response.get("Instances", [])
log_level = logging.WARNING if instances else logging.ERROR
for err in response.get("Errors", []):
err_list = response.get("Errors", [])
for err in err_list:
logger.log(
log_level,
"Error in CreateFleet request (%s): %s - %s",
Expand All @@ -375,6 +384,8 @@ def _launch_instances(self, launch_params):
if partial_instance_ids:
logger.error("Unable to retrieve instance info for instances: %s", partial_instance_ids)

if not instances and len(err_list) == 1:
raise LaunchInstancesError(err_list[0].get("ErrorCode"), err_list[0].get("ErrorMessage"))
return {"Instances": instances}
except ClientError as e:
logger.error("Failed CreateFleet request: %s", e.response.get("ResponseMetadata", {}).get("RequestId"))
Expand Down
4 changes: 3 additions & 1 deletion src/slurm_plugin/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ def all_or_nothing_node_assignment(
# No instances launched at all, e.g. CreateFleet API returns no EC2 instances
self._update_failed_nodes(set(nodes_resume_list), "InsufficientInstanceCapacity", override=False)

def _launch_instances(
def _launch_instances( # noqa: C901
self,
nodes_to_launch: Dict[str, any],
launch_batch_size: int,
Expand Down Expand Up @@ -1016,6 +1016,8 @@ def _launch_instances(
update_failed_nodes_parameters = {"nodeset": set(batch_nodes)}
if isinstance(e, ClientError):
update_failed_nodes_parameters["error_code"] = e.response.get("Error", {}).get("Code")
elif isinstance(e, Exception) and hasattr(e, "code"):
update_failed_nodes_parameters["error_code"] = e.code
self._update_failed_nodes(**update_failed_nodes_parameters)

if job and all_or_nothing_batch:
Expand Down
184 changes: 117 additions & 67 deletions tests/slurm_plugin/test_fleet_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
import pytest
from assertpy import assert_that
from botocore.exceptions import ClientError
from slurm_plugin.fleet_manager import Ec2CreateFleetManager, EC2Instance, Ec2RunInstancesManager, FleetManagerFactory
from slurm_plugin.fleet_manager import (
Ec2CreateFleetManager,
EC2Instance,
Ec2RunInstancesManager,
FleetManagerFactory,
LaunchInstancesError,
)

from tests.common import FLEET_CONFIG, MockedBoto3Request

Expand Down Expand Up @@ -219,75 +225,78 @@ def test_launch_instances(self, boto3_stubber, launch_params, mocked_boto3_reque

# -------- Ec2CreateFleetManager ------

test_fleet_exception_params = {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {"LaunchTemplateName": "hit-queue1-fleet-spot", "Version": "$Latest"},
"Overrides": [
{
"InstanceRequirements": {
"VCpuCount": {"Min": 2},
"MemoryMiB": {"Min": 2048},
"AllowedInstanceTypes": ["inf*"],
"AcceleratorManufacturers": ["nvidia"],
}
}
],
}
],
"SpotOptions": {
"AllocationStrategy": "capacity-optimized",
"SingleInstanceType": False,
"SingleAvailabilityZone": True,
"MinTargetCapacity": 1,
},
"TargetCapacitySpecification": {"TotalTargetCapacity": 5, "DefaultTargetCapacityType": "spot"},
"Type": "instant",
}

test_fleet_spot_params = {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {"LaunchTemplateName": "hit-queue1-fleet-spot", "Version": "$Latest"},
"Overrides": [
{"MaxPrice": "10", "InstanceType": "t2.medium", "SubnetId": "1234567"},
{"MaxPrice": "10", "InstanceType": "t2.large", "SubnetId": "1234567"},
],
}
],
"SpotOptions": {
"AllocationStrategy": "capacity-optimized",
"SingleInstanceType": False,
"SingleAvailabilityZone": True,
"MinTargetCapacity": 1,
},
"TargetCapacitySpecification": {"TotalTargetCapacity": 5, "DefaultTargetCapacityType": "spot"},
"Type": "instant",
}
class TestEc2CreateFleetManager:
test_fleet_exception_params = {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {"LaunchTemplateName": "hit-queue1-fleet-spot", "Version": "$Latest"},
"Overrides": [
{
"InstanceRequirements": {
"VCpuCount": {"Min": 2},
"MemoryMiB": {"Min": 2048},
"AllowedInstanceTypes": ["inf*"],
"AcceleratorManufacturers": ["nvidia"],
}
}
],
}
],
"SpotOptions": {
"AllocationStrategy": "capacity-optimized",
"SingleInstanceType": False,
"SingleAvailabilityZone": True,
"MinTargetCapacity": 1,
},
"TargetCapacitySpecification": {"TotalTargetCapacity": 5, "DefaultTargetCapacityType": "spot"},
"Type": "instant",
}

test_on_demand_params = {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {"LaunchTemplateName": "hit-queue2-fleet-ondemand", "Version": "$Latest"},
"Overrides": [
{"InstanceType": "t2.medium", "SubnetId": "1234567"},
{"InstanceType": "t2.large", "SubnetId": "1234567"},
],
}
],
"OnDemandOptions": {
"AllocationStrategy": "lowest-price",
"SingleInstanceType": False,
"SingleAvailabilityZone": True,
"MinTargetCapacity": 1,
"CapacityReservationOptions": {"UsageStrategy": "use-capacity-reservations-first"},
},
"TargetCapacitySpecification": {"TotalTargetCapacity": 5, "DefaultTargetCapacityType": "on-demand"},
"Type": "instant",
}
test_fleet_spot_params = {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {"LaunchTemplateName": "hit-queue1-fleet-spot", "Version": "$Latest"},
"Overrides": [
{"MaxPrice": "10", "InstanceType": "t2.medium", "SubnetId": "1234567"},
{"MaxPrice": "10", "InstanceType": "t2.large", "SubnetId": "1234567"},
],
}
],
"SpotOptions": {
"AllocationStrategy": "capacity-optimized",
"SingleInstanceType": False,
"SingleAvailabilityZone": True,
"MinTargetCapacity": 1,
},
"TargetCapacitySpecification": {"TotalTargetCapacity": 5, "DefaultTargetCapacityType": "spot"},
"Type": "instant",
}

test_on_demand_params = {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {
"LaunchTemplateName": "hit-queue2-fleet-ondemand",
"Version": "$Latest",
},
"Overrides": [
{"InstanceType": "t2.medium", "SubnetId": "1234567"},
{"InstanceType": "t2.large", "SubnetId": "1234567"},
],
}
],
"OnDemandOptions": {
"AllocationStrategy": "lowest-price",
"SingleInstanceType": False,
"SingleAvailabilityZone": True,
"MinTargetCapacity": 1,
"CapacityReservationOptions": {"UsageStrategy": "use-capacity-reservations-first"},
},
"TargetCapacitySpecification": {"TotalTargetCapacity": 5, "DefaultTargetCapacityType": "on-demand"},
"Type": "instant",
}

class TestCreateFleetManager:
@pytest.mark.parametrize(
(
"batch_size",
Expand Down Expand Up @@ -594,8 +603,45 @@ def test_evaluate_launch_params(
}
],
),
# create-fleet - throttling
(
test_on_demand_params,
[
MockedBoto3Request(
method="create_fleet",
response={
"Instances": [],
"Errors": [
{"ErrorCode": "RequestLimitExceeded", "ErrorMessage": "Request limit exceeded."}
],
"ResponseMetadata": {"RequestId": "37633199-bcc6-4a88-89e3-89d859d76096"},
},
expected_params=test_on_demand_params,
),
],
[],
),
# create-fleet - multiple errors
(
test_on_demand_params,
[
MockedBoto3Request(
method="create_fleet",
response={
"Instances": [],
"Errors": [
{"ErrorCode": "RequestLimitExceeded", "ErrorMessage": "Request limit exceeded."},
{"ErrorCode": "AnotherError", "ErrorMessage": "Something went wrong"},
],
"ResponseMetadata": {"RequestId": "37633199-bcc6-4a88-89e3-89d859d76096"},
},
expected_params=test_on_demand_params,
),
],
[],
),
],
ids=["fleet_spot", "fleet_exception", "fleet_ondemand"],
ids=["fleet_spot", "fleet_exception", "fleet_ondemand", "fleet_throttling", "fleet_multiple_errors"],
)
def test_launch_instances(
self,
Expand All @@ -617,6 +663,10 @@ def test_launch_instances(
with pytest.raises(Exception) as e:
fleet_manager._launch_instances(launch_params)
assert isinstance(e, ClientError)
elif len(expected_assigned_nodes) == 0 and len(mocked_boto3_request[0].response.get("Errors")) == 1:
with pytest.raises(LaunchInstancesError) as e:
fleet_manager._launch_instances(launch_params)
assert isinstance(e, LaunchInstancesError)
else:
assigned_nodes = fleet_manager._launch_instances(launch_params)
assert_that(assigned_nodes.get("Instances", [])).is_equal_to(expected_assigned_nodes)
Expand Down
72 changes: 71 additions & 1 deletion tests/slurm_plugin/test_instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import slurm_plugin
from assertpy import assert_that
from slurm_plugin.common import ScalingStrategy
from slurm_plugin.fleet_manager import EC2Instance
from slurm_plugin.fleet_manager import EC2Instance, LaunchInstancesError
from slurm_plugin.instance_manager import (
HostnameDnsStoreError,
InstanceManager,
Expand Down Expand Up @@ -3252,6 +3252,76 @@ def test_node_assignment_by_scaling_strategy(
{},
{"InsufficientInstanceCapacity": {"queue4-st-c5xlarge-1"}},
),
(
None,
{
"queue1": {"c52xlarge": ["queue1-st-c52xlarge-1"]},
"queue2": {"c5xlarge": ["queue2-st-c5xlarge-1"]},
"queue3": {"p4d24xlarge": ["queue3-st-p4d24xlarge-1"]},
},
15,
{},
[
{
"Instances": [
{
"InstanceId": "i-12345",
"InstanceType": "c5.2xlarge",
"PrivateIpAddress": "ip.1.0.0.1",
"PrivateDnsName": "ip-1-0-0-1",
"LaunchTime": datetime(2020, 1, 1, tzinfo=timezone.utc),
"NetworkInterfaces": [
{
"Attachment": {
"DeviceIndex": 0,
"NetworkCardIndex": 0,
},
"PrivateIpAddress": "ip.1.0.0.1",
},
],
}
],
},
LaunchInstancesError("throttling", "got throttled"),
{
"Instances": [
{
"InstanceId": "i-123456",
"InstanceType": "p4d24xlarge",
"PrivateIpAddress": "ip.1.0.0.2",
"PrivateDnsName": "ip-1-0-0-2",
"LaunchTime": datetime(2020, 1, 1, tzinfo=timezone.utc),
"NetworkInterfaces": [
{
"Attachment": {
"DeviceIndex": 0,
"NetworkCardIndex": 0,
},
"PrivateIpAddress": "ip.1.0.0.2",
},
],
}
],
},
],
{
"queue1": {
"c52xlarge": [
EC2Instance(
"i-12345", "ip.1.0.0.1", "ip-1-0-0-1", datetime(2020, 1, 1, tzinfo=timezone.utc)
)
]
},
"queue3": {
"p4d24xlarge": [
EC2Instance(
"i-123456", "ip.1.0.0.2", "ip-1-0-0-2", datetime(2020, 1, 1, tzinfo=timezone.utc)
)
]
},
},
{"throttling": {"queue2-st-c5xlarge-1"}},
),
],
)
def test_launch_instances(
Expand Down
Loading