Skip to content

Commit

Permalink
update constructs
Browse files Browse the repository at this point in the history
  • Loading branch information
rpmcginty committed Jul 26, 2024
1 parent 12d4970 commit 06ec42b
Show file tree
Hide file tree
Showing 6 changed files with 556 additions and 223 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from dataclasses import dataclass
from typing import List, Optional

from aibs_informatics_core.env import EnvBase
from attr import field
from aws_cdk import aws_cloudwatch as cw
from aws_cdk import aws_lambda as lambda_

from aibs_informatics_cdk_lib.constructs_.cw.types import (
AlarmMetricConfig,
GraphMetricConfig,
GroupedGraphMetricConfig,
)


@dataclass
class LambdaFunctionMetricConfigGenerator:
lambda_function: lambda_.IFunction
lambda_function_name: str = field(default=None)
dimension_map: dict = field(init=False)

def __post_init__(self):
if self.lambda_function_name is None:
self.lambda_function_name = self.lambda_function.function_name

self.dimension_map = {"FunctionName": self.lambda_function_name}

def get_invocations_metric(
self,
name_override: Optional[str] = None,
) -> GraphMetricConfig:
return GraphMetricConfig(
metric="Invocations",
label=f"{name_override or self.lambda_function_name} Invocations",
statistic="Sum",
dimension_map=self.dimension_map,
)

def get_errors_metric(
self,
name_override: Optional[str] = None,
discriminator: Optional[str] = None,
include_alarm: bool = False,
alarm_threshold: int = 1,
alarm_evaluation_periods: int = 3,
alarm_datapoints_to_alarm: int = 1,
) -> GraphMetricConfig:
name = name_override or self.lambda_function_name
idx = discriminator or "0"
config = GraphMetricConfig(
metric="Errors",
statistic="Sum",
label=f"{name} Errors",
dimension_map=self.dimension_map,
)
if include_alarm:
config["alarm"] = AlarmMetricConfig(
name=f"{name} Errors Alarm {idx}",
threshold=alarm_threshold,
evaluation_periods=alarm_evaluation_periods,
datapoints_to_alarm=alarm_datapoints_to_alarm,
comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
)
return config

def get_availability_metric(
self,
name_override: Optional[str] = None,
discriminator: Optional[str] = None,
) -> GraphMetricConfig:
name = name_override or self.lambda_function_name
idx = discriminator or "0"

return GraphMetricConfig(
metric="Availability",
statistic="Average",
dimension_map=self.dimension_map,
label=f"{name} %",
metric_expression=f"100 - 100 * errors_{idx} / MAX([errors_{idx}, invocations_{idx}])",
using_metrics={
f"errors_{idx}": self.lambda_function.metric_errors(),
f"invocations_{idx}": self.lambda_function.metric_invocations(),
},
)

def get_duration_avg_metric(
self,
name_override: Optional[str] = None,
) -> GraphMetricConfig:
name = name_override or self.lambda_function_name
return GraphMetricConfig(
metric="Duration",
statistic="Average",
dimension_map=self.dimension_map,
label=f"{name} Avg",
)

def get_duration_max_metric(
self,
name_override: Optional[str] = None,
) -> GraphMetricConfig:
name = name_override or self.lambda_function_name
return GraphMetricConfig(
metric="Duration",
statistic="Maximum",
dimension_map=self.dimension_map,
label=f"{name} Max",
)

def get_duration_min_metric(
self,
name_override: Optional[str] = None,
) -> GraphMetricConfig:
name = name_override or self.lambda_function_name
return GraphMetricConfig(
metric="Duration",
statistic="Minimum",
dimension_map=self.dimension_map,
label=f"{name} Min",
)

def get_duration_metric_group(
self,
name_override: Optional[str] = None,
title: Optional[str] = None,
include_min_max_duration: bool = False,
) -> GroupedGraphMetricConfig:
name = name_override or self.lambda_function_name

avg = self.get_duration_avg_metric(name_override)
if include_min_max_duration:
min_ = self.get_duration_min_metric(name_override)
max_ = self.get_duration_max_metric(name_override)

return GroupedGraphMetricConfig(
title=title or f"{name} Duration",
namespace="AWS/Lambda",
metrics=[avg, min_, max_],
)

def get_success_failure_metrics(
self,
name_override: Optional[str] = None,
success_as_percent: bool = True,
) -> List[GraphMetricConfig]:
name = name_override or self.lambda_function_name

failures = self.get_errors_metric(name)
if success_as_percent:
success = self.get_availability_metric(name)
else:
success = self.get_invocations_metric(name)
success["axis_side"] = "right"
failures["axis_side"] = "left"
return [success, failures]

def get_success_failure_metric_group(
self,
name_override: Optional[str] = None,
title: Optional[str] = None,
success_as_percent: bool = True,
) -> GroupedGraphMetricConfig:
name = name_override or self.lambda_function_name

failures = self.get_errors_metric(name_override)
if success_as_percent:
success = self.get_availability_metric(name_override)
else:
success = self.get_invocations_metric(name_override)
success["axis_side"] = "right"
failures["axis_side"] = "left"

return GroupedGraphMetricConfig(
title=title or f"{name} Invocations",
namespace="AWS/Lambda",
metrics=[success, failures],
)
145 changes: 145 additions & 0 deletions src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from dataclasses import dataclass
from typing import Literal, Optional

from aibs_informatics_core.env import EnvBase
from attr import field
from aws_cdk import aws_cloudwatch as cw
from aws_cdk import aws_stepfunctions as sfn

from aibs_informatics_cdk_lib.constructs_.cw.types import AlarmMetricConfig, GraphMetricConfig

SFN_TIME_UNITS = Literal["hours", "minutes", "seconds", "milliseconds"]


@dataclass
class StateMachineMetricConfigGenerator:
state_machine: sfn.IStateMachine
state_machine_name: str
dimension_map: dict = field(init=False)

def __post_init__(self):
self.dimension_map = {"StateMachineArn": self.state_machine.state_machine_arn}

def get_execution_completion_metric(
self, name_override: Optional[str] = None
) -> GraphMetricConfig:
"""get the execution completion metric for the state machine
Args:
name_override (Optional[str], optional): override for name used.
Defaults to None.
Returns:
GraphMetricConfig
"""
return GraphMetricConfig(
metric="ExecutionsSucceeded",
label=f"{name_override or self.state_machine_name} Completed",
statistic="Sum",
dimension_map=self.dimension_map,
)

def get_execution_invocations_metric(
self, name_override: Optional[str] = None
) -> GraphMetricConfig:
"""get the execution invocations metric for the state machine
Args:
name_override (Optional[str], optional): override for name used.
Defaults to None.
Returns:
GraphMetricConfig
"""
return GraphMetricConfig(
metric="ExecutionsStarted",
label=f"{name_override or self.state_machine_name} Started",
statistic="Sum",
dimension_map=self.dimension_map,
)

def get_execution_failures_metric(
self,
name_override: Optional[str] = None,
discriminator: Optional[str] = None,
alarm_threshold: int = 1,
alarm_evaluation_periods: int = 3,
alarm_datapoints_to_alarm: int = 1,
) -> GraphMetricConfig:
"""get the execution failures metric for the state machine
Args:
name_override (Optional[str], optional): override for name used.
Defaults to state machine name.
discriminator (Optional[str], optional): Required if grouping with other metric configs that specify the same metric math.
Defaults to "0".
alarm_threshold (int, optional): Alarm threshold used. Defaults to 1.
alarm_evaluation_periods (int, optional): Alarm evaluation periods. Defaults to 3.
alarm_datapoints_to_alarm (int, optional): Alarm datapoints to alarm. Defaults to 1.
Returns:
GraphMetricConfig: _description_
"""
name = name_override or self.state_machine_name
idx = discriminator or "0"
return GraphMetricConfig(
metric="ExecutionErrors",
statistic="Sum",
label=f"{name} Errors",
dimension_map=self.dimension_map,
metric_expression=(
f"failed_{idx} + aborted_{idx} + timed_out_{idx} + throttled_{idx}"
),
using_metrics={
f"failed_{idx}": self.state_machine.metric_failed(),
f"aborted_{idx}": self.state_machine.metric_aborted(),
f"timed_out_{idx}": self.state_machine.metric_timed_out(),
f"throttled_{idx}": self.state_machine.metric_throttled(),
},
alarm=AlarmMetricConfig(
name=f"{name}-errors",
threshold=alarm_threshold,
evaluation_periods=alarm_evaluation_periods,
datapoints_to_alarm=alarm_datapoints_to_alarm,
comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
),
)

def get_execution_timing_metric(
self,
name_override: Optional[str] = None,
discriminator: Optional[str] = None,
time_unit: SFN_TIME_UNITS = "minutes",
) -> GraphMetricConfig:
"""get the execution time metric for the state machine
Args:
name_override (Optional[str], optional): override for name used.
Defaults to state machine name.
discriminator (Optional[str], optional): Required if grouping with other metric configs that specify the same metric math.
Defaults to "0".
time_unit (SFN_TIME_UNITS, optional): unit of time to use for metric.
Defaults to "minutes".
Returns:
GraphMetricConfig
"""
name = name_override or self.state_machine_name
idx = discriminator or "0"
if time_unit == "seconds":
divisor = " / 1000"
elif time_unit == "minutes":
divisor = " / 1000 / 60"
elif time_unit == "hours":
divisor = " / 1000 / 60 / 60"
else:
divisor = ""

return GraphMetricConfig(
metric="ExecutionTime",
statistic="Average",
label=f"{name} Execution Time",
dimension_map=self.dimension_map,
metric_expression=f"time_msec_{idx} {divisor}",
using_metrics={f"time_msec_{idx}": self.state_machine.metric_time()},
)
30 changes: 14 additions & 16 deletions src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
from aws_cdk import aws_cloudwatch_actions as cw_actions
from aws_cdk import aws_sns as sns

from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstruct, EnvBaseConstructMixins
from aibs_informatics_cdk_lib.constructs_.cw.types import GroupedGraphMetricConfig
from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstructMixins
from aibs_informatics_cdk_lib.constructs_.cw.types import (
GroupedGraphMetricConfig,
to_comparison_operator,
)


class DashboardMixins(EnvBaseConstructMixins):
Expand Down Expand Up @@ -105,7 +108,7 @@ def create_widgets_and_alarms(
Returns:
Tuple[List[cw.IWidget], List[cw.IAlarm]]: List of widgets and list of alarms
"""
self_stack = cdk.Stack.of(self.as_construct())
self_stack = cdk.Stack.of(self.dashboard)

graph_widgets: List[cw.IWidget] = []
metric_alarms: List[cw.IAlarm] = []
Expand Down Expand Up @@ -162,13 +165,13 @@ def create_widgets_and_alarms(
)

metric_axis = metric_config.get("axis_side", "left")
lr_graph_metrics[metric_axis].append(graph_metric)
lr_graph_metrics[metric_axis].append(graph_metric) # type: ignore # MathExpression implements IMetric

metric_alarm_config = metric_config.get("alarm")
if metric_alarm_config:
alarm_name = metric_alarm_config["name"]
alarm = graph_metric.create_alarm(
self_stack or self,
self_stack,
self.get_construct_id(alarm_name, alarm_id_discriminator),
# TODO: every time a change is made to these alarms, Cfn throws an error
# for trying to modify what is a custom resource. So instead, let
Expand All @@ -178,7 +181,9 @@ def create_widgets_and_alarms(
threshold=metric_alarm_config["threshold"],
evaluation_periods=metric_alarm_config["evaluation_periods"],
datapoints_to_alarm=metric_alarm_config["datapoints_to_alarm"],
comparison_operator=metric_alarm_config["comparison_operator"],
comparison_operator=to_comparison_operator(
metric_alarm_config["comparison_operator"]
),
)
lr_annotations[metric_axis].append(
cw.HorizontalAnnotation(
Expand All @@ -188,7 +193,7 @@ def create_widgets_and_alarms(
)
metric_alarms.append(alarm)
if alarm_topic:
alarm.add_alarm_action(cw_actions.SnsAction(alarm_topic))
alarm.add_alarm_action(cw_actions.SnsAction(alarm_topic)) # type: ignore # SnsAction implements IAlarmAction

graph_widgets.append(
cw.GraphWidget(
Expand Down Expand Up @@ -253,13 +258,6 @@ def __init__(
self.dashboard = self


class DashboardTools(EnvBaseConstruct, DashboardMixins):
def __init__(
self,
scope: constructs.Construct,
id: Optional[str],
env_base: EnvBase,
dashboard: cw.Dashboard,
) -> None:
super().__init__(scope, id, env_base)
class DashboardTools(DashboardMixins):
def __init__(self, dashboard: cw.Dashboard) -> None:
self.dashboard = dashboard
Loading

0 comments on commit 06ec42b

Please sign in to comment.