diff --git a/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/__init__.py b/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/lambda_.py b/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/lambda_.py new file mode 100644 index 0000000..d8b32c4 --- /dev/null +++ b/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/lambda_.py @@ -0,0 +1,177 @@ +from dataclasses import dataclass +from typing import List, Optional + +from aibs_informatics_core.env import EnvBase +from attr import field +from aws_cdk import aws_cloudwatch as cw +from aws_cdk import aws_lambda as lambda_ + +from aibs_informatics_cdk_lib.constructs_.cw.types import ( + AlarmMetricConfig, + GraphMetricConfig, + GroupedGraphMetricConfig, +) + + +@dataclass +class LambdaFunctionMetricConfigGenerator: + lambda_function: lambda_.IFunction + lambda_function_name: str = field(default=None) + dimension_map: dict = field(init=False) + + def __post_init__(self): + if self.lambda_function_name is None: + self.lambda_function_name = self.lambda_function.function_name + + self.dimension_map = {"FunctionName": self.lambda_function_name} + + def get_invocations_metric( + self, + name_override: Optional[str] = None, + ) -> GraphMetricConfig: + return GraphMetricConfig( + metric="Invocations", + label=f"{name_override or self.lambda_function_name} Invocations", + statistic="Sum", + dimension_map=self.dimension_map, + ) + + def get_errors_metric( + self, + name_override: Optional[str] = None, + discriminator: Optional[str] = None, + include_alarm: bool = False, + alarm_threshold: int = 1, + alarm_evaluation_periods: int = 3, + alarm_datapoints_to_alarm: int = 1, + ) -> GraphMetricConfig: + name = name_override or self.lambda_function_name + idx = discriminator or "0" + config = GraphMetricConfig( + metric="Errors", + statistic="Sum", + label=f"{name} Errors", + dimension_map=self.dimension_map, + ) + if include_alarm: + config["alarm"] = AlarmMetricConfig( + name=f"{name} Errors Alarm {idx}", + threshold=alarm_threshold, + evaluation_periods=alarm_evaluation_periods, + datapoints_to_alarm=alarm_datapoints_to_alarm, + comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD, + ) + return config + + def get_availability_metric( + self, + name_override: Optional[str] = None, + discriminator: Optional[str] = None, + ) -> GraphMetricConfig: + name = name_override or self.lambda_function_name + idx = discriminator or "0" + + return GraphMetricConfig( + metric="Availability", + statistic="Average", + dimension_map=self.dimension_map, + label=f"{name} %", + metric_expression=f"100 - 100 * errors_{idx} / MAX([errors_{idx}, invocations_{idx}])", + using_metrics={ + f"errors_{idx}": self.lambda_function.metric_errors(), + f"invocations_{idx}": self.lambda_function.metric_invocations(), + }, + ) + + def get_duration_avg_metric( + self, + name_override: Optional[str] = None, + ) -> GraphMetricConfig: + name = name_override or self.lambda_function_name + return GraphMetricConfig( + metric="Duration", + statistic="Average", + dimension_map=self.dimension_map, + label=f"{name} Avg", + ) + + def get_duration_max_metric( + self, + name_override: Optional[str] = None, + ) -> GraphMetricConfig: + name = name_override or self.lambda_function_name + return GraphMetricConfig( + metric="Duration", + statistic="Maximum", + dimension_map=self.dimension_map, + label=f"{name} Max", + ) + + def get_duration_min_metric( + self, + name_override: Optional[str] = None, + ) -> GraphMetricConfig: + name = name_override or self.lambda_function_name + return GraphMetricConfig( + metric="Duration", + statistic="Minimum", + dimension_map=self.dimension_map, + label=f"{name} Min", + ) + + def get_duration_metric_group( + self, + name_override: Optional[str] = None, + title: Optional[str] = None, + include_min_max_duration: bool = False, + ) -> GroupedGraphMetricConfig: + name = name_override or self.lambda_function_name + + avg = self.get_duration_avg_metric(name_override) + if include_min_max_duration: + min_ = self.get_duration_min_metric(name_override) + max_ = self.get_duration_max_metric(name_override) + + return GroupedGraphMetricConfig( + title=title or f"{name} Duration", + namespace="AWS/Lambda", + metrics=[avg, min_, max_], + ) + + def get_success_failure_metrics( + self, + name_override: Optional[str] = None, + success_as_percent: bool = True, + ) -> List[GraphMetricConfig]: + name = name_override or self.lambda_function_name + + failures = self.get_errors_metric(name) + if success_as_percent: + success = self.get_availability_metric(name) + else: + success = self.get_invocations_metric(name) + success["axis_side"] = "right" + failures["axis_side"] = "left" + return [success, failures] + + def get_success_failure_metric_group( + self, + name_override: Optional[str] = None, + title: Optional[str] = None, + success_as_percent: bool = True, + ) -> GroupedGraphMetricConfig: + name = name_override or self.lambda_function_name + + failures = self.get_errors_metric(name_override) + if success_as_percent: + success = self.get_availability_metric(name_override) + else: + success = self.get_invocations_metric(name_override) + success["axis_side"] = "right" + failures["axis_side"] = "left" + + return GroupedGraphMetricConfig( + title=title or f"{name} Invocations", + namespace="AWS/Lambda", + metrics=[success, failures], + ) diff --git a/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py b/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py new file mode 100644 index 0000000..509aa89 --- /dev/null +++ b/src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py @@ -0,0 +1,145 @@ +from dataclasses import dataclass +from typing import Literal, Optional + +from aibs_informatics_core.env import EnvBase +from attr import field +from aws_cdk import aws_cloudwatch as cw +from aws_cdk import aws_stepfunctions as sfn + +from aibs_informatics_cdk_lib.constructs_.cw.types import AlarmMetricConfig, GraphMetricConfig + +SFN_TIME_UNITS = Literal["hours", "minutes", "seconds", "milliseconds"] + + +@dataclass +class StateMachineMetricConfigGenerator: + state_machine: sfn.IStateMachine + state_machine_name: str + dimension_map: dict = field(init=False) + + def __post_init__(self): + self.dimension_map = {"StateMachineArn": self.state_machine.state_machine_arn} + + def get_execution_completion_metric( + self, name_override: Optional[str] = None + ) -> GraphMetricConfig: + """get the execution completion metric for the state machine + + Args: + name_override (Optional[str], optional): override for name used. + Defaults to None. + + Returns: + GraphMetricConfig + """ + return GraphMetricConfig( + metric="ExecutionsSucceeded", + label=f"{name_override or self.state_machine_name} Completed", + statistic="Sum", + dimension_map=self.dimension_map, + ) + + def get_execution_invocations_metric( + self, name_override: Optional[str] = None + ) -> GraphMetricConfig: + """get the execution invocations metric for the state machine + + Args: + name_override (Optional[str], optional): override for name used. + Defaults to None. + + Returns: + GraphMetricConfig + """ + return GraphMetricConfig( + metric="ExecutionsStarted", + label=f"{name_override or self.state_machine_name} Started", + statistic="Sum", + dimension_map=self.dimension_map, + ) + + def get_execution_failures_metric( + self, + name_override: Optional[str] = None, + discriminator: Optional[str] = None, + alarm_threshold: int = 1, + alarm_evaluation_periods: int = 3, + alarm_datapoints_to_alarm: int = 1, + ) -> GraphMetricConfig: + """get the execution failures metric for the state machine + + Args: + name_override (Optional[str], optional): override for name used. + Defaults to state machine name. + discriminator (Optional[str], optional): Required if grouping with other metric configs that specify the same metric math. + Defaults to "0". + alarm_threshold (int, optional): Alarm threshold used. Defaults to 1. + alarm_evaluation_periods (int, optional): Alarm evaluation periods. Defaults to 3. + alarm_datapoints_to_alarm (int, optional): Alarm datapoints to alarm. Defaults to 1. + + Returns: + GraphMetricConfig: _description_ + """ + name = name_override or self.state_machine_name + idx = discriminator or "0" + return GraphMetricConfig( + metric="ExecutionErrors", + statistic="Sum", + label=f"{name} Errors", + dimension_map=self.dimension_map, + metric_expression=( + f"failed_{idx} + aborted_{idx} + timed_out_{idx} + throttled_{idx}" + ), + using_metrics={ + f"failed_{idx}": self.state_machine.metric_failed(), + f"aborted_{idx}": self.state_machine.metric_aborted(), + f"timed_out_{idx}": self.state_machine.metric_timed_out(), + f"throttled_{idx}": self.state_machine.metric_throttled(), + }, + alarm=AlarmMetricConfig( + name=f"{name}-errors", + threshold=alarm_threshold, + evaluation_periods=alarm_evaluation_periods, + datapoints_to_alarm=alarm_datapoints_to_alarm, + comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD, + ), + ) + + def get_execution_timing_metric( + self, + name_override: Optional[str] = None, + discriminator: Optional[str] = None, + time_unit: SFN_TIME_UNITS = "minutes", + ) -> GraphMetricConfig: + """get the execution time metric for the state machine + + Args: + name_override (Optional[str], optional): override for name used. + Defaults to state machine name. + discriminator (Optional[str], optional): Required if grouping with other metric configs that specify the same metric math. + Defaults to "0". + time_unit (SFN_TIME_UNITS, optional): unit of time to use for metric. + Defaults to "minutes". + + Returns: + GraphMetricConfig + """ + name = name_override or self.state_machine_name + idx = discriminator or "0" + if time_unit == "seconds": + divisor = " / 1000" + elif time_unit == "minutes": + divisor = " / 1000 / 60" + elif time_unit == "hours": + divisor = " / 1000 / 60 / 60" + else: + divisor = "" + + return GraphMetricConfig( + metric="ExecutionTime", + statistic="Average", + label=f"{name} Execution Time", + dimension_map=self.dimension_map, + metric_expression=f"time_msec_{idx} {divisor}", + using_metrics={f"time_msec_{idx}": self.state_machine.metric_time()}, + ) diff --git a/src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py b/src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py index f0c1b67..5c96162 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py +++ b/src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py @@ -11,8 +11,11 @@ from aws_cdk import aws_cloudwatch_actions as cw_actions from aws_cdk import aws_sns as sns -from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstruct, EnvBaseConstructMixins -from aibs_informatics_cdk_lib.constructs_.cw.types import GroupedGraphMetricConfig +from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstructMixins +from aibs_informatics_cdk_lib.constructs_.cw.types import ( + GroupedGraphMetricConfig, + to_comparison_operator, +) class DashboardMixins(EnvBaseConstructMixins): @@ -105,7 +108,7 @@ def create_widgets_and_alarms( Returns: Tuple[List[cw.IWidget], List[cw.IAlarm]]: List of widgets and list of alarms """ - self_stack = cdk.Stack.of(self.as_construct()) + self_stack = cdk.Stack.of(self.dashboard) graph_widgets: List[cw.IWidget] = [] metric_alarms: List[cw.IAlarm] = [] @@ -162,13 +165,13 @@ def create_widgets_and_alarms( ) metric_axis = metric_config.get("axis_side", "left") - lr_graph_metrics[metric_axis].append(graph_metric) + lr_graph_metrics[metric_axis].append(graph_metric) # type: ignore # MathExpression implements IMetric metric_alarm_config = metric_config.get("alarm") if metric_alarm_config: alarm_name = metric_alarm_config["name"] alarm = graph_metric.create_alarm( - self_stack or self, + self_stack, self.get_construct_id(alarm_name, alarm_id_discriminator), # TODO: every time a change is made to these alarms, Cfn throws an error # for trying to modify what is a custom resource. So instead, let @@ -178,7 +181,9 @@ def create_widgets_and_alarms( threshold=metric_alarm_config["threshold"], evaluation_periods=metric_alarm_config["evaluation_periods"], datapoints_to_alarm=metric_alarm_config["datapoints_to_alarm"], - comparison_operator=metric_alarm_config["comparison_operator"], + comparison_operator=to_comparison_operator( + metric_alarm_config["comparison_operator"] + ), ) lr_annotations[metric_axis].append( cw.HorizontalAnnotation( @@ -188,7 +193,7 @@ def create_widgets_and_alarms( ) metric_alarms.append(alarm) if alarm_topic: - alarm.add_alarm_action(cw_actions.SnsAction(alarm_topic)) + alarm.add_alarm_action(cw_actions.SnsAction(alarm_topic)) # type: ignore # SnsAction implements IAlarmAction graph_widgets.append( cw.GraphWidget( @@ -253,13 +258,6 @@ def __init__( self.dashboard = self -class DashboardTools(EnvBaseConstruct, DashboardMixins): - def __init__( - self, - scope: constructs.Construct, - id: Optional[str], - env_base: EnvBase, - dashboard: cw.Dashboard, - ) -> None: - super().__init__(scope, id, env_base) +class DashboardTools(DashboardMixins): + def __init__(self, dashboard: cw.Dashboard) -> None: self.dashboard = dashboard diff --git a/src/aibs_informatics_cdk_lib/constructs_/cw/types.py b/src/aibs_informatics_cdk_lib/constructs_/cw/types.py index a1efaf0..e02ecf0 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/cw/types.py +++ b/src/aibs_informatics_cdk_lib/constructs_/cw/types.py @@ -3,6 +3,23 @@ from aws_cdk import aws_cloudwatch as cw +def to_comparison_operator(value: Union[cw.ComparisonOperator, str]) -> cw.ComparisonOperator: + if isinstance(value, cw.ComparisonOperator): + return value + elif value.lower() in [">=", "greater_than_or_equal_to"]: + return cw.ComparisonOperator.GREATER_THAN_OR_EQUAL_TO_THRESHOLD + elif value.lower() in [">", "greater_than"]: + return cw.ComparisonOperator.GREATER_THAN_THRESHOLD + elif value.lower() in ["<=", "less_than_or_equal_to"]: + return cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD + elif value.lower() in ["<", "less_than"]: + return cw.ComparisonOperator.LESS_THAN_THRESHOLD + elif value.lower() in ["<>", "out_of_range"]: + return cw.ComparisonOperator.LESS_THAN_LOWER_OR_GREATER_THAN_UPPER_THRESHOLD + else: + return cw.ComparisonOperator(value) + + class _AlarmMetricConfigOptional(TypedDict, total=False): pass diff --git a/src/aibs_informatics_cdk_lib/constructs_/monitoring.py b/src/aibs_informatics_cdk_lib/constructs_/monitoring.py index f953a87..3f7f5ef 100644 --- a/src/aibs_informatics_cdk_lib/constructs_/monitoring.py +++ b/src/aibs_informatics_cdk_lib/constructs_/monitoring.py @@ -1,6 +1,8 @@ -from typing import List, Optional, Tuple, Union, cast +from typing import List, Literal, Optional, Union from aibs_informatics_core.env import EnvBase, ResourceNameBaseEnum +from aibs_informatics_core.models.email_address import EmailAddress +from aibs_informatics_core.utils.hashing import uuid_str from aws_cdk import Duration from aws_cdk import aws_cloudwatch as cw from aws_cdk import aws_lambda as lambda_ @@ -17,6 +19,13 @@ GraphMetricConfig, GroupedGraphMetricConfig, ) +from aibs_informatics_cdk_lib.constructs_.cw.config_generators.lambda_ import ( + LambdaFunctionMetricConfigGenerator, +) +from aibs_informatics_cdk_lib.constructs_.cw.config_generators.sfn import ( + SFN_TIME_UNITS, + StateMachineMetricConfigGenerator, +) class MonitoringConstruct(EnvBaseConstruct): @@ -25,11 +34,14 @@ def __init__( scope: Construct, id: str, env_base: EnvBase, + name: Optional[str] = None, + notify_on_alarms: Optional[bool] = None, + alarm_topic: Optional[sns.Topic] = None, ) -> None: super().__init__(scope, id, env_base) - self.monitoring_name = self.construct_id - self.notify_on_alarms = None - self._alarm_topic = None + self.monitoring_name = name or self.construct_id + self.notify_on_alarms = notify_on_alarms + self.alarm_topic = alarm_topic @property def monitoring_name(self) -> str: @@ -46,16 +58,20 @@ def notify_on_alarms(self) -> bool: return self._notify_on_alarms @notify_on_alarms.setter - def notify_on_alarms(self, value: bool): + def notify_on_alarms(self, value: Optional[bool]): self._notify_on_alarms = value @property def alarm_topic(self) -> sns.Topic: if self._alarm_topic is None: - self._alarm_topic = sns.Topic( + self.alarm_topic = sns.Topic( self, self.get_construct_id(self.monitoring_name, "alarm-topic") ) - return self._alarm_topic + return self.alarm_topic + + @alarm_topic.setter + def alarm_topic(self, value: Optional[sns.Topic]): + self._alarm_topic = value def create_dashboard( self, start: Optional[str] = "-P1W", end: Optional[str] = None @@ -69,228 +85,212 @@ def create_dashboard( end=end, ) - def add_function_widgets( + def add_function_widget( self, - group_name: str, - *function_names_groups: Union[Tuple[str, List[str]], List[str]], + dashboard: cw.Dashboard, + function_name: str, + title: Optional[str] = None, + title_header_level: int = 1, + prefix_name_with_env: bool = True, include_min_max_duration: bool = False, ): - self.dashboard_tools.add_text_widget(f"{group_name} Lambda Functions", 1) + self.add_function_widgets( + dashboard, + function_name, + title=title, + title_header_level=title_header_level, + prefix_name_with_env=prefix_name_with_env, + include_min_max_duration=include_min_max_duration, + ) - function_names_groups: List[Tuple[str, List[str]]] = [ - ( - cast(Tuple[str, List[str]], _) - if isinstance(_, tuple) and len(_) == 2 and isinstance(_[-1], list) - else ("/".join(_), _) - ) - for _ in function_names_groups - ] + def add_function_widgets( + self, + dashboard: cw.Dashboard, + *function_names: str, + title: Optional[str] = None, + title_header_level: int = 1, + prefix_name_with_env: bool = True, + include_min_max_duration: bool = False, + include_alarm: bool = False, + ): + dashboard_tools = ( + dashboard if isinstance(dashboard, EnhancedDashboard) else DashboardTools(dashboard) + ) + if title: + dashboard_tools.add_text_widget(title, title_header_level) - for sub_group_name, function_names in function_names_groups: - grouped_invocation_metrics: List[GraphMetricConfig] = [] - grouped_error_metrics: List[GraphMetricConfig] = [] - grouped_timing_metrics: List[GraphMetricConfig] = [] + grouped_invocation_metrics: List[GraphMetricConfig] = [] + grouped_error_metrics: List[GraphMetricConfig] = [] + grouped_timing_metrics: List[GraphMetricConfig] = [] + group_name = uuid_str(str(function_names)) - for idx, raw_function_name in enumerate(function_names): + for idx, raw_function_name in enumerate(function_names): + if prefix_name_with_env: function_name = self.get_name_with_env(raw_function_name) - fn = lambda_.Function.from_function_name( - self, f"{function_name}-from-name", function_name=function_name - ) - dimension_map = {"FunctionName": function_name} - - grouped_invocation_metrics.append( - GraphMetricConfig( - metric="Invocations", - statistic="Sum", - dimension_map=dimension_map, - label=f"{raw_function_name} Count", - ) - ) + else: + function_name = raw_function_name - grouped_error_metrics.append( - GraphMetricConfig( - metric="Errors", - statistic="Sum", - dimension_map=dimension_map, - label=f"{raw_function_name} Errors", - ) - ) - grouped_error_metrics.append( - GraphMetricConfig( - metric="Availability", - statistic="Average", - dimension_map=dimension_map, - label=f"{raw_function_name} %", - metric_expression=f"100 - 100 * errors_{idx} / MAX([errors_{idx}, invocations_{idx}])", - using_metrics={ - f"errors_{idx}": fn.metric_errors(), - f"invocations_{idx}": fn.metric_invocations(), - }, - axis_side="right", - ) - ) + fn_config_generator = LambdaFunctionMetricConfigGenerator( + lambda_function=lambda_.Function.from_function_name( + self, f"{function_name}-from-name", function_name=function_name + ), + lambda_function_name=function_name, + ) - grouped_timing_metrics.append( - GraphMetricConfig( - metric="Duration", - statistic="Average", - dimension_map=dimension_map, - label=f"{raw_function_name} Avg", - ) + grouped_invocation_metrics.append(fn_config_generator.get_invocations_metric()) + grouped_error_metrics.append( + fn_config_generator.get_errors_metric( + discriminator=str(idx), include_alarm=include_alarm ) - if include_min_max_duration: - grouped_timing_metrics.append( - GraphMetricConfig( - metric="Duration", - statistic="Minimum", - dimension_map=dimension_map, - label=f"{raw_function_name} Min", - ) - ) - grouped_timing_metrics.append( - GraphMetricConfig( - metric="Duration", - statistic="Maximum", - dimension_map=dimension_map, - label=f"{raw_function_name} Max", - ) - ) - - grouped_metrics: List[GroupedGraphMetricConfig] = [ - GroupedGraphMetricConfig( - title="Function Invocations", metrics=grouped_invocation_metrics - ), - GroupedGraphMetricConfig( - title="Function Successes / Failures", metrics=grouped_error_metrics - ), - GroupedGraphMetricConfig( - title="Function Duration", - namespace="AWS/Lambda", - metrics=grouped_timing_metrics, - ), - ] - self.dashboard_tools.add_text_widget(sub_group_name, 2) + ) - self.dashboard_tools.add_graphs( - grouped_metric_configs=grouped_metrics, + # Availability metric - make sure to set the axis_side to "right" + avail_metric = fn_config_generator.get_availability_metric(discriminator=str(idx)) + avail_metric["axis_side"] = "right" + grouped_error_metrics.append(avail_metric) + + duration_avg_metric = fn_config_generator.get_duration_avg_metric() + grouped_timing_metrics.append(duration_avg_metric) + if include_min_max_duration: + grouped_timing_metrics.append(fn_config_generator.get_duration_max_metric()) + grouped_timing_metrics.append(fn_config_generator.get_duration_min_metric()) + + grouped_metrics: List[GroupedGraphMetricConfig] = [ + GroupedGraphMetricConfig( + title="Function Invocations", metrics=grouped_invocation_metrics + ), + GroupedGraphMetricConfig( + title="Function Successes / Failures", metrics=grouped_error_metrics + ), + GroupedGraphMetricConfig( + title="Function Duration", namespace="AWS/Lambda", - period=Duration.minutes(5), - alarm_id_discriminator=sub_group_name, - alarm_topic=self.alarm_topic if self.notify_on_alarms else None, - dimensions={}, - ) + metrics=grouped_timing_metrics, + ), + ] + + dashboard_tools.add_graphs( + grouped_metric_configs=grouped_metrics, + namespace="AWS/Lambda", + period=Duration.minutes(5), + alarm_id_discriminator=group_name, + alarm_topic=self.alarm_topic if self.notify_on_alarms else None, + dimensions={}, + ) + + def add_state_machine_widget( + self, + dashboard: cw.Dashboard, + state_machine_name: str, + title: Optional[str] = None, + title_header_level: int = 1, + prefix_name_with_env: bool = True, + ): + self.add_state_machine_widgets( + dashboard, + state_machine_name, + title=title, + title_header_level=title_header_level, + prefix_name_with_env=prefix_name_with_env, + ) def add_state_machine_widgets( self, - group_name: str, - *grouped_state_machine_names: Union[Tuple[str, List[str]], List[str]], + dashboard: cw.Dashboard, + *state_machine_names: str, + title: Optional[str] = None, + title_header_level: int = 1, + prefix_name_with_env: bool = True, + time_unit: SFN_TIME_UNITS = "minutes", ): - grouped_state_machine_names: List[Tuple[str, List[str]]] = [ - ( - cast(Tuple[str, List[str]], _) - if isinstance(_, tuple) and len(_) == 2 and isinstance(_[-1], list) - else ("/".join(_), _) - ) - for _ in grouped_state_machine_names - ] - self.dashboard_tools.add_text_widget(f"{group_name} State Machines", 1) + dashboard_tools = ( + dashboard if isinstance(dashboard, EnhancedDashboard) else DashboardTools(dashboard) + ) - for sub_group_name, raw_state_machine_names in grouped_state_machine_names: - grouped_invocation_metrics: List[GraphMetricConfig] = [] - grouped_error_metrics: List[GraphMetricConfig] = [] - grouped_timing_metrics: List[GraphMetricConfig] = [] + if title: + dashboard_tools.add_text_widget(title, title_header_level) - for idx, raw_state_machine_name in enumerate(raw_state_machine_names): - state_machine_name = self.get_state_machine_name(raw_state_machine_name) - state_machine_arn = self.get_state_machine_arn(raw_state_machine_name) + grouped_invocation_metrics: List[GraphMetricConfig] = [] + grouped_error_metrics: List[GraphMetricConfig] = [] + grouped_timing_metrics: List[GraphMetricConfig] = [] + group_name = uuid_str(str(state_machine_names)) + for idx, raw_state_machine_name in enumerate(state_machine_names): + state_machine_name = self.get_state_machine_name( + raw_state_machine_name, prefix_name_with_env=prefix_name_with_env + ) - state_machine = sfn.StateMachine.from_state_machine_name( + sm_config_generator = StateMachineMetricConfigGenerator( + state_machine=sfn.StateMachine.from_state_machine_name( self, f"{state_machine_name}-from-name", state_machine_name + ), + state_machine_name=state_machine_name, + ) + grouped_invocation_metrics.append( + sm_config_generator.get_execution_invocations_metric(raw_state_machine_name) + ) + grouped_invocation_metrics.append( + sm_config_generator.get_execution_completion_metric(raw_state_machine_name) + ) + grouped_error_metrics.append( + sm_config_generator.get_execution_failures_metric( + raw_state_machine_name, discriminator=str(idx) ) - - dimension_map = {"StateMachineArn": state_machine_arn} - grouped_invocation_metrics.append( - GraphMetricConfig( - metric="ExecutionsSucceeded", - label=f"{raw_state_machine_name} Completed", - statistic="Sum", - dimension_map=dimension_map, - ) - ) - grouped_invocation_metrics.append( - GraphMetricConfig( - metric="ExecutionsStarted", - label=f"{raw_state_machine_name} Started", - statistic="Sum", - dimension_map=dimension_map, - ) - ) - grouped_error_metrics.append( - GraphMetricConfig( - metric="ExecutionErrors", - statistic="Sum", - label=f"{raw_state_machine_name} Errors", - dimension_map=dimension_map, - metric_expression=( - f"failed_{idx} + aborted_{idx} + timed_out_{idx} + throttled_{idx}" - ), - using_metrics={ - f"failed_{idx}": state_machine.metric_failed(), - f"aborted_{idx}": state_machine.metric_aborted(), - f"timed_out_{idx}": state_machine.metric_timed_out(), - f"throttled_{idx}": state_machine.metric_throttled(), - }, - alarm=AlarmMetricConfig( - name=f"{raw_state_machine_name}-errors", - threshold=1, - evaluation_periods=3, - datapoints_to_alarm=1, - comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD, - ), - ), + ) + grouped_timing_metrics.append( + sm_config_generator.get_execution_timing_metric( + raw_state_machine_name, + discriminator=str(idx), + time_unit=time_unit, ) + ) - grouped_timing_metrics.append( - GraphMetricConfig( - metric="ExecutionTime", - statistic="Average", - label=f"{raw_state_machine_name} Time", - dimension_map=dimension_map, - metric_expression=f"time_sec_{idx} / 1000 / 60", - using_metrics={f"time_sec_{idx}": state_machine.metric_time()}, - ), - ) + grouped_metrics = [ + GroupedGraphMetricConfig( + title="Execution Invocations", metrics=grouped_invocation_metrics + ), + GroupedGraphMetricConfig(title="Execution Errors", metrics=grouped_error_metrics), + GroupedGraphMetricConfig( + title="Execution Time", + metrics=grouped_timing_metrics, + left_y_axis=cw.YAxisProps(label=f"Time ({time_unit})"), + ), + ] - grouped_metrics = [ - GroupedGraphMetricConfig( - title="Execution Invocations", metrics=grouped_invocation_metrics - ), - GroupedGraphMetricConfig(title="Execution Errors", metrics=grouped_error_metrics), - GroupedGraphMetricConfig( - title="Execution Time", - metrics=grouped_timing_metrics, - left_y_axis=cw.YAxisProps(label="Time (min)"), - ), - ] - - self.dashboard_tools.add_text_widget(sub_group_name, 2) - self.dashboard_tools.add_graphs( - grouped_metric_configs=grouped_metrics, - namespace="AWS/States", - period=Duration.minutes(5), - alarm_id_discriminator=sub_group_name, - alarm_topic=self.alarm_topic, - dimensions={}, - ) + dashboard_tools.add_graphs( + grouped_metric_configs=grouped_metrics, + namespace="AWS/States", + period=Duration.minutes(5), + alarm_id_discriminator=group_name, + alarm_topic=self.alarm_topic if self.notify_on_alarms else None, + dimensions={}, + ) - def get_state_machine_name(self, name: Union[str, ResourceNameBaseEnum]) -> str: + def add_alarm_subscription(self, email: Union[str, EmailAddress]): + if not isinstance(email, EmailAddress): + email = EmailAddress(email) + + return sns.Subscription( + self, + self.get_construct_id(f"{email}-alarm-subscription"), + topic=self.alarm_topic, # type: ignore # Topic implements ITopic + endpoint=email, + protocol=sns.SubscriptionProtocol.EMAIL, + ) + + def get_state_machine_name( + self, name: Union[str, ResourceNameBaseEnum], prefix_name_with_env: bool = True + ) -> str: if isinstance(name, ResourceNameBaseEnum): return name.get_name(self.env_base) - else: + elif prefix_name_with_env: return self.env_base.get_state_machine_name(name) + else: + return name - def get_state_machine_arn(self, name) -> str: - state_machine_name = self.get_state_machine_name(name) + def get_state_machine_arn( + self, name: Union[str, ResourceNameBaseEnum], prefix_name_with_env: bool = True + ) -> str: + state_machine_name = self.get_state_machine_name(name, prefix_name_with_env) return build_sfn_arn(resource_type="stateMachine", resource_id=state_machine_name) @@ -301,16 +301,12 @@ def __init__( id: str, env_base: EnvBase, notify_on_alarms: Optional[bool] = None, + alarm_email: Optional[str] = None, ) -> None: super().__init__(scope, id, env_base) self.notify_on_alarms = notify_on_alarms if self.notify_on_alarms: - sns.Subscription( - self, - self.get_construct_id(f"{id}-alarm-subscription"), - topic=self.alarm_topic, - endpoint="marmotdev@alleninstitute.org", - protocol=sns.SubscriptionProtocol("EMAIL"), - ) + email = alarm_email or "marmotdev@alleninstitute.org" + self.add_alarm_subscription(email) self.dashboard = self.create_dashboard(start="-P1D")