diff --git a/src/aibs_informatics_cdk_lib/stacks/external_sns_trigger.py b/src/aibs_informatics_cdk_lib/stacks/external_sns_trigger.py new file mode 100644 index 0000000..064982b --- /dev/null +++ b/src/aibs_informatics_cdk_lib/stacks/external_sns_trigger.py @@ -0,0 +1,124 @@ +from typing import Mapping, Optional +from constructs import Construct + +import aws_cdk as cdk +from aws_cdk import aws_cloudwatch as cloudwatch +from aws_cdk import aws_sqs as sqs +from aws_cdk import aws_sns as sns +from aws_cdk import aws_lambda as lambda_ +from aws_cdk.aws_sns_subscriptions import SqsSubscription + +from aibs_informatics_core.env import EnvBase, ResourceNameBaseEnum +from aibs_informatics_cdk_lib.stacks.base import EnvBaseStack +from aibs_informatics_cdk_lib.constructs_.assets.code_asset import CodeAsset + + +class ExternalSNSTriggerStack(EnvBaseStack): + def __init__( + self, + scope: Construct, + id: Optional[str], + env_base: EnvBase, + triggered_lambda_fn_name: str | ResourceNameBaseEnum, + triggered_lambda_code_asset: CodeAsset, + triggered_lambda_handler: str, + external_sns_event_name: str | ResourceNameBaseEnum, + external_sns_topic_arn: str, + external_sns_filter_policy: Optional[Mapping[str, sns.SubscriptionFilter]] = None, + external_sns_filter_policy_with_message_body: Optional[Mapping[str, sns.FilterOrPolicy]] = None, + **kwargs, + ) -> None: + + """This intended to be a generic CDK stack that defines resources necessary to + implement a trigger system that listens for events from an external + (some other AWS account) SNS Topic and fires off a lambda in the + account where this stack is deployed. + + + ┌------------- ExternalSNSTriggerStack ---------------┐ + | | + | | + (other AWS account) | (SQS) (Lambda) | + external_sns_topic -> | external_sns_events_queue -> triggered_lambda_fn | + | | | + | v | + | (Cloudwatch) (SQS DLQ) | + | triggered_lambda_dlq_alarm <- triggered_lambda_dlq | + | | + └-----------------------------------------------------┘ + + The main intended use case is to provide a simple template for setting up automation + based on PTS SNS Topic notifications in a separate AWS account (e.g. in an informatics processing pipeline) + """ + super().__init__(scope=scope, id=id, env_base=env_base, **kwargs) + + self.external_sns_events_queue = sqs.Queue( + scope=self, + id=self.env_base.get_construct_id(external_sns_event_name, "event-queue"), + queue_name=self.env_base.prefixed(external_sns_event_name, "event-queue"), + retention_period=cdk.Duration.days(7), + ) + + # The *owning* AWS account of the SNS topic needs to give Subscribe permissions to the + # AWS account where this stack will be deployed + # See: https://docs.aws.amazon.com/sns/latest/dg/sns-send-message-to-sqs-cross-account.html + self.external_sns_topic = sns.Topic.from_topic_arn( + scope=self, + id=self.env_base.get_construct_id(external_sns_event_name, "external-sns-topic"), + topic_arn=external_sns_topic_arn, + ) + + # Useful references: + # https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_sns/Subscription.html#aws_cdk.aws_sns.Subscription + # https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_sns/SubscriptionFilter.html#aws_cdk.aws_sns.SubscriptionFilter + # https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_sns/FilterOrPolicy.html#aws_cdk.aws_sns.FilterOrPolicy + self.external_sns_topic.add_subscription( + subscription=SqsSubscription( + queue=self.external_sns_events_queue, + raw_message_delivery=True, + filter_policy=external_sns_filter_policy, + filter_policy_with_message_body=external_sns_filter_policy_with_message_body, + ) + ) + + triggered_lambda_dlq_name = self.env_base.prefixed(triggered_lambda_fn_name, "dlq") + self.triggered_lambda_dlq = sqs.Queue( + scope=self, + id=self.env_base.get_construct_id(triggered_lambda_fn_name, "dlq"), + queue_name=triggered_lambda_dlq_name, + retention_period=cdk.Duration.days(14), + ) + + self.triggered_lambda_fn = lambda_.Function( + scope=self, + id=self.env_base.get_construct_id(triggered_lambda_fn_name, "triggered-lambda"), + function_name=self.get_resource_name(triggered_lambda_fn_name), + handler=triggered_lambda_handler, + runtime=triggered_lambda_code_asset.default_runtime, + code=triggered_lambda_code_asset.as_code, + memory_size=128, + timeout=cdk.Duration.minutes(5), + environment=triggered_lambda_code_asset.environment, + dead_letter_queue=self.triggered_lambda_dlq, + dead_letter_queue_enabled=self.is_prod, + ) + + self.external_sns_events_queue.grant_consume_messages(self.triggered_lambda_fn) + self.triggered_lambda_dlq.grant_send_messages(self.triggered_lambda_fn) + + # Alarm that fires if triggered lambda fails to parse external SNS Topic events + self.triggered_lambda_dlq_alarm = cloudwatch.Alarm( + scope=self, + id=self.get_construct_id(triggered_lambda_fn_name, "dlq", "alarm"), + alarm_description=( + f"Alarm if more than 1 message in {triggered_lambda_dlq_name} in 10 minute period" + ), + metric=self.triggered_lambda_dlq.metric_approximate_number_of_messages_visible( + statistic=cloudwatch.Stats.MAXIMUM, + period=cdk.Duration.minutes(10), + ), + evaluation_periods=1, + threshold=1, + datapoints_to_alarm=1, + comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD, + )