-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
554 additions
and
221 deletions.
There are no files selected for viewing
Empty file.
177 changes: 177 additions & 0 deletions
177
src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/lambda_.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
import dataclasses
from dataclasses import dataclass
from typing import List, Optional

from aibs_informatics_core.env import EnvBase
from attr import field
from aws_cdk import aws_cloudwatch as cw
from aws_cdk import aws_lambda as lambda_
|
||
from aibs_informatics_cdk_lib.constructs_.cw.types import ( | ||
AlarmMetricConfig, | ||
GraphMetricConfig, | ||
GroupedGraphMetricConfig, | ||
) | ||
|
||
|
||
@dataclass
class LambdaFunctionMetricConfigGenerator:
    """Generate CloudWatch graph/alarm metric configs for one Lambda function.

    Attributes:
        lambda_function: Function whose metrics are described.
        lambda_function_name: Value used for the ``FunctionName`` dimension and
            as the default label prefix. When omitted, it is taken from
            ``lambda_function.function_name``.
        dimension_map: Dimension map shared by every generated metric config.
            Derived in ``__post_init__``; it is not an ``__init__`` argument.
    """

    lambda_function: lambda_.IFunction
    lambda_function_name: Optional[str] = None
    # NOTE: this must be the stdlib ``dataclasses.field``. ``attr.field`` returns
    # an attrs placeholder object that ``@dataclass`` would store as a literal
    # default value instead of honouring ``init=False``.
    dimension_map: dict = dataclasses.field(init=False)

    def __post_init__(self):
        """Resolve the function name and build the shared dimension map."""
        if self.lambda_function_name is None:
            self.lambda_function_name = self.lambda_function.function_name

        self.dimension_map = {"FunctionName": self.lambda_function_name}

    def get_invocations_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        """Get the summed ``Invocations`` metric config.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.

        Returns:
            GraphMetricConfig
        """
        return GraphMetricConfig(
            metric="Invocations",
            label=f"{name_override or self.lambda_function_name} Invocations",
            statistic="Sum",
            dimension_map=self.dimension_map,
        )

    def get_errors_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
        include_alarm: bool = False,
        alarm_threshold: int = 1,
        alarm_evaluation_periods: int = 3,
        alarm_datapoints_to_alarm: int = 1,
    ) -> GraphMetricConfig:
        """Get the summed ``Errors`` metric config, optionally with an alarm.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.
            discriminator (Optional[str], optional): suffix appended to the
                alarm name. Defaults to "0".
            include_alarm (bool, optional): attach an alarm config under the
                ``"alarm"`` key. Defaults to False.
            alarm_threshold (int, optional): Alarm threshold. Defaults to 1.
            alarm_evaluation_periods (int, optional): Alarm evaluation periods.
                Defaults to 3.
            alarm_datapoints_to_alarm (int, optional): Alarm datapoints to
                alarm. Defaults to 1.

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.lambda_function_name
        idx = discriminator or "0"
        config = GraphMetricConfig(
            metric="Errors",
            statistic="Sum",
            label=f"{name} Errors",
            dimension_map=self.dimension_map,
        )
        if include_alarm:
            config["alarm"] = AlarmMetricConfig(
                name=f"{name} Errors Alarm {idx}",
                threshold=alarm_threshold,
                evaluation_periods=alarm_evaluation_periods,
                datapoints_to_alarm=alarm_datapoints_to_alarm,
                comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
            )
        return config

    def get_availability_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
    ) -> GraphMetricConfig:
        """Get an availability (percent of invocations without error) config.

        The value is a metric-math expression over the function's error and
        invocation metrics.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.
            discriminator (Optional[str], optional): suffix for the metric-math
                identifiers, needed when several configs using the same
                expression are grouped together. Defaults to "0".

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.lambda_function_name
        idx = discriminator or "0"

        return GraphMetricConfig(
            metric="Availability",
            statistic="Average",
            dimension_map=self.dimension_map,
            label=f"{name} %",
            # MAX([...]) keeps the denominator at least as large as the error
            # count, so the result cannot drop below 0.
            metric_expression=f"100 - 100 * errors_{idx} / MAX([errors_{idx}, invocations_{idx}])",
            using_metrics={
                f"errors_{idx}": self.lambda_function.metric_errors(),
                f"invocations_{idx}": self.lambda_function.metric_invocations(),
            },
        )

    def get_duration_avg_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        """Get the average ``Duration`` metric config.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.lambda_function_name
        return GraphMetricConfig(
            metric="Duration",
            statistic="Average",
            dimension_map=self.dimension_map,
            label=f"{name} Avg",
        )

    def get_duration_max_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        """Get the maximum ``Duration`` metric config.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.lambda_function_name
        return GraphMetricConfig(
            metric="Duration",
            statistic="Maximum",
            dimension_map=self.dimension_map,
            label=f"{name} Max",
        )

    def get_duration_min_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        """Get the minimum ``Duration`` metric config.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.lambda_function_name
        return GraphMetricConfig(
            metric="Duration",
            statistic="Minimum",
            dimension_map=self.dimension_map,
            label=f"{name} Min",
        )

    def get_duration_metric_group(
        self,
        name_override: Optional[str] = None,
        title: Optional[str] = None,
        include_min_max_duration: bool = False,
    ) -> GroupedGraphMetricConfig:
        """Get a grouped config of duration metrics for the function.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.
            title (Optional[str], optional): group title. Defaults to
                "<name> Duration".
            include_min_max_duration (bool, optional): also include the minimum
                and maximum duration metrics after the average.
                Defaults to False.

        Returns:
            GroupedGraphMetricConfig
        """
        name = name_override or self.lambda_function_name

        # Build the list incrementally so the min/max configs are referenced
        # only when they were actually requested.
        metrics = [self.get_duration_avg_metric(name_override)]
        if include_min_max_duration:
            metrics.append(self.get_duration_min_metric(name_override))
            metrics.append(self.get_duration_max_metric(name_override))

        return GroupedGraphMetricConfig(
            title=title or f"{name} Duration",
            namespace="AWS/Lambda",
            metrics=metrics,
        )

    def get_success_failure_metrics(
        self,
        name_override: Optional[str] = None,
        success_as_percent: bool = True,
    ) -> List[GraphMetricConfig]:
        """Get the paired success and failure metric configs.

        The success config is placed on the right axis and the failure config
        on the left axis.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.
            success_as_percent (bool, optional): use the availability
                percentage as the success metric; otherwise use the raw
                invocation count. Defaults to True.

        Returns:
            List[GraphMetricConfig]: ``[success, failures]``
        """
        name = name_override or self.lambda_function_name

        failures = self.get_errors_metric(name)
        if success_as_percent:
            success = self.get_availability_metric(name)
        else:
            success = self.get_invocations_metric(name)
        success["axis_side"] = "right"
        failures["axis_side"] = "left"
        return [success, failures]

    def get_success_failure_metric_group(
        self,
        name_override: Optional[str] = None,
        title: Optional[str] = None,
        success_as_percent: bool = True,
    ) -> GroupedGraphMetricConfig:
        """Get the success and failure metric configs as a titled group.

        Args:
            name_override (Optional[str], optional): label prefix to use
                instead of the function name. Defaults to None.
            title (Optional[str], optional): group title. Defaults to
                "<name> Invocations".
            success_as_percent (bool, optional): use the availability
                percentage as the success metric; otherwise use the raw
                invocation count. Defaults to True.

        Returns:
            GroupedGraphMetricConfig
        """
        name = name_override or self.lambda_function_name

        return GroupedGraphMetricConfig(
            title=title or f"{name} Invocations",
            namespace="AWS/Lambda",
            metrics=self.get_success_failure_metrics(
                name_override=name_override,
                success_as_percent=success_as_percent,
            ),
        )
145 changes: 145 additions & 0 deletions
145
src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
from dataclasses import dataclass | ||
from typing import Literal, Optional | ||
|
||
from aibs_informatics_core.env import EnvBase | ||
from attr import field | ||
from aws_cdk import aws_cloudwatch as cw | ||
from aws_cdk import aws_stepfunctions as sfn | ||
|
||
from aibs_informatics_cdk_lib.constructs_.cw.types import AlarmMetricConfig, GraphMetricConfig | ||
|
||
# Time units accepted by ``get_execution_timing_metric``.
SFN_TIME_UNITS = Literal["hours", "minutes", "seconds", "milliseconds"]


@dataclass
class StateMachineMetricConfigGenerator:
    """Generate CloudWatch graph/alarm metric configs for one state machine.

    Attributes:
        state_machine: State machine whose metrics are described.
        state_machine_name: Default label prefix for generated metrics.
        dimension_map: Dimension map shared by every generated metric config.
            Derived in ``__post_init__``; it is not an ``__init__`` argument.
    """

    state_machine: sfn.IStateMachine
    state_machine_name: str
    # NOTE: this must be the stdlib ``dataclasses.field``. ``attr.field`` returns
    # an attrs placeholder object that ``@dataclass`` would store as a literal
    # default value instead of honouring ``init=False``.
    dimension_map: dict = dataclasses.field(init=False)

    def __post_init__(self):
        """Build the shared dimension map from the state machine ARN."""
        self.dimension_map = {"StateMachineArn": self.state_machine.state_machine_arn}

    def get_execution_completion_metric(
        self, name_override: Optional[str] = None
    ) -> GraphMetricConfig:
        """get the execution completion metric for the state machine

        Args:
            name_override (Optional[str], optional): override for name used.
                Defaults to None.

        Returns:
            GraphMetricConfig
        """
        return GraphMetricConfig(
            metric="ExecutionsSucceeded",
            label=f"{name_override or self.state_machine_name} Completed",
            statistic="Sum",
            dimension_map=self.dimension_map,
        )

    def get_execution_invocations_metric(
        self, name_override: Optional[str] = None
    ) -> GraphMetricConfig:
        """get the execution invocations metric for the state machine

        Args:
            name_override (Optional[str], optional): override for name used.
                Defaults to None.

        Returns:
            GraphMetricConfig
        """
        return GraphMetricConfig(
            metric="ExecutionsStarted",
            label=f"{name_override or self.state_machine_name} Started",
            statistic="Sum",
            dimension_map=self.dimension_map,
        )

    def get_execution_failures_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
        alarm_threshold: int = 1,
        alarm_evaluation_periods: int = 3,
        alarm_datapoints_to_alarm: int = 1,
    ) -> GraphMetricConfig:
        """get the execution failures metric for the state machine

        The metric is the sum of the failed, aborted, timed-out and throttled
        execution metrics, and always carries an alarm config.

        Args:
            name_override (Optional[str], optional): override for name used.
                Defaults to state machine name.
            discriminator (Optional[str], optional): Required if grouping with
                other metric configs that specify the same metric math.
                Defaults to "0".
            alarm_threshold (int, optional): Alarm threshold used. Defaults to 1.
            alarm_evaluation_periods (int, optional): Alarm evaluation periods.
                Defaults to 3.
            alarm_datapoints_to_alarm (int, optional): Alarm datapoints to alarm.
                Defaults to 1.

        Returns:
            GraphMetricConfig: metric-math config with an attached alarm
        """
        name = name_override or self.state_machine_name
        idx = discriminator or "0"
        return GraphMetricConfig(
            metric="ExecutionErrors",
            statistic="Sum",
            label=f"{name} Errors",
            dimension_map=self.dimension_map,
            metric_expression=(
                f"failed_{idx} + aborted_{idx} + timed_out_{idx} + throttled_{idx}"
            ),
            using_metrics={
                f"failed_{idx}": self.state_machine.metric_failed(),
                f"aborted_{idx}": self.state_machine.metric_aborted(),
                f"timed_out_{idx}": self.state_machine.metric_timed_out(),
                f"throttled_{idx}": self.state_machine.metric_throttled(),
            },
            alarm=AlarmMetricConfig(
                name=f"{name}-errors",
                threshold=alarm_threshold,
                evaluation_periods=alarm_evaluation_periods,
                datapoints_to_alarm=alarm_datapoints_to_alarm,
                comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
            ),
        )

    def get_execution_timing_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
        time_unit: SFN_TIME_UNITS = "minutes",
    ) -> GraphMetricConfig:
        """get the execution time metric for the state machine

        The base metric is treated as milliseconds and divided down to the
        requested unit inside the metric-math expression.

        Args:
            name_override (Optional[str], optional): override for name used.
                Defaults to state machine name.
            discriminator (Optional[str], optional): Required if grouping with
                other metric configs that specify the same metric math.
                Defaults to "0".
            time_unit (SFN_TIME_UNITS, optional): unit of time to use for metric.
                Any value other than "seconds", "minutes" or "hours" leaves the
                metric unscaled. Defaults to "minutes".

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.state_machine_name
        idx = discriminator or "0"
        # Suffix appended to the expression to convert from milliseconds.
        divisors = {
            "seconds": " / 1000",
            "minutes": " / 1000 / 60",
            "hours": " / 1000 / 60 / 60",
        }
        divisor = divisors.get(time_unit, "")

        return GraphMetricConfig(
            metric="ExecutionTime",
            statistic="Average",
            label=f"{name} Execution Time",
            dimension_map=self.dimension_map,
            metric_expression=f"time_msec_{idx} {divisor}",
            using_metrics={f"time_msec_{idx}": self.state_machine.metric_time()},
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.