Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Container Instance Draining in Amazon ECS #71

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
118 changes: 111 additions & 7 deletions cloudlift/deployment/cluster_template_generator.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
import json
import re
import pathlib

from cfn_flip import to_yaml
from stringcase import camelcase, pascalcase
from troposphere import (Base64, FindInMap, Output, Parameter, Ref, Sub,
cloudformation)
cloudformation, GetAtt, Join)
from troposphere.autoscaling import (AutoScalingGroup, LaunchConfiguration,
ScalingPolicy)
ScalingPolicy, LifecycleHook)
from troposphere.cloudwatch import Alarm, MetricDimension
from troposphere.ec2 import (VPC, InternetGateway, NatGateway, Route,
RouteTable, SecurityGroup, Subnet,
SubnetRouteTableAssociation, VPCGatewayAttachment)
from troposphere.ecs import Cluster
from troposphere.elasticache import SubnetGroup as ElastiCacheSubnetGroup
from troposphere.iam import InstanceProfile, Role
from troposphere.iam import InstanceProfile, Role, PolicyType, Policy
from troposphere.logs import LogGroup
from troposphere.policies import (AutoScalingRollingUpdate, CreationPolicy,
ResourceSignal)
from troposphere.rds import DBSubnetGroup

from troposphere.awslambda import Function, Code, MEMORY_VALUES, Permission
from cloudlift.config import DecimalEncoder
from cloudlift.config import get_client_for, get_region_for_environment
from cloudlift.deployment.template_generator import TemplateGenerator
from cloudlift.version import VERSION
from troposphere.sns import Subscription, Topic, SubscriptionResource
from awacs.aws import Allow, Statement, Principal, PolicyDocument
from awacs.sts import AssumeRole


class ClusterTemplateGenerator(TemplateGenerator):
Expand Down Expand Up @@ -305,6 +309,7 @@ def _add_cluster(self):
cluster = Cluster('Cluster', ClusterName=Ref('AWS::StackName'))
self.template.add_resource(cluster)
self._add_ec2_auto_scaling()
self._add_instance_draining(cluster)
self._add_cluster_alarms(cluster)
return cluster

Expand Down Expand Up @@ -541,6 +546,105 @@ def _add_ec2_auto_scaling(self):
)
self.template.add_resource(self.cluster_scaling_policy)

def _add_instance_draining(self, cluster):
self.sns_asg_role = Role(
"SNSASGRole",
AssumeRolePolicyDocument=PolicyDocument(
Statement=[
Statement(
Effect=Allow,
Action=[AssumeRole],
Principal=Principal("Service", ["autoscaling.amazonaws.com"])
)
]
),
ManagedPolicyArns=["arn:aws:iam::aws:policy/service-role/AutoScalingNotificationAccessRole"]
)
self.template.add_resource(self.sns_asg_role)
self.lambda_execution_role = Role(
"LambdaExecutionRole",
Policies=[Policy(
PolicyName="lambda-inline",
PolicyDocument={
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": [
"autoscaling:CompleteLifecycleAction",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents",
"ecs:ListContainerInstances",
"ecs:DescribeContainerInstances",
"ecs:UpdateContainerInstancesState",
"sns:Publish"
],
"Resource": "*"
}],
}
)],
AssumeRolePolicyDocument=PolicyDocument(
Statement=[
Statement(
Effect=Allow,
Action=[AssumeRole],
Principal=Principal("Service", ["lambda.amazonaws.com"])
)
]
),
ManagedPolicyArns=["arn:aws:iam::aws:policy/service-role/AutoScalingNotificationAccessRole"]
)
self.template.add_resource(self.lambda_execution_role)
with open (str(pathlib.Path(__file__).parent.absolute())+"/ecs_instance_draining_lambda.py", "r") as ecs_instance_draining_lambda:
lambda_code=ecs_instance_draining_lambda.readlines()
self.lambda_function_for_asg = Function(
"LambdaFunctionForASG",
Handler="index.lambda_handler",
Role=GetAtt(self.lambda_execution_role, "Arn"),
Runtime="python3.6",
MemorySize=128,
Timeout=60,
Code=Code(
ZipFile=Join("", lambda_code)
)
)
self.template.add_resource(self.lambda_function_for_asg)
self.asg_sns_topic = Topic(
"ASGSNSTopic",
TopicName=Join("-", [Ref(cluster),"Topic"]),
praveenraghav01 marked this conversation as resolved.
Show resolved Hide resolved
Subscription=[Subscription(
Protocol="lambda",
Endpoint=GetAtt(self.lambda_function_for_asg, "Arn")
)]
)
self.template.add_resource(self.asg_sns_topic)
self.lambda_invoke_permission = Permission(
"LambdaInvokePermission",
FunctionName=Ref(self.lambda_function_for_asg),
Action="lambda:InvokeFunction",
Principal="sns.amazonaws.com",
SourceArn=Ref(self.asg_sns_topic)
)
self.template.add_resource(self.lambda_invoke_permission)
self.lambda_subscription_to_sns_topic = SubscriptionResource(
"LambdaSubscriptionToSNSTopic",
Protocol="lambda",
Endpoint=GetAtt(self.lambda_function_for_asg, "Arn"),
TopicArn=Ref(self.asg_sns_topic)
)
self.template.add_resource(self.lambda_subscription_to_sns_topic)
self.asg_lifecycle_hook=LifecycleHook(
"ASGLifecycleHook",
AutoScalingGroupName=Ref(self.auto_scaling_group),
DefaultResult="ABANDON",
LifecycleHookName=Join("-", [Ref(cluster),"ASG-Hook"]),
LifecycleTransition="autoscaling:EC2_INSTANCE_TERMINATING",
NotificationMetadata=Ref(cluster),
NotificationTargetARN=Ref(self.asg_sns_topic),
RoleARN=GetAtt(self.sns_asg_role, "Arn"),
)
self.template.add_resource(self.asg_lifecycle_hook)

def _add_cluster_parameters(self):
self.template.add_parameter(Parameter(
"Environment",
Expand Down Expand Up @@ -666,7 +770,7 @@ def _add_metadata(self):
'Subnet2',
'NotificationSnsArn'
]
},
}
],
'ParameterLabels': {
'Environment': {
Expand All @@ -684,7 +788,7 @@ def _add_metadata(self):
'default': 'Min. no. of instances in cluster'
},
'NotificationSnsArn': {
'default': 'The SNS topic to which notifactions has to be triggered'
'default': 'The SNS topic to which notifications has to be triggered'
},
'Subnet1': {
'default': 'Enter the ID of the 1st subnet'
Expand All @@ -694,7 +798,7 @@ def _add_metadata(self):
},
'VPC': {
'default': 'Enter the VPC in which you want the environment to be setup'
},
}
}
}
})
61 changes: 61 additions & 0 deletions cloudlift/deployment/ecs_instance_draining_lambda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import json
import time
import boto3
import os

ECS = boto3.client('ecs')
ASG = boto3.client('autoscaling')
SNS = boto3.client('sns')

def find_ecs_instance_info(instance_id,cluster_name):
paginator = ECS.get_paginator('list_container_instances')
for list_resp in paginator.paginate(cluster=cluster_name):
arns = list_resp['containerInstanceArns']
desc_resp = ECS.describe_container_instances(cluster=cluster_name,
containerInstances=arns)
for container_instance in desc_resp['containerInstances']:
if container_instance['ec2InstanceId'] != instance_id:
continue
print('Found instance: id=%s, arn=%s, status=%s, runningTasksCount=%s' %
(instance_id, container_instance['containerInstanceArn'],
container_instance['status'], container_instance['runningTasksCount']))
return (container_instance['containerInstanceArn'],
container_instance['status'], container_instance['runningTasksCount'])
return None, None, 0

def instance_has_running_tasks(instance_id,cluster_name):
(instance_arn, container_status, running_tasks) = find_ecs_instance_info(instance_id,cluster_name)
if instance_arn is None:
print('Could not find instance ID %s. Letting autoscaling kill the instance.' %
(instance_id))
return False
if container_status != 'DRAINING':
print('Setting container instance %s (%s) to DRAINING' %
(instance_id, instance_arn))
ECS.update_container_instances_state(cluster=cluster_name,
containerInstances=[instance_arn],
status='DRAINING')
return running_tasks > 0

def lambda_handler(event, context):
msg = json.loads(event['Records'][0]['Sns']['Message'])
print("Event: ", msg)
if 'LifecycleTransition' not in msg.keys() or \
msg['LifecycleTransition'].find('autoscaling:EC2_INSTANCE_TERMINATING') == -1:
print('Exiting since the lifecycle transition is not EC2_INSTANCE_TERMINATING.')
return
if instance_has_running_tasks(msg['EC2InstanceId'], msg['NotificationMetadata']):
print('Tasks are still running on instance %s; posting msg to SNS topic %s' %
(msg['EC2InstanceId'], event['Records'][0]['Sns']['TopicArn']))
time.sleep(5)
sns_resp = SNS.publish(TopicArn=event['Records'][0]['Sns']['TopicArn'],
Message=json.dumps(msg),
Subject='Publishing SNS msg to invoke Lambda again.')
print('Posted msg %s to SNS topic.' % (sns_resp['MessageId']))
else:
print('No tasks are running on instance %s; setting lifecycle to complete' %
(msg['EC2InstanceId']))
ASG.complete_lifecycle_action(LifecycleHookName=msg['LifecycleHookName'],
AutoScalingGroupName=msg['AutoScalingGroupName'],
LifecycleActionResult='CONTINUE',
InstanceId=msg['EC2InstanceId'])