[develop] Add support for Capacity Blocks for ML #591

Merged
merged 36 commits into from
Nov 9, 2023
525faf9
Fix Node class to use in mocks
enrico-usai Oct 16, 2023
d88054e
Improve indentation
enrico-usai Oct 30, 2023
9eb8a1d
Extend scontrol output parser to be able to retrieve reservation info
enrico-usai Oct 13, 2023
54d2cf1
Add commands to manage slurm reservation
enrico-usai Oct 16, 2023
55be6fa
Add command to get Slurm reservations info using scontrol
enrico-usai Oct 25, 2023
5878078
Add SlurmNode methods to check reservation state
enrico-usai Oct 16, 2023
f458d34
Exclude nodes in maintenance from static nodes to be replaced
enrico-usai Oct 16, 2023
dbc7224
Add boto3 layer
enrico-usai Oct 18, 2023
b02d9cf
Introduce CapacityBlockManager to handle instances part of a Capacity…
enrico-usai Oct 18, 2023
c4ab8d1
Cleanup all the leftover slurm reservations even if not related to gi…
enrico-usai Oct 25, 2023
cb3dd14
Rename does_slurm_reservation_exist to is_slurm_reservation
enrico-usai Oct 26, 2023
9eefb80
Define _is_time_to_update_capacity_blocks_info method and fix interna…
enrico-usai Oct 26, 2023
db71c50
Add robustness when updating info from ec2
enrico-usai Oct 26, 2023
15f38f5
Improve main method of CapacityBlockManager
enrico-usai Oct 26, 2023
dd7c93d
Minor improvements to debugging messages
enrico-usai Oct 26, 2023
d1deabe
Move management of capacity blocks in a common place for static and d…
enrico-usai Oct 26, 2023
4639b1c
Rename _capacity_blocks_from_config to _retrieve_capacity_blocks_from…
enrico-usai Oct 26, 2023
7b9f34d
Avoid to raise an exception if there is an error updating a single ca…
enrico-usai Oct 26, 2023
2c373f8
Add essential logic to manage internal CapacityBlockManager exceptions
enrico-usai Oct 26, 2023
da62e0f
Manage internal errors when managing Capacity Blocks and related slur…
enrico-usai Oct 26, 2023
222f79f
Rename slurm_reservation_name_to_id to capacity_block_id_from_slurm_r…
enrico-usai Oct 30, 2023
0a7b1a2
Manage empty allocation strategy when using capacity block
enrico-usai Oct 30, 2023
1e93e63
Add unit tests for fleet manager when using capacity block
enrico-usai Oct 30, 2023
4564d1e
Rename parameters of is_healthy and is_state_healthy methods of Slurm…
enrico-usai Nov 2, 2023
54d1b5b
Improve unit tests, covering missing error cases
enrico-usai Nov 3, 2023
cb93e53
Unify CapacityBlockReservationInfo with CapacityReservationInfo and a…
enrico-usai Nov 6, 2023
7c84360
Add region to Boto3 client initialization
enrico-usai Nov 6, 2023
c10d899
Minor logging improvements
enrico-usai Nov 6, 2023
5730e14
Fix ec2_client usage
enrico-usai Nov 7, 2023
4ad7793
Fix slurm reservation creation
enrico-usai Nov 7, 2023
b40ff34
Mark nodes as reserved only if Slurm reservation is correctly created
enrico-usai Nov 7, 2023
303dbf6
Avoid to call update_slurm_reservation when it is not required
enrico-usai Nov 7, 2023
d3ca87f
Fix is_time_to_update_capacity_blocks_info method
enrico-usai Nov 7, 2023
e83b3f3
Do not print warning messages for reserved nodes
enrico-usai Nov 8, 2023
298cac3
Support same CB in multiple queues and compute resources
enrico-usai Nov 9, 2023
5508027
Merge branch 'develop' into wip/cbr
enrico-usai Nov 9, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ This file is used to list changes made in each version of the aws-parallelcluste
------

**ENHANCEMENTS**
- Add support for EC2 Capacity Blocks for ML.

**CHANGES**
- Perform job-level scaling by default for all jobs, using information in the `SLURM_RESUME_FILE`. Job-level scaling
10 changes: 10 additions & 0 deletions src/aws/__init__.py
@@ -0,0 +1,10 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
157 changes: 157 additions & 0 deletions src/aws/common.py
@@ -0,0 +1,157 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.

import functools
import logging
import time
from enum import Enum

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError, ParamValidationError

LOGGER = logging.getLogger(__name__)


class AWSClientError(Exception):
    """Error during execution of some AWS calls."""

    class ErrorCode(Enum):
        """Error codes for AWS ClientError."""

        VALIDATION_ERROR = "ValidationError"
        REQUEST_LIMIT_EXCEEDED = "RequestLimitExceeded"
        THROTTLING_EXCEPTION = "ThrottlingException"
        CONDITIONAL_CHECK_FAILED_EXCEPTION = "ConditionalCheckFailedException"

        @classmethod
        def throttling_error_codes(cls):
            """Return a set of error codes returned when service rate limits are exceeded."""
            return {cls.REQUEST_LIMIT_EXCEEDED.value, cls.THROTTLING_EXCEPTION.value}

    def __init__(self, function_name: str, message: str, error_code: str = None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.function_name = function_name


class LimitExceededError(AWSClientError):
    """Error caused by exceeding the limits of a downstream AWS service."""

    def __init__(self, function_name: str, message: str, error_code: str = None):
        super().__init__(function_name=function_name, message=message, error_code=error_code)


class BadRequestError(AWSClientError):
    """Error caused by a problem in the request."""

    def __init__(self, function_name: str, message: str, error_code: str = None):
        super().__init__(function_name=function_name, message=message, error_code=error_code)


class AWSExceptionHandler:
    """AWS Exception handler."""

    @staticmethod
    def handle_client_exception(func):
        """Handle Boto3 errors, can be used as a decorator."""

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ParamValidationError as validation_error:
                error = BadRequestError(
                    func.__name__,
                    "Error validating parameter. Failed with exception: {0}".format(str(validation_error)),
                )
            except BotoCoreError as e:
                error = AWSClientError(func.__name__, str(e))
            except ClientError as e:
                # add request id
                message = e.response["Error"]["Message"]
                error_code = e.response["Error"]["Code"]

                if error_code in AWSClientError.ErrorCode.throttling_error_codes():
                    error = LimitExceededError(func.__name__, message, error_code)
                elif error_code == AWSClientError.ErrorCode.VALIDATION_ERROR:
                    error = BadRequestError(func.__name__, message, error_code)
                else:
                    error = AWSClientError(func.__name__, message, error_code)
            LOGGER.error("Encountered error when performing boto3 call in %s: %s", error.function_name, error.message)
            raise error

        return wrapper

    @staticmethod
    def retry_on_boto3_throttling(func):
        """Retry boto3 calls on throttling, can be used as a decorator."""

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            while True:
                try:
                    return func(*args, **kwargs)
                except ClientError as e:
                    if e.response["Error"]["Code"] != "Throttling":
                        raise
                    LOGGER.debug("Throttling when calling %s function. Will retry in %d seconds.", func.__name__, 5)
                    time.sleep(5)

        return wrapper


def _log_boto3_calls(params, **kwargs):
    service = kwargs["event_name"].split(".")[-2]
    operation = kwargs["event_name"].split(".")[-1]
    region = kwargs["context"].get("client_region", boto3.session.Session().region_name)
    LOGGER.info(
        "Executing boto3 call: region=%s, service=%s, operation=%s, params=%s", region, service, operation, params
    )


class Boto3Client:
    """Boto3 client Class."""

    def __init__(self, client_name: str, config: Config, region: str = None):
        region = region if region else get_region()
        self._client = boto3.client(client_name, region_name=region, config=config if config else None)
        self._client.meta.events.register("provide-client-params.*.*", _log_boto3_calls)

    def _paginate_results(self, method, **kwargs):
        """
        Return a generator for a boto3 call, this allows pagination over an arbitrary number of responses.

        :param method: boto3 method
        :param kwargs: arguments to method
        :return: generator with boto3 results
        """
        paginator = self._client.get_paginator(method.__name__)
        for page in paginator.paginate(**kwargs).result_key_iters():
            for result in page:
                yield result


class Boto3Resource:
    """Boto3 resource Class."""

    def __init__(self, resource_name: str):
        self._resource = boto3.resource(resource_name)
        self._resource.meta.client.meta.events.register("provide-client-params.*.*", _log_boto3_calls)


def get_region():
    """Get region used internally for all the AWS calls."""
    region = boto3.session.Session().region_name
    if region is None:
        raise AWSClientError("get_region", "AWS region not configured")
    return region
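
The decorator in `src/aws/common.py` translates low-level client errors into the module's own exception hierarchy. The following is a minimal, standard-library-only sketch of that pattern, not the PR's code: `ServiceError`, `WrappedError`, `WrappedLimitError`, `translate_errors`, `describe_something` and `classify` are hypothetical stand-ins (in particular, `ServiceError` only imitates the `response["Error"]` layout of botocore's `ClientError`).

```python
import functools
from enum import Enum


class ServiceError(Exception):
    """Hypothetical stand-in for botocore's ClientError (not the real class)."""

    def __init__(self, code, message):
        super().__init__(message)
        self.response = {"Error": {"Code": code, "Message": message}}


class ErrorCode(Enum):
    """Subset of the error codes listed in the diff."""

    REQUEST_LIMIT_EXCEEDED = "RequestLimitExceeded"
    THROTTLING_EXCEPTION = "ThrottlingException"

    @classmethod
    def throttling_error_codes(cls):
        # Compare against the string values, because service error codes arrive as plain strings.
        return {cls.REQUEST_LIMIT_EXCEEDED.value, cls.THROTTLING_EXCEPTION.value}


class WrappedError(Exception):
    """Hypothetical domain error carrying the failing function name and error code."""

    def __init__(self, function_name, message, error_code=None):
        super().__init__(message)
        self.function_name = function_name
        self.message = message
        self.error_code = error_code


class WrappedLimitError(WrappedError):
    """Raised when the error code is one of the throttling codes."""


def translate_errors(func):
    """Convert ServiceError into WrappedError or WrappedLimitError."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except ServiceError as e:
            code = e.response["Error"]["Code"]
            message = e.response["Error"]["Message"]
            is_throttling = code in ErrorCode.throttling_error_codes()
            error_cls = WrappedLimitError if is_throttling else WrappedError
            raise error_cls(func.__name__, message, code) from e

    return wrapper


@translate_errors
def describe_something(code):
    """Simulate a failing service call."""
    raise ServiceError(code, "simulated failure")


def classify(code):
    """Return (exception class name, function name, error code) for a simulated failure."""
    try:
        describe_something(code)
    except WrappedError as error:
        return type(error).__name__, error.function_name, error.error_code
```

Because `functools.wraps` preserves `__name__`, the wrapped error reports the name of the decorated function rather than `wrapper`, which is what makes the log message in the handler useful.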
89 changes: 89 additions & 0 deletions src/aws/ec2.py
@@ -0,0 +1,89 @@
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from aws.common import AWSExceptionHandler, Boto3Client


class CapacityReservationInfo:
    """
    Data object wrapping the result of a describe-capacity-reservations call.

    {
        "CapacityReservationId": "cr-123456",
        "OwnerId": "123",
        "CapacityReservationArn": "arn:aws:ec2:us-east-2:123:capacity-reservation/cr-123456",
        "AvailabilityZoneId": "use2-az1",
        "InstanceType": "t3.large",
        "InstancePlatform": "Linux/UNIX",
        "AvailabilityZone": "eu-west-1a",
        "Tenancy": "default",
        "TotalInstanceCount": 1,
        "AvailableInstanceCount": 1,
        "EbsOptimized": false,
        "EphemeralStorage": false,
        "State": "active",
        "StartDate": "2023-11-15T11:30:00+00:00",
        "EndDate": "2023-11-16T11:30:00+00:00",  # capacity-block only
        "EndDateType": "limited",
        "InstanceMatchCriteria": "targeted",
        "CreateDate": "2023-10-25T20:40:13+00:00",
        "Tags": [
            {
                "Key": "aws:ec2capacityreservation:incrementalRequestedQuantity",
                "Value": "1"
            },
            {
                "Key": "aws:ec2capacityreservation:capacityReservationType",
                "Value": "capacity-block"
            }
        ],
        "CapacityAllocations": [],
        "ReservationType": "capacity-block"  # capacity-block only
    }
    """

    def __init__(self, capacity_reservation_data):
        self.capacity_reservation_data = capacity_reservation_data

    def capacity_reservation_id(self):
        """Return the id of the Capacity Reservation."""
        return self.capacity_reservation_data.get("CapacityReservationId")

    def state(self):
        """Return the state of the Capacity Reservation."""
        return self.capacity_reservation_data.get("State")

    def __eq__(self, other):
        return self.__dict__ == other.__dict__


class Ec2Client(Boto3Client):
    """Implement EC2 Boto3 client."""

    def __init__(self, config=None, region=None):
        super().__init__("ec2", region=region, config=config)

    @AWSExceptionHandler.handle_client_exception
    def describe_capacity_reservations(self, capacity_reservation_ids: List[str]) -> List[CapacityReservationInfo]:
        """Accept a space separated list of reservation ids. Return a list of CapacityReservationInfo."""
        result = []
        response = list(
            self._paginate_results(
                self._client.describe_capacity_reservations,
                CapacityReservationIds=capacity_reservation_ids,
                # ReservationType=reservation_type,  # not yet available
            )
        )
        for capacity_reservation in response:
            result.append(CapacityReservationInfo(capacity_reservation))

        return result
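
As a rough illustration of how the `CapacityReservationInfo` wrapper might be consumed, the sketch below wraps two hand-written response items and filters them by state. It runs without AWS access: the class body is copied from the diff, while `active_reservation_ids` and `sample_items` are hypothetical names introduced only for this example.

```python
class CapacityReservationInfo:
    """Data object wrapping one item of a describe-capacity-reservations response."""

    def __init__(self, capacity_reservation_data):
        self.capacity_reservation_data = capacity_reservation_data

    def capacity_reservation_id(self):
        """Return the id of the Capacity Reservation."""
        return self.capacity_reservation_data.get("CapacityReservationId")

    def state(self):
        """Return the state of the Capacity Reservation."""
        return self.capacity_reservation_data.get("State")

    def __eq__(self, other):
        return self.__dict__ == other.__dict__


def active_reservation_ids(reservations):
    """Hypothetical helper: return the ids of the reservations whose state is 'active'."""
    return [r.capacity_reservation_id() for r in reservations if r.state() == "active"]


# Hand-written items shaped like the docstring example in the diff (only the fields used here).
sample_items = [
    {"CapacityReservationId": "cr-123456", "State": "active", "ReservationType": "capacity-block"},
    {"CapacityReservationId": "cr-234567", "State": "expired", "ReservationType": "capacity-block"},
]
reservations = [CapacityReservationInfo(item) for item in sample_items]
```

Since both accessors use `dict.get`, a missing field yields `None` instead of raising `KeyError`, so a partially populated response item is simply filtered out by the state check.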
9 changes: 6 additions & 3 deletions src/common/schedulers/slurm_commands.py
@@ -63,11 +63,12 @@
 SCONTROL_OUTPUT_AWK_PARSER = (
     'awk \'BEGIN{{RS="\\n\\n" ; ORS="######\\n";}} {{print}}\' | '
     + "grep -oP '^(NodeName=\\S+)|(NodeAddr=\\S+)|(NodeHostName=\\S+)|(?<!Next)(State=\\S+)|"
-    + "(Partitions=\\S+)|(SlurmdStartTime=\\S+)|(LastBusyTime=\\S+)|(Reason=.*)|(######)'"
+    + "(Partitions=\\S+)|(SlurmdStartTime=\\S+)|(LastBusyTime=\\S+)|(ReservationName=\\S+)|(Reason=.*)|(######)'"
 )

 # Set default timeouts for running different slurm commands.
 # These timeouts might be needed when running on large scale
+DEFAULT_SCONTROL_COMMAND_TIMEOUT = 30
 DEFAULT_GET_INFO_COMMAND_TIMEOUT = 30
 DEFAULT_UPDATE_COMMAND_TIMEOUT = 60

@@ -389,15 +390,16 @@ def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]:
     """Parse slurm node info into SlurmNode objects."""
     # [ec2-user@ip-10-0-0-58 ~]$ /opt/slurm/bin/scontrol show nodes compute-dy-c5xlarge-[1-3],compute-dy-c5xlarge-50001\
     # | awk 'BEGIN{{RS="\n\n" ; ORS="######\n";}} {{print}}' | grep -oP "^(NodeName=\S+)|(NodeAddr=\S+)
-    # |(NodeHostName=\S+)|(?<!Next)(State=\S+)|(Partitions=\S+)|(SlurmdStartTime=\S+)|(LastBusyTime=\\S+)|(Reason=.*)\
-    # |(######)"
+    # |(NodeHostName=\S+)|(?<!Next)(State=\S+)|(Partitions=\S+)|(SlurmdStartTime=\S+)|(LastBusyTime=\\S+)
+    # |(ReservationName=\S+)|(Reason=.*)|(######)"
     # NodeName=compute-dy-c5xlarge-1
     # NodeAddr=1.2.3.4
     # NodeHostName=compute-dy-c5xlarge-1
     # State=IDLE+CLOUD+POWER
     # Partitions=compute,compute2
     # SlurmdStartTime=2023-01-26T09:57:15
     # Reason=some reason
+    # ReservationName=root_1
     # ######
     # NodeName=compute-dy-c5xlarge-2
     # NodeAddr=1.2.3.4
@@ -430,6 +432,7 @@ def _parse_nodes_info(slurm_node_info: str) -> List[SlurmNode]:
         "Reason": "reason",
         "SlurmdStartTime": "slurmdstarttime",
         "LastBusyTime": "lastbusytime",
+        "ReservationName": "reservation_name",
     }

     date_fields = ["SlurmdStartTime", "LastBusyTime"]
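
To illustrate what the new `ReservationName` entry means for the parser, here is a simplified sketch that turns already-filtered `scontrol` output (one `Key=Value` per line, records separated by `######`) into keyword dictionaries. It is not the PR's `_parse_nodes_info`: `parse_nodes`, `KEY_TO_ARG` and `SAMPLE_OUTPUT` are hypothetical names, the `NodeName` mapping is an assumption because that part of the map is outside the visible hunk, and date parsing is omitted.

```python
# Mapping from scontrol field names to keyword names. The last four entries appear in the diff;
# the "NodeName" entry is an assumption, since that part of the map is not visible in the hunk.
KEY_TO_ARG = {
    "NodeName": "name",
    "Reason": "reason",
    "SlurmdStartTime": "slurmdstarttime",
    "LastBusyTime": "lastbusytime",
    "ReservationName": "reservation_name",
}

# Sample shaped like the comment block in the diff: one record per node, "######" as separator.
SAMPLE_OUTPUT = (
    "NodeName=compute-dy-c5xlarge-1\n"
    "State=IDLE+CLOUD+POWER\n"
    "SlurmdStartTime=2023-01-26T09:57:15\n"
    "Reason=some reason\n"
    "ReservationName=root_1\n"
    "######\n"
    "NodeName=compute-dy-c5xlarge-2\n"
    "State=IDLE+CLOUD+POWER\n"
    "######\n"
)


def parse_nodes(filtered_output):
    """Return one dict of keyword arguments per node record."""
    nodes = []
    for record in filtered_output.split("######\n"):
        kwargs = {}
        for line in record.splitlines():
            # Split on the first "=" only, so values such as "some reason" stay intact.
            key, separator, value = line.partition("=")
            if separator and key in KEY_TO_ARG:
                kwargs[KEY_TO_ARG[key]] = value
        if kwargs:
            nodes.append(kwargs)
    return nodes
```

A node that is not part of any Slurm reservation has no `ReservationName` line, so the `reservation_name` key is simply absent from its dictionary; a consumer would need a default for that case.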