Skip to content

Commit

Permalink
Merge branch 'master' into fix-sharee-permisions-products
Browse files Browse the repository at this point in the history
  • Loading branch information
matusdrobuliak66 authored Aug 19, 2024
2 parents c6f788d + 21c16c9 commit ef86d1a
Show file tree
Hide file tree
Showing 55 changed files with 462 additions and 272 deletions.
29 changes: 29 additions & 0 deletions packages/aws-library/src/aws_library/ec2/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from ._client import SimcoreEC2API
from ._errors import EC2AccessError, EC2NotConnectedError, EC2RuntimeError
from ._models import (
AWSTagKey,
AWSTagValue,
EC2InstanceBootSpecific,
EC2InstanceConfig,
EC2InstanceData,
EC2InstanceType,
EC2Tags,
Resources,
)

__all__: tuple[str, ...] = (
"AWSTagKey",
"AWSTagValue",
"EC2AccessError",
"EC2InstanceBootSpecific",
"EC2InstanceConfig",
"EC2InstanceData",
"EC2InstanceType",
"EC2NotConnectedError",
"EC2RuntimeError",
"EC2Tags",
"Resources",
"SimcoreEC2API",
)

# nopycln: file
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,24 @@
import botocore.exceptions
from aiobotocore.session import ClientCreatorContext
from aiocache import cached # type: ignore[import-untyped]
from pydantic import ByteSize, PositiveInt, parse_obj_as
from pydantic import ByteSize, PositiveInt
from servicelib.logging_utils import log_context
from settings_library.ec2 import EC2Settings
from types_aiobotocore_ec2 import EC2Client
from types_aiobotocore_ec2.literals import InstanceStateNameType, InstanceTypeType
from types_aiobotocore_ec2.type_defs import FilterTypeDef, TagTypeDef

from .errors import (
EC2InstanceNotFoundError,
EC2InstanceTypeInvalidError,
EC2RuntimeError,
EC2TooManyInstancesError,
)
from .models import (
from ._error_handler import ec2_exception_handler
from ._errors import EC2InstanceNotFoundError, EC2TooManyInstancesError
from ._models import (
AWSTagKey,
EC2InstanceConfig,
EC2InstanceData,
EC2InstanceType,
EC2Tags,
Resources,
)
from .utils import compose_user_data
from ._utils import compose_user_data, ec2_instance_data_from_aws_instance

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,13 +57,13 @@ async def close(self) -> None:
await self.exit_stack.aclose()

async def ping(self) -> bool:
try:
with contextlib.suppress(Exception):
await self.client.describe_account_attributes()
return True
except Exception: # pylint: disable=broad-except
return False
return False

@cached(noself=True)
@ec2_exception_handler(_logger)
async def get_ec2_instance_capabilities(
self,
instance_type_names: set[InstanceTypeType],
Expand All @@ -82,32 +78,26 @@ async def get_ec2_instance_capabilities(
ClustersKeeperRuntimeError: unexpected error communicating with EC2
"""
try:
instance_types = await self.client.describe_instance_types(
InstanceTypes=list(instance_type_names)
)
list_instances: list[EC2InstanceType] = []
for instance in instance_types.get("InstanceTypes", []):
with contextlib.suppress(KeyError):
list_instances.append(
EC2InstanceType(
name=instance["InstanceType"],
resources=Resources(
cpus=instance["VCpuInfo"]["DefaultVCpus"],
ram=ByteSize(
int(instance["MemoryInfo"]["SizeInMiB"])
* 1024
* 1024
),
instance_types = await self.client.describe_instance_types(
InstanceTypes=list(instance_type_names)
)
list_instances: list[EC2InstanceType] = []
for instance in instance_types.get("InstanceTypes", []):
with contextlib.suppress(KeyError):
list_instances.append(
EC2InstanceType(
name=instance["InstanceType"],
resources=Resources(
cpus=instance["VCpuInfo"]["DefaultVCpus"],
ram=ByteSize(
int(instance["MemoryInfo"]["SizeInMiB"]) * 1024 * 1024
),
)
),
)
return list_instances
except botocore.exceptions.ClientError as exc:
if exc.response.get("Error", {}).get("Code", "") == "InvalidInstanceType":
raise EC2InstanceTypeInvalidError from exc
raise EC2RuntimeError from exc # pragma: no cover
)
return list_instances

@ec2_exception_handler(_logger)
async def launch_instances(
self,
instance_config: EC2InstanceConfig,
Expand Down Expand Up @@ -193,38 +183,41 @@ async def launch_instances(
await waiter.wait(InstanceIds=instance_ids)
_logger.info("instances %s exists now.", instance_ids)

# get the private IPs
# NOTE: waiting for pending ensure we get all the IPs back
described_instances = await self.client.describe_instances(
InstanceIds=instance_ids
)
assert "Instances" in described_instances["Reservations"][0] # nosec
instance_datas = [
EC2InstanceData(
launch_time=instance["LaunchTime"],
id=instance["InstanceId"],
aws_private_dns=instance["PrivateDnsName"],
aws_public_ip=instance.get("PublicIpAddress", None),
type=instance["InstanceType"],
state=instance["State"]["Name"],
tags=parse_obj_as(
EC2Tags, {tag["Key"]: tag["Value"] for tag in instance["Tags"]}
),
resources=instance_config.type.resources,
)
for instance in described_instances["Reservations"][0]["Instances"]
await ec2_instance_data_from_aws_instance(self, i)
for i in described_instances["Reservations"][0]["Instances"]
]
_logger.info(
"%s is available, happy computing!!",
"%s is pending now, happy computing!!",
f"{instance_datas=}",
)
return instance_datas

@ec2_exception_handler(_logger)
async def get_instances(
self,
*,
key_names: list[str],
tags: EC2Tags,
state_names: list[InstanceStateNameType] | None = None,
) -> list[EC2InstanceData]:
"""returns the instances matching the given criteria
Arguments:
key_names -- filter the instances by key names
tags -- filter instances by key and their values
Keyword Arguments:
state_names -- filters the instances by state (pending, running, etc...) (default: {None})
Returns:
the instances found
"""
# NOTE: be careful: Name=instance-state-name,Values=["pending", "running"] means pending OR running
# NOTE2: AND is done by repeating Name=instance-state-name,Values=pending Name=instance-state-name,Values=running
if state_names is None:
Expand All @@ -245,76 +238,87 @@ async def get_instances(
all_instances = []
for reservation in instances["Reservations"]:
assert "Instances" in reservation # nosec
for instance in reservation["Instances"]:
assert "LaunchTime" in instance # nosec
assert "InstanceId" in instance # nosec
assert "PrivateDnsName" in instance # nosec
assert "InstanceType" in instance # nosec
assert "State" in instance # nosec
assert "Name" in instance["State"] # nosec
ec2_instance_types = await self.get_ec2_instance_capabilities(
{instance["InstanceType"]}
)
assert len(ec2_instance_types) == 1 # nosec
assert "Tags" in instance # nosec
all_instances.append(
EC2InstanceData(
launch_time=instance["LaunchTime"],
id=instance["InstanceId"],
aws_private_dns=instance["PrivateDnsName"],
aws_public_ip=instance.get("PublicIpAddress", None),
type=instance["InstanceType"],
state=instance["State"]["Name"],
resources=ec2_instance_types[0].resources,
tags=parse_obj_as(
EC2Tags,
{tag["Key"]: tag["Value"] for tag in instance["Tags"]},
),
)
)
all_instances.extend(
[
await ec2_instance_data_from_aws_instance(self, i)
for i in reservation["Instances"]
]
)
_logger.debug(
"received: %s instances with %s", f"{len(all_instances)}", f"{state_names=}"
)
return all_instances

@ec2_exception_handler(_logger)
async def start_instances(
self, instance_datas: Iterable[EC2InstanceData]
) -> list[EC2InstanceData]:
"""starts stopped instances. Will return once the started instances are pending so that their IPs are available.
Arguments:
instance_datas -- the instances to start
Raises:
EC2InstanceNotFoundError: if some of the instance_datas are not found
Returns:
the started instance datas with their respective IPs
"""
instance_ids = [i.id for i in instance_datas]
with log_context(
_logger,
logging.INFO,
msg=f"starting instances {instance_ids}",
):
await self.client.start_instances(InstanceIds=instance_ids)
# wait for the instance to be in a pending state
# NOTE: reference to EC2 states https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-lifecycle.html
waiter = self.client.get_waiter("instance_exists")
await waiter.wait(InstanceIds=instance_ids)
_logger.info("instances %s exists now.", instance_ids)
# NOTE: waiting for pending ensure we get all the IPs back
aws_instances = await self.client.describe_instances(
InstanceIds=instance_ids
)
assert len(aws_instances["Reservations"]) == 1 # nosec
assert "Instances" in aws_instances["Reservations"][0] # nosec
return [
await ec2_instance_data_from_aws_instance(self, i)
for i in aws_instances["Reservations"][0]["Instances"]
]

@ec2_exception_handler(_logger)
async def stop_instances(self, instance_datas: Iterable[EC2InstanceData]) -> None:
try:
with log_context(
_logger,
logging.INFO,
msg=f"stopping instances {[i.id for i in instance_datas]}",
):
await self.client.stop_instances(
InstanceIds=[i.id for i in instance_datas]
)
except botocore.exceptions.ClientError as exc:
if (
exc.response.get("Error", {}).get("Code", "")
== "InvalidInstanceID.NotFound"
):
raise EC2InstanceNotFoundError from exc
raise # pragma: no cover
"""Stops running instances.
Stopping an already stopped instance will do nothing.
Arguments:
instance_datas -- the instances to stop
Raises:
EC2InstanceNotFoundError: any of the instance_datas are not found
"""
with log_context(
_logger,
logging.INFO,
msg=f"stopping instances {[i.id for i in instance_datas]}",
):
await self.client.stop_instances(InstanceIds=[i.id for i in instance_datas])

@ec2_exception_handler(_logger)
async def terminate_instances(
self, instance_datas: Iterable[EC2InstanceData]
) -> None:
try:
with log_context(
_logger,
logging.INFO,
msg=f"terminating instances {[i.id for i in instance_datas]}",
):
await self.client.terminate_instances(
InstanceIds=[i.id for i in instance_datas]
)
except botocore.exceptions.ClientError as exc:
if (
exc.response.get("Error", {}).get("Code", "")
== "InvalidInstanceID.NotFound"
):
raise EC2InstanceNotFoundError from exc
raise # pragma: no cover
with log_context(
_logger,
logging.INFO,
msg=f"terminating instances {[i.id for i in instance_datas]}",
):
await self.client.terminate_instances(
InstanceIds=[i.id for i in instance_datas]
)

@ec2_exception_handler(_logger)
async def set_instances_tags(
self, instances: Sequence[EC2InstanceData], *, tags: EC2Tags
) -> None:
Expand All @@ -336,6 +340,7 @@ async def set_instances_tags(
raise EC2InstanceNotFoundError from exc
raise # pragma: no cover

@ec2_exception_handler(_logger)
async def remove_instances_tags(
self, instances: Sequence[EC2InstanceData], *, tag_keys: Iterable[AWSTagKey]
) -> None:
Expand Down
Loading

0 comments on commit ef86d1a

Please sign in to comment.