Updating Monitoring + CW Constructs #18

Merged · merged 1 commit into from Jul 26, 2024
@@ -0,0 +1,177 @@
from dataclasses import dataclass, field
from typing import List, Optional

from aws_cdk import aws_cloudwatch as cw
from aws_cdk import aws_lambda as lambda_

from aibs_informatics_cdk_lib.constructs_.cw.types import (
    AlarmMetricConfig,
    GraphMetricConfig,
    GroupedGraphMetricConfig,
)


@dataclass
class LambdaFunctionMetricConfigGenerator:
    lambda_function: lambda_.IFunction
    lambda_function_name: Optional[str] = None
    dimension_map: dict = field(init=False)

    def __post_init__(self):
        if self.lambda_function_name is None:
            self.lambda_function_name = self.lambda_function.function_name

        self.dimension_map = {"FunctionName": self.lambda_function_name}

    def get_invocations_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        return GraphMetricConfig(
            metric="Invocations",
            label=f"{name_override or self.lambda_function_name} Invocations",
            statistic="Sum",
            dimension_map=self.dimension_map,
        )

    def get_errors_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
        include_alarm: bool = False,
        alarm_threshold: int = 1,
        alarm_evaluation_periods: int = 3,
        alarm_datapoints_to_alarm: int = 1,
    ) -> GraphMetricConfig:
        name = name_override or self.lambda_function_name
        idx = discriminator or "0"
        config = GraphMetricConfig(
            metric="Errors",
            statistic="Sum",
            label=f"{name} Errors",
            dimension_map=self.dimension_map,
        )
        if include_alarm:
            config["alarm"] = AlarmMetricConfig(
                name=f"{name} Errors Alarm {idx}",
                threshold=alarm_threshold,
                evaluation_periods=alarm_evaluation_periods,
                datapoints_to_alarm=alarm_datapoints_to_alarm,
                comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
            )
        return config

    def get_availability_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
    ) -> GraphMetricConfig:
        name = name_override or self.lambda_function_name
        idx = discriminator or "0"

        return GraphMetricConfig(
            metric="Availability",
            statistic="Average",
            dimension_map=self.dimension_map,
            label=f"{name} %",
            metric_expression=f"100 - 100 * errors_{idx} / MAX([errors_{idx}, invocations_{idx}])",
            using_metrics={
                f"errors_{idx}": self.lambda_function.metric_errors(),
                f"invocations_{idx}": self.lambda_function.metric_invocations(),
            },
        )

    def get_duration_avg_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        name = name_override or self.lambda_function_name
        return GraphMetricConfig(
            metric="Duration",
            statistic="Average",
            dimension_map=self.dimension_map,
            label=f"{name} Avg",
        )

    def get_duration_max_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        name = name_override or self.lambda_function_name
        return GraphMetricConfig(
            metric="Duration",
            statistic="Maximum",
            dimension_map=self.dimension_map,
            label=f"{name} Max",
        )

    def get_duration_min_metric(
        self,
        name_override: Optional[str] = None,
    ) -> GraphMetricConfig:
        name = name_override or self.lambda_function_name
        return GraphMetricConfig(
            metric="Duration",
            statistic="Minimum",
            dimension_map=self.dimension_map,
            label=f"{name} Min",
        )
Comment on lines +110 to +120
Collaborator:

I wonder if we should start with a more minimal set of metrics (maybe just successes, failures, and durations?) and then only add more once we know we really need them. Metrics like min duration don't seem all that useful?

Collaborator (Author):

These are the metrics you see in the Lambda monitoring dashboard, so I just replicated what is displayed there. This is already the case for the OCS graphs.

I think things like min/max over 5-minute windows help give more insight into whether there are outlier runs, but let's chat at standup.


    def get_duration_metric_group(
        self,
        name_override: Optional[str] = None,
        title: Optional[str] = None,
        include_min_max_duration: bool = False,
    ) -> GroupedGraphMetricConfig:
        name = name_override or self.lambda_function_name

        # Always include the average; only add min/max when requested, so the
        # metrics list never references values that were not constructed.
        metrics = [self.get_duration_avg_metric(name_override)]
        if include_min_max_duration:
            metrics.append(self.get_duration_min_metric(name_override))
            metrics.append(self.get_duration_max_metric(name_override))

        return GroupedGraphMetricConfig(
            title=title or f"{name} Duration",
            namespace="AWS/Lambda",
            metrics=metrics,
        )

    def get_success_failure_metrics(
        self,
        name_override: Optional[str] = None,
        success_as_percent: bool = True,
    ) -> List[GraphMetricConfig]:
        name = name_override or self.lambda_function_name

        failures = self.get_errors_metric(name)
        if success_as_percent:
            success = self.get_availability_metric(name)
        else:
            success = self.get_invocations_metric(name)
        success["axis_side"] = "right"
        failures["axis_side"] = "left"
        return [success, failures]

    def get_success_failure_metric_group(
        self,
        name_override: Optional[str] = None,
        title: Optional[str] = None,
        success_as_percent: bool = True,
    ) -> GroupedGraphMetricConfig:
        name = name_override or self.lambda_function_name

        failures = self.get_errors_metric(name_override)
        if success_as_percent:
            success = self.get_availability_metric(name_override)
        else:
            success = self.get_invocations_metric(name_override)
        success["axis_side"] = "right"
        failures["axis_side"] = "left"

        return GroupedGraphMetricConfig(
            title=title or f"{name} Invocations",
            namespace="AWS/Lambda",
            metrics=[success, failures],
        )
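For reference, a minimal usage sketch of the generator above (not part of this PR's diff); the `fn` handle and the surrounding CDK app are assumed:

# Usage sketch (hypothetical): `fn` is any existing lambda_.IFunction.
gen = LambdaFunctionMetricConfigGenerator(lambda_function=fn)

# Avg/Min/Max duration on one graph; min/max over 5-minute windows help
# surface outlier runs.
duration_group = gen.get_duration_metric_group(include_min_max_duration=True)

# Errors on the left axis; availability on the right, computed via metric math:
#   100 - 100 * errors / MAX([errors, invocations])
success_group = gen.get_success_failure_metric_group(success_as_percent=True)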
145 changes: 145 additions & 0 deletions src/aibs_informatics_cdk_lib/constructs_/cw/config_generators/sfn.py
@@ -0,0 +1,145 @@
from dataclasses import dataclass, field
from typing import Literal, Optional

from aws_cdk import aws_cloudwatch as cw
from aws_cdk import aws_stepfunctions as sfn

from aibs_informatics_cdk_lib.constructs_.cw.types import AlarmMetricConfig, GraphMetricConfig

SFN_TIME_UNITS = Literal["hours", "minutes", "seconds", "milliseconds"]


@dataclass
class StateMachineMetricConfigGenerator:
    state_machine: sfn.IStateMachine
    state_machine_name: str
    dimension_map: dict = field(init=False)

    def __post_init__(self):
        self.dimension_map = {"StateMachineArn": self.state_machine.state_machine_arn}

    def get_execution_completion_metric(
        self, name_override: Optional[str] = None
    ) -> GraphMetricConfig:
        """Get the execution completion metric for the state machine.

        Args:
            name_override (Optional[str], optional): override for the name used.
                Defaults to the state machine name.

        Returns:
            GraphMetricConfig
        """
        return GraphMetricConfig(
            metric="ExecutionsSucceeded",
            label=f"{name_override or self.state_machine_name} Completed",
            statistic="Sum",
            dimension_map=self.dimension_map,
        )

    def get_execution_invocations_metric(
        self, name_override: Optional[str] = None
    ) -> GraphMetricConfig:
        """Get the execution invocations metric for the state machine.

        Args:
            name_override (Optional[str], optional): override for the name used.
                Defaults to the state machine name.

        Returns:
            GraphMetricConfig
        """
        return GraphMetricConfig(
            metric="ExecutionsStarted",
            label=f"{name_override or self.state_machine_name} Started",
            statistic="Sum",
            dimension_map=self.dimension_map,
        )
Collaborator:

Same question here: do we need to know the number of invocations if we are already logging completions/failures?

Collaborator (Author):

This metric gives us a sense of which long-running jobs have started. I added this to OCS because, like analysis jobs, the alignment jobs take a long time to run, and I think seeing the start and completion times is helpful.


    def get_execution_failures_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
        alarm_threshold: int = 1,
        alarm_evaluation_periods: int = 3,
        alarm_datapoints_to_alarm: int = 1,
    ) -> GraphMetricConfig:
        """Get the execution failures metric for the state machine.

        Args:
            name_override (Optional[str], optional): override for the name used.
                Defaults to the state machine name.
            discriminator (Optional[str], optional): Required if grouping with other
                metric configs that specify the same metric math. Defaults to "0".
            alarm_threshold (int, optional): Alarm threshold used. Defaults to 1.
            alarm_evaluation_periods (int, optional): Alarm evaluation periods. Defaults to 3.
            alarm_datapoints_to_alarm (int, optional): Alarm datapoints to alarm. Defaults to 1.

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.state_machine_name
        idx = discriminator or "0"
        return GraphMetricConfig(
            metric="ExecutionErrors",
            statistic="Sum",
            label=f"{name} Errors",
            dimension_map=self.dimension_map,
            metric_expression=(
                f"failed_{idx} + aborted_{idx} + timed_out_{idx} + throttled_{idx}"
            ),
            using_metrics={
                f"failed_{idx}": self.state_machine.metric_failed(),
                f"aborted_{idx}": self.state_machine.metric_aborted(),
                f"timed_out_{idx}": self.state_machine.metric_timed_out(),
                f"throttled_{idx}": self.state_machine.metric_throttled(),
            },
            alarm=AlarmMetricConfig(
                name=f"{name}-errors",
                threshold=alarm_threshold,
                evaluation_periods=alarm_evaluation_periods,
                datapoints_to_alarm=alarm_datapoints_to_alarm,
                comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
            ),
        )

    def get_execution_timing_metric(
        self,
        name_override: Optional[str] = None,
        discriminator: Optional[str] = None,
        time_unit: SFN_TIME_UNITS = "minutes",
    ) -> GraphMetricConfig:
        """Get the execution time metric for the state machine.

        Args:
            name_override (Optional[str], optional): override for the name used.
                Defaults to the state machine name.
            discriminator (Optional[str], optional): Required if grouping with other
                metric configs that specify the same metric math. Defaults to "0".
            time_unit (SFN_TIME_UNITS, optional): unit of time to use for the metric.
                Defaults to "minutes".

        Returns:
            GraphMetricConfig
        """
        name = name_override or self.state_machine_name
        idx = discriminator or "0"
        # ExecutionTime is reported in milliseconds; scale to the requested unit.
        if time_unit == "seconds":
            divisor = " / 1000"
        elif time_unit == "minutes":
            divisor = " / 1000 / 60"
        elif time_unit == "hours":
            divisor = " / 1000 / 60 / 60"
        else:
            divisor = ""

        return GraphMetricConfig(
            metric="ExecutionTime",
            statistic="Average",
            label=f"{name} Execution Time",
            dimension_map=self.dimension_map,
            metric_expression=f"time_msec_{idx}{divisor}",
            using_metrics={f"time_msec_{idx}": self.state_machine.metric_time()},
        )
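As with the Lambda generator, a minimal usage sketch (not part of the diff); `sm` is assumed to be an existing state machine construct:

# Usage sketch (hypothetical): `sm` is any existing sfn.IStateMachine.
gen = StateMachineMetricConfigGenerator(state_machine=sm, state_machine_name="my-sfn")

started = gen.get_execution_invocations_metric()   # ExecutionsStarted
completed = gen.get_execution_completion_metric()  # ExecutionsSucceeded

# Distinct discriminators keep metric-math ids (failed_0, time_msec_1, ...)
# from colliding when configs are grouped on the same graph.
errors = gen.get_execution_failures_metric(discriminator="0")
timing = gen.get_execution_timing_metric(discriminator="1", time_unit="minutes")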
30 changes: 14 additions & 16 deletions src/aibs_informatics_cdk_lib/constructs_/cw/dashboard.py
@@ -11,8 +11,11 @@
 from aws_cdk import aws_cloudwatch_actions as cw_actions
 from aws_cdk import aws_sns as sns
 
-from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstruct, EnvBaseConstructMixins
-from aibs_informatics_cdk_lib.constructs_.cw.types import GroupedGraphMetricConfig
+from aibs_informatics_cdk_lib.constructs_.base import EnvBaseConstructMixins
+from aibs_informatics_cdk_lib.constructs_.cw.types import (
+    GroupedGraphMetricConfig,
+    to_comparison_operator,
+)
 
 
 class DashboardMixins(EnvBaseConstructMixins):
@@ -105,7 +108,7 @@ def create_widgets_and_alarms(
         Returns:
             Tuple[List[cw.IWidget], List[cw.IAlarm]]: List of widgets and list of alarms
         """
-        self_stack = cdk.Stack.of(self.as_construct())
+        self_stack = cdk.Stack.of(self.dashboard)
 
         graph_widgets: List[cw.IWidget] = []
         metric_alarms: List[cw.IAlarm] = []
@@ -162,13 +165,13 @@ def create_widgets_and_alarms(
                 )
 
                 metric_axis = metric_config.get("axis_side", "left")
-                lr_graph_metrics[metric_axis].append(graph_metric)
+                lr_graph_metrics[metric_axis].append(graph_metric)  # type: ignore  # MathExpression implements IMetric
 
                 metric_alarm_config = metric_config.get("alarm")
                 if metric_alarm_config:
                     alarm_name = metric_alarm_config["name"]
                     alarm = graph_metric.create_alarm(
-                        self_stack or self,
+                        self_stack,
                         self.get_construct_id(alarm_name, alarm_id_discriminator),
                         # TODO: every time a change is made to these alarms, Cfn throws an error
                         # for trying to modify what is a custom resource. So instead, let
@@ -178,7 +181,9 @@
                         threshold=metric_alarm_config["threshold"],
                         evaluation_periods=metric_alarm_config["evaluation_periods"],
                         datapoints_to_alarm=metric_alarm_config["datapoints_to_alarm"],
-                        comparison_operator=metric_alarm_config["comparison_operator"],
+                        comparison_operator=to_comparison_operator(
+                            metric_alarm_config["comparison_operator"]
+                        ),
                     )
                     lr_annotations[metric_axis].append(
                         cw.HorizontalAnnotation(
Expand All @@ -188,7 +193,7 @@ def create_widgets_and_alarms(
)
metric_alarms.append(alarm)
if alarm_topic:
alarm.add_alarm_action(cw_actions.SnsAction(alarm_topic))
alarm.add_alarm_action(cw_actions.SnsAction(alarm_topic)) # type: ignore # SnsAction implements IAlarmAction

graph_widgets.append(
cw.GraphWidget(
@@ -253,13 +258,6 @@ def __init__(
         self.dashboard = self
 
 
-class DashboardTools(EnvBaseConstruct, DashboardMixins):
-    def __init__(
-        self,
-        scope: constructs.Construct,
-        id: Optional[str],
-        env_base: EnvBase,
-        dashboard: cw.Dashboard,
-    ) -> None:
-        super().__init__(scope, id, env_base)
+class DashboardTools(DashboardMixins):
+    def __init__(self, dashboard: cw.Dashboard) -> None:
         self.dashboard = dashboard
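To tie the constructs together, a hypothetical wiring sketch: `DashboardTools(dashboard)` matches the new signature above, but the `create_widgets_and_alarms` argument names are assumptions, since only parts of its signature appear in these hunks:

# Hypothetical wiring sketch: the argument names below are assumed, not
# confirmed by the hunks shown in this diff.
tools = DashboardTools(dashboard)
widgets, alarms = tools.create_widgets_and_alarms(
    grouped_metric_configs=[duration_group, success_group],  # assumed parameter name
    alarm_topic=topic,  # assumed: an sns.Topic wired to alarm actions
)
dashboard.add_widgets(*widgets)  # cw.Dashboard.add_widgets is standard CDK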