Skip to content

Commit

Permalink
separate concertns
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed Aug 22, 2024
1 parent b868bf4 commit 29106c1
Showing 1 changed file with 52 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from fastapi import FastAPI
from models_library.generated_models.docker_rest_api import Node, NodeState
from servicelib.logging_utils import log_catch, log_context
from servicelib.utils import limited_gather
from servicelib.utils_formatting import timedelta_as_minute_second
from types_aiobotocore_ec2.literals import InstanceTypeType

Expand Down Expand Up @@ -97,21 +98,6 @@ async def _analyze_current_cluster(
docker_nodes, existing_ec2_instances
)

# started buffer instance shall be asked to join the cluster once they are running
ssm_client = get_ssm_client(app)
if started_buffer_ec2s := [
i
for i in pending_ec2s
if is_buffer_machine(i.tags)
and await ssm_client.is_instance_connected_to_ssm_server(i.id)
and await ssm_client.wait_for_has_instance_completed_cloud_init(i.id)
]:
await ssm_client.send_command(
[i.id for i in started_buffer_ec2s],
command=await utils_docker.get_docker_swarm_join_bash_command(),
command_name="docker swarm join",
)

# analyse pending ec2s, check if they are pending since too long
now = arrow.utcnow().datetime
broken_ec2s = [
Expand Down Expand Up @@ -210,6 +196,56 @@ async def _terminate_broken_ec2s(app: FastAPI, cluster: Cluster) -> Cluster:
)


async def _make_pending_buffer_ec2s_join_cluster(
app: FastAPI,
cluster: Cluster,
) -> Cluster:
# started buffer instance shall be asked to join the cluster once they are running
ssm_client = get_ssm_client(app)

if buffer_ec2s_pending := [
i.ec2_instance for i in cluster.pending_ec2s if is_buffer_machine(i.tags)
]:
buffer_ec2_connection_state = await limited_gather(
*[
ssm_client.is_instance_connected_to_ssm_server(i.id)
for i in buffer_ec2s_pending
],
reraise=False,
log=_logger,
limit=20,
)
buffer_ec2_connected_to_ssm_server = [
i
for i, c in zip(
buffer_ec2s_pending, buffer_ec2_connection_state, strict=True
)
if c is True
]
buffer_ec2_initialized = await limited_gather(
*[
ssm_client.wait_for_has_instance_completed_cloud_init(i.id)
for i in buffer_ec2_connected_to_ssm_server
],
reraise=False,
log=_logger,
limit=20,
)
buffer_ec2_ready_for_command = [
i
for i, r in zip(
buffer_ec2_connected_to_ssm_server, buffer_ec2_initialized, strict=True
)
if r is True
]
await ssm_client.send_command(
[i.id for i in buffer_ec2_ready_for_command],
command=await utils_docker.get_docker_swarm_join_bash_command(),
command_name="docker swarm join",
)
return cluster


async def _try_attach_pending_ec2s(
app: FastAPI,
cluster: Cluster,
Expand Down Expand Up @@ -1127,6 +1163,7 @@ async def auto_scale_cluster(
)
cluster = await _cleanup_disconnected_nodes(app, cluster)
cluster = await _terminate_broken_ec2s(app, cluster)
cluster = await _make_pending_buffer_ec2s_join_cluster(app, cluster)
cluster = await _try_attach_pending_ec2s(
app, cluster, auto_scaling_mode, allowed_instance_types
)
Expand Down

0 comments on commit 29106c1

Please sign in to comment.