Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add external SNS topic trigger stack #1

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions src/aibs_informatics_cdk_lib/constructs_/external_sns_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from typing import Any, Mapping, Optional, Sequence, Union

import aws_cdk as cdk
import constructs
from aibs_informatics_core.env import EnvBase, EnvType, ResourceNameBaseEnum
from aws_cdk import aws_cloudwatch as cloudwatch
from aws_cdk import aws_lambda as lambda_
from aws_cdk import aws_sns as sns
from aws_cdk import aws_sqs as sqs
from aws_cdk.aws_lambda_event_sources import SqsEventSource
from aws_cdk.aws_sns_subscriptions import SqsSubscription


class ExternalSnsTrigger(constructs.Construct):
def __init__(
self,
scope: constructs.Construct,
id: str,
env_base: EnvBase,
triggered_lambda_fn: lambda_.Function,
external_sns_event_name: Union[str, ResourceNameBaseEnum],
external_sns_topic_arn: str,
external_sns_event_queue_filters: Optional[Sequence[Mapping[str, Any]]] = None,
**kwargs,
) -> None:
"""This intended to be a generic CDK construct that defines resources necessary to
implement a trigger system that listens for events from an external
(some other AWS account) SNS Topic and fires off a lambda in the
account where this stack is deployed.


┌---- ExternalSNSTrigger ----┐
| |
(other AWS account) | (SQS) | (provided Lambda)
external_sns_topic -----> external_sns_event_queue -----> triggered_lambda_fn
| | |
| v |
| (SQS DLQ) |
| external_sns_event_dlq |
| | |
| v |
| (Cloudwatch) |
| triggered_lambda_dlq_alarm |
| |
└----------------------------┘

The main intended use case is to provide a simple template for setting up automation
based on PTS SNS Topic notifications in a separate AWS account (e.g. in an informatics processing pipeline)

Example usage:
self.external_sns_trigger_construct = ExternalSnsTrigger(
scope=self,
id=self.env_base.get_construct_id("merscope-imaging-process-sns-trigger"),
env_base=self.env_base,
triggered_lambda_fn=self.pts_listener_fn,
external_sns_event_name="merscope-imaging-process",
external_sns_topic_arn=f"arn:aws:sns:us-west-2:{account_id}:{sns_topic_name}",
)
"""
super().__init__(scope=scope, id=id)

external_sns_event_dlq_name = env_base.prefixed(external_sns_event_name, "sns-event-dlq")
self.external_sns_event_dlq = sqs.Queue(
scope=self,
id=env_base.get_construct_id(external_sns_event_name, "sns-event-dlq"),
queue_name=external_sns_event_dlq_name,
retention_period=cdk.Duration.days(14),
)

self.external_sns_event_queue = sqs.Queue(
scope=self,
id=env_base.get_construct_id(external_sns_event_name, "sns-event-queue"),
queue_name=env_base.prefixed(external_sns_event_name, "sns-event-queue"),
retention_period=cdk.Duration.days(7),
dead_letter_queue=sqs.DeadLetterQueue(
max_receive_count=2,
queue=self.external_sns_event_dlq,
),
# visibility_timeout must be longer than the `timeout` of the `triggered_lambda_fn` or deploy will fail
visibility_timeout=cdk.Duration.seconds(330),
)

# The *owning* AWS account of the SNS topic needs to give Subscribe permissions to the
# AWS account where this stack will be deployed
# See: https://docs.aws.amazon.com/sns/latest/dg/sns-send-message-to-sqs-cross-account.html
self.external_sns_topic = sns.Topic.from_topic_arn(
scope=self,
id=env_base.get_construct_id(external_sns_event_name, "external-sns-topic"),
topic_arn=external_sns_topic_arn,
)

# Useful reference:
# https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_sns/Subscription.html#aws_cdk.aws_sns.Subscription
self.external_sns_topic.add_subscription(
SqsSubscription(
queue=self.external_sns_event_queue,
raw_message_delivery=True,
)
)

triggered_lambda_fn.add_event_source(
source=SqsEventSource(
queue=self.external_sns_event_queue,
report_batch_item_failures=True,
filters=external_sns_event_queue_filters,
enabled=(env_base.env_type is EnvType.PROD),
)
)
self.external_sns_event_queue.grant_consume_messages(triggered_lambda_fn)

# Alarm that fires if external_sns_event_queue fails delivery or if lambda fails to process
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a comment before about making this configurable. could you add a todo maybe if not make it configurable this go around?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, will add a TODO comment

# Further actions can be configured by accessing ExternalSnsTrigger.triggered_lambda_dlq_alarm resource
# TODO: Revisit making alarms even more configurable in the future
self.triggered_lambda_dlq_alarm = cloudwatch.Alarm(
scope=self,
id=env_base.get_construct_id(external_sns_event_name, "sns-event-dlq-alarm"),
alarm_description=(
f"Alarm if more than 1 message in {external_sns_event_dlq_name} in 10 minute period"
),
metric=self.external_sns_event_dlq.metric_approximate_number_of_messages_visible(
statistic=cloudwatch.Stats.MAXIMUM,
period=cdk.Duration.minutes(10),
),
evaluation_periods=1,
threshold=1,
datapoints_to_alarm=1,
comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD,
)
Loading