Skip to content

Commit

Permalink
Add external SNS topic trigger stack
Browse files Browse the repository at this point in the history
  • Loading branch information
njmei committed May 4, 2024
1 parent 392da29 commit 880dbd0
Showing 1 changed file with 124 additions and 0 deletions.
124 changes: 124 additions & 0 deletions src/aibs_informatics_cdk_lib/stacks/external_sns_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from typing import Mapping, Optional
from constructs import Construct

import aws_cdk as cdk
from aws_cdk import aws_cloudwatch as cloudwatch
from aws_cdk import aws_sqs as sqs
from aws_cdk import aws_sns as sns
from aws_cdk import aws_lambda as lambda_
from aws_cdk.aws_sns_subscriptions import SqsSubscription

from aibs_informatics_core.env import EnvBase, ResourceNameBaseEnum
from aibs_informatics_cdk_lib.stacks.base import EnvBaseStack
from aibs_informatics_cdk_lib.constructs_.assets.code_asset import CodeAsset


class ExternalSNSTriggerStack(EnvBaseStack):
def __init__(
self,
scope: Construct,
id: Optional[str],
env_base: EnvBase,
triggered_lambda_fn_name: str | ResourceNameBaseEnum,
triggered_lambda_code_asset: CodeAsset,
triggered_lambda_handler: str,
external_sns_event_name: str | ResourceNameBaseEnum,
external_sns_topic_arn: str,
external_sns_filter_policy: Optional[Mapping[str, sns.SubscriptionFilter]] = None,
external_sns_filter_policy_with_message_body: Optional[Mapping[str, sns.FilterOrPolicy]] = None,
**kwargs,
) -> None:

"""This intended to be a generic CDK stack that defines resources necessary to
implement a trigger system that listens for events from an external
(some other AWS account) SNS Topic and fires off a lambda in the
account where this stack is deployed.
┌------------- ExternalSNSTriggerStack ---------------┐
| |
| |
(other AWS account) | (SQS) (Lambda) |
external_sns_topic -> | external_sns_events_queue -> triggered_lambda_fn |
| | |
| v |
| (Cloudwatch) (SQS DLQ) |
| triggered_lambda_dlq_alarm <- triggered_lambda_dlq |
| |
└-----------------------------------------------------┘
The main intended use case is to provide a simple template for setting up automation
based on PTS SNS Topic notifications in a separate AWS account (e.g. in an informatics processing pipeline)
"""
super().__init__(scope=scope, id=id, env_base=env_base, **kwargs)

self.external_sns_events_queue = sqs.Queue(
scope=self,
id=self.env_base.get_construct_id(external_sns_event_name, "event-queue"),
queue_name=self.env_base.prefixed(external_sns_event_name, "event-queue"),
retention_period=cdk.Duration.days(7),
)

# The *owning* AWS account of the SNS topic needs to give Subscribe permissions to the
# AWS account where this stack will be deployed
# See: https://docs.aws.amazon.com/sns/latest/dg/sns-send-message-to-sqs-cross-account.html
self.external_sns_topic = sns.Topic.from_topic_arn(
scope=self,
id=self.env_base.get_construct_id(external_sns_event_name, "external-sns-topic"),
topic_arn=external_sns_topic_arn,
)

# Useful references:
# https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_sns/Subscription.html#aws_cdk.aws_sns.Subscription
# https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_sns/SubscriptionFilter.html#aws_cdk.aws_sns.SubscriptionFilter
# https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_sns/FilterOrPolicy.html#aws_cdk.aws_sns.FilterOrPolicy
self.external_sns_topic.add_subscription(
subscription=SqsSubscription(
queue=self.external_sns_events_queue,
raw_message_delivery=True,
filter_policy=external_sns_filter_policy,
filter_policy_with_message_body=external_sns_filter_policy_with_message_body,
)
)

triggered_lambda_dlq_name = self.env_base.prefixed(triggered_lambda_fn_name, "dlq")
self.triggered_lambda_dlq = sqs.Queue(
scope=self,
id=self.env_base.get_construct_id(triggered_lambda_fn_name, "dlq"),
queue_name=triggered_lambda_dlq_name,
retention_period=cdk.Duration.days(14),
)

self.triggered_lambda_fn = lambda_.Function(
scope=self,
id=self.env_base.get_construct_id(triggered_lambda_fn_name, "triggered-lambda"),
function_name=self.get_resource_name(triggered_lambda_fn_name),
handler=triggered_lambda_handler,
runtime=triggered_lambda_code_asset.default_runtime,
code=triggered_lambda_code_asset.as_code,
memory_size=128,
timeout=cdk.Duration.minutes(5),
environment=triggered_lambda_code_asset.environment,
dead_letter_queue=self.triggered_lambda_dlq,
dead_letter_queue_enabled=self.is_prod,
)

self.external_sns_events_queue.grant_consume_messages(self.triggered_lambda_fn)
self.triggered_lambda_dlq.grant_send_messages(self.triggered_lambda_fn)

# Alarm that fires if triggered lambda fails to parse external SNS Topic events
self.triggered_lambda_dlq_alarm = cloudwatch.Alarm(
scope=self,
id=self.get_construct_id(triggered_lambda_fn_name, "dlq", "alarm"),
alarm_description=(
f"Alarm if more than 1 message in {triggered_lambda_dlq_name} in 10 minute period"
),
metric=self.triggered_lambda_dlq.metric_approximate_number_of_messages_visible(
statistic=cloudwatch.Stats.MAXIMUM,
period=cdk.Duration.minutes(10),
),
evaluation_periods=1,
threshold=1,
datapoints_to_alarm=1,
comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
)

0 comments on commit 880dbd0

Please sign in to comment.