From 29106c138c4a5ed129606d7fe5d1a785d9c81ee9 Mon Sep 17 00:00:00 2001
From: sanderegg <35365065+sanderegg@users.noreply.github.com>
Date: Thu, 22 Aug 2024 10:32:57 +0200
Subject: [PATCH] separate concerns

---
 .../modules/auto_scaling_core.py | 78 +++++++++++++++----
 1 file changed, 63 insertions(+), 15 deletions(-)

diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
index 61294c0d152..442abc06d70 100644
--- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
+++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py
@@ -18,6 +18,7 @@
 from fastapi import FastAPI
 from models_library.generated_models.docker_rest_api import Node, NodeState
 from servicelib.logging_utils import log_catch, log_context
+from servicelib.utils import limited_gather
 from servicelib.utils_formatting import timedelta_as_minute_second
 from types_aiobotocore_ec2.literals import InstanceTypeType
 
@@ -97,21 +98,6 @@ async def _analyze_current_cluster(
         docker_nodes, existing_ec2_instances
     )
 
-    # started buffer instance shall be asked to join the cluster once they are running
-    ssm_client = get_ssm_client(app)
-    if started_buffer_ec2s := [
-        i
-        for i in pending_ec2s
-        if is_buffer_machine(i.tags)
-        and await ssm_client.is_instance_connected_to_ssm_server(i.id)
-        and await ssm_client.wait_for_has_instance_completed_cloud_init(i.id)
-    ]:
-        await ssm_client.send_command(
-            [i.id for i in started_buffer_ec2s],
-            command=await utils_docker.get_docker_swarm_join_bash_command(),
-            command_name="docker swarm join",
-        )
-
     # analyse pending ec2s, check if they are pending since too long
     now = arrow.utcnow().datetime
     broken_ec2s = [
@@ -210,6 +196,67 @@ async def _terminate_broken_ec2s(app: FastAPI, cluster: Cluster) -> Cluster:
     )
 
 
+async def _make_pending_buffer_ec2s_join_cluster(
+    app: FastAPI,
+    cluster: Cluster,
+) -> Cluster:
+    """Ask started buffer EC2s to join the docker swarm via an SSM command.
+
+    Only pending buffer machines that are connected to the SSM server and have
+    completed cloud-init get the command. The cluster is returned unchanged.
+    """
+    # started buffer instance shall be asked to join the cluster once they are running
+    ssm_client = get_ssm_client(app)
+
+    if buffer_ec2s_pending := [
+        i.ec2_instance
+        for i in cluster.pending_ec2s
+        # tags read from the wrapped EC2 instance, same object whose id is used below
+        if is_buffer_machine(i.ec2_instance.tags)
+    ]:
+        buffer_ec2_connection_state = await limited_gather(
+            *[
+                ssm_client.is_instance_connected_to_ssm_server(i.id)
+                for i in buffer_ec2s_pending
+            ],
+            reraise=False,
+            log=_logger,
+            limit=20,
+        )
+        # gathered with reraise=False: keep only checks that returned exactly True
+        buffer_ec2_connected_to_ssm_server = [
+            i
+            for i, c in zip(
+                buffer_ec2s_pending, buffer_ec2_connection_state, strict=True
+            )
+            if c is True
+        ]
+        buffer_ec2_initialized = await limited_gather(
+            *[
+                ssm_client.wait_for_has_instance_completed_cloud_init(i.id)
+                for i in buffer_ec2_connected_to_ssm_server
+            ],
+            reraise=False,
+            log=_logger,
+            limit=20,
+        )
+        buffer_ec2_ready_for_command = [
+            i
+            for i, r in zip(
+                buffer_ec2_connected_to_ssm_server, buffer_ec2_initialized, strict=True
+            )
+            if r is True
+        ]
+        # never send the command to an empty list of instances
+        if buffer_ec2_ready_for_command:
+            await ssm_client.send_command(
+                [i.id for i in buffer_ec2_ready_for_command],
+                command=await utils_docker.get_docker_swarm_join_bash_command(),
+                command_name="docker swarm join",
+            )
+    return cluster
+
+
 async def _try_attach_pending_ec2s(
     app: FastAPI,
     cluster: Cluster,
@@ -1127,6 +1174,7 @@ async def auto_scale_cluster(
     )
     cluster = await _cleanup_disconnected_nodes(app, cluster)
     cluster = await _terminate_broken_ec2s(app, cluster)
+    cluster = await _make_pending_buffer_ec2s_join_cluster(app, cluster)
     cluster = await _try_attach_pending_ec2s(
         app, cluster, auto_scaling_mode, allowed_instance_types
     )