diff --git a/changelog.d/20231013_094529_derek_cloudwatch_emf_request_lifecycle_hooks_sc_26163.rst b/changelog.d/20231013_094529_derek_cloudwatch_emf_request_lifecycle_hooks_sc_26163.rst new file mode 100644 index 0000000..0398fac --- /dev/null +++ b/changelog.d/20231013_094529_derek_cloudwatch_emf_request_lifecycle_hooks_sc_26163.rst @@ -0,0 +1,16 @@ + +Features +-------- + +- `[sc-26163] ` + Added support for ``RequestLifecycleHook`` class registration to the flask + ``ActionProviderBlueprint`` class. + Classes may be provided at Blueprint instantiation time to register before, after, + and/or teardown functionality wrapping route invocation. + +- `[sc-26163] ` + Added a ``CloudWatchMetricEMFLogger`` ``RequestLifecycleHook`` class. + When attached to an ``ActionProviderBlueprint``, it will emit request count, latency, + and response category (2xxs, 4xxs, 5xxs) count metrics through CloudWatch EMF. Metrics + are emitted both for the aggregate AP dimension set and the individual route dimension + set. diff --git a/globus_action_provider_tools/flask/apt_blueprint.py b/globus_action_provider_tools/flask/apt_blueprint.py index 65c5ce4..b0bc9eb 100644 --- a/globus_action_provider_tools/flask/apt_blueprint.py +++ b/globus_action_provider_tools/flask/apt_blueprint.py @@ -51,6 +51,7 @@ def __init__( globus_auth_client_name: t.Optional[str] = None, additional_scopes: t.Iterable[str] = (), action_repository: t.Optional[AbstractActionRepository] = None, + request_lifecycle_hooks: t.Optional[t.List[t.Any]] = None, **kwarg, ): """Create a new ActionProviderBlueprint. All arguments not listed here are the @@ -71,6 +72,11 @@ def __init__( ``globus_auth_scope`` value of the input provider description. Only needed if more than one scope has been allocated for the Action Provider's Globus Auth client_id. + + :param request_lifecycle_hooks: A list of objects defining a before_request, + after_request, and/or teardown_request method. 
If any of these functions exist + they will be registered with the blueprint. RequestLifecycleHook classes are + registered and therefore executed in the order they are provided. """ super().__init__(*args, **kwarg) @@ -86,6 +92,15 @@ def __init__( self.register_error_handler(Exception, blueprint_error_handler) self.record_once(self._create_token_checker) + if request_lifecycle_hooks: + for hooks in request_lifecycle_hooks: + if hasattr(hooks, "before_request"): + self.before_request(hooks.before_request) + if hasattr(hooks, "after_request"): + self.after_request(hooks.after_request) + if hasattr(hooks, "teardown_request"): + self.teardown_request(hooks.teardown_request) + self.add_url_rule( "/", "action_introspect", @@ -152,6 +167,7 @@ def _action_introspect(self): """ Runs as an Action Provider's introspection endpoint. """ + self._register_route_type("introspect") if not g.auth_state.check_authorization( self.provider_description.visible_to, allow_public=True, @@ -166,6 +182,7 @@ def _action_introspect(self): return jsonify(self.provider_description), 200 def _action_enumerate(self): + self._register_route_type("enumerate") if not g.auth_state.check_authorization( self.provider_description.runnable_by, allow_public=True, @@ -206,6 +223,7 @@ def action_enumerate(self, func: ActionEnumerationCallback): return func def _action_run(self): + self._register_route_type("run") if not g.auth_state.check_authorization( self.provider_description.runnable_by, allow_all_authenticated_users=True, @@ -257,6 +275,7 @@ def action_run(self, func: ActionRunCallback): return func def _action_resume(self, action_id: str): + self._register_route_type("resume") # Attempt to lookup the Action based on its action_id if there was an # Action Repo defined. If an action is found, verify access to it. 
action = None
@@ -307,6 +326,7 @@ def action_status(self, func: ActionStatusCallback):
         return func
 
     def _action_status(self, action_id: str):
+        self._register_route_type("status")
         """
         Attempts to load an action_status via its action_id using an
         action_loader. If an action is successfully loaded, view access by the
@@ -351,6 +371,7 @@ def action_cancel(self, func: ActionCancelCallback):
         return func
 
     def _action_cancel(self, action_id: str):
         """
         Executes a user-defined function for cancelling an Action.
         """
+        self._register_route_type("cancel")
@@ -405,6 +426,7 @@ def action_release(self, func: ActionReleaseCallback):
         return func
 
     def _action_release(self, action_id: str):
         """
         Decorates a function to be run as an Action Provider's release endpoint.
         """
+        self._register_route_type("release")
@@ -446,6 +468,7 @@ def action_log(self, func: ActionLogCallback):
         return func
 
     def _action_log(self, action_id: str):
+        self._register_route_type("log")
         # Attempt to use a user-defined function to lookup the Action based
         # on its action_id. 
If an action is found, authorize access to it action = None @@ -456,6 +479,10 @@ def _action_log(self, action_id: str): status = self.action_log_callback(action_id, g.auth_state) return jsonify(status), 200 + def _register_route_type(self, route_type: str): + if not hasattr(g, "route_type"): + g.route_type = route_type + def _load_action_by_id( self, repo: AbstractActionRepository, action_id: str ) -> ActionStatus: diff --git a/globus_action_provider_tools/flask/request_lifecycle_hooks/__init__.py b/globus_action_provider_tools/flask/request_lifecycle_hooks/__init__.py new file mode 100644 index 0000000..a670bcc --- /dev/null +++ b/globus_action_provider_tools/flask/request_lifecycle_hooks/__init__.py @@ -0,0 +1,5 @@ +from .cloudwatch_metrics import CloudWatchMetricEMFLogger + +__all__ = [ + "CloudWatchMetricEMFLogger", +] diff --git a/globus_action_provider_tools/flask/request_lifecycle_hooks/cloudwatch_metrics.py b/globus_action_provider_tools/flask/request_lifecycle_hooks/cloudwatch_metrics.py new file mode 100644 index 0000000..8286032 --- /dev/null +++ b/globus_action_provider_tools/flask/request_lifecycle_hooks/cloudwatch_metrics.py @@ -0,0 +1,245 @@ +from __future__ import annotations + +import json +import logging +import time +import typing as t +from datetime import datetime, timedelta + +from flask import Response, g + +log = logging.getLogger("action-provider-tools-cloudwatch-emf") + + +class CloudWatchMetricEMFLogger: + """ + Flask RequestLifecycleHooks to emit CloudWatch Metrics detailing action provider + usage via the CloudWatch EMF Format. 
+ https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html + + Metric Structure + ================ + + Aggregate + --------- + Namespace: {supplied_namespace} + Dimensions: + ActionProvider: {supplied_action_provider_name} + + Route-Specific + -------------- + Namespace: {supplied_namespace} + Dimensions: + ActionProvider: {supplied_action_provider_name} + Route: "run" | "resume" | "status" | ... + + Included Metrics: + * RequestCount - The total number of API requests in a given period. + * 2XXs - The number of successful responses returned in a given period. + * 4XXs - The number of client-side errors captured in a given period. + * 5XXs - The number of server-side errors captured in a given period. + * RequestLatency - The number of milliseconds between the request being received + and the response being sent. + """ + + def __init__( + self, namespace: str, action_provider_name: str, log_level: int | None = None + ): + """ + :param namespace: Custom CloudWatch Namespace target + :param action_provider_name: Action Provider Name to be used in metric dimension + sets + :param log_level: Optional log level to use when emitting metrics. If None, + metrics will be printed to stdout instead of logged. 
+        """
+        self._namespace = namespace
+        self._action_provider_name = action_provider_name
+        self._log_level = log_level
+
+    def before_request(self):
+        """Store a perf-counter start marker (in milliseconds) on ``flask.g``."""
+        g.request_start_perf_counter_ms = time.perf_counter() * 1000
+
+    def after_request(self, response: Response):
+        """
+        Emit metrics for a request that produced a response.
+
+        :param response: The outgoing response; its status code is recorded.
+        :returns: The same response object, unmodified.
+        """
+        self._emit_request_metrics(response.status_code)
+        return response
+
+    def teardown_request(self, error: BaseException | None):
+        """
+        Emit metrics for a request whose exception escaped request handling.
+
+        :param error: The unhandled exception supplied by Flask, or None when the
+            request finished without one (in which case nothing is done here).
+        """
+        # If a request errors mid-handling, after_request handlers may not be called,
+        # so we need to emit metrics for errors separately here
+        if error is None:
+            return
+
+        # Only trust an integer ``code`` attribute (e.g. an HTTP status carried by
+        # the exception); anything else is counted as a server error.
+        code = getattr(error, "code", None)
+        status_code = code if isinstance(code, int) else 500
+        self._emit_request_metrics(status_code)
+        # The error is intentionally not re-raised: teardown callbacks should not
+        # raise, and Flask already owns propagation/handling of ``error``.
+
+    def _emit_request_metrics(self, response_status: int):
+        """
+        Emit metrics for the current request at most once.
+
+        The start marker written by ``before_request`` is popped from ``flask.g``,
+        so a second call during the same request (for example after_request
+        followed by teardown_request) finds no marker and emits nothing.
+
+        :param response_status: Status code used to bucket the response.
+        """
+        start_ms = g.pop("request_start_perf_counter_ms", None)
+        route_type = g.get("route_type")
+        if start_ms is None or route_type is None:
+            # Either the request never reached a tracked route or metrics were
+            # already emitted for it.
+            return
+        self.emit_route_metrics(
+            route_name=route_type,
+            request_latency_ms=time.perf_counter() * 1000 - start_ms,
+            response_status=response_status,
+        )
+
+    def emit_route_metrics(
+        self,
+        route_name: str,
+        request_latency_ms: float,
+        response_status: int,
+    ):
+        """
+        Serialize one request's metrics to EMF and write them out.
+
+        :param route_name: Value of the ``Route`` dimension.
+        :param request_latency_ms: Value of the ``RequestLatency`` metric.
+        :param response_status: Status code used to fill the 2XXs/4XXs/5XXs counts.
+        """
+        emf_log = _serialize_to_emf(
+            namespace=self._namespace,
+            dimension_sets=[
+                {"ActionProvider": self._action_provider_name},
+                {"ActionProvider": self._action_provider_name, "Route": route_name},
+            ],
+            metrics=[
+                ("RequestCount", 1, "Count"),
+                ("2XXs", 1 if 200 <= response_status < 300 else 0, "Count"),
+                ("4XXs", 1 if 400 <= response_status < 500 else 0, "Count"),
+                ("5XXs", 1 if 500 <= response_status < 600 else 0, "Count"),
+                ("RequestLatency", request_latency_ms, "Milliseconds"),
+            ],
+        )
+        emf_log_str = json.dumps(emf_log)
+
+        # Compare against None explicitly so that a falsy level (0) is still
+        # treated as "a log level was supplied", matching the __init__ contract.
+        if self._log_level is None:
+            print(emf_log_str)
+        else:
+            log.log(self._log_level, emf_log_str)
+
+
+# fmt: off
+# 
https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html +CloudWatchUnit = t.Literal[ + "Seconds", "Microseconds", "Milliseconds", "Bytes", "Kilobytes", "Megabytes", + "Gigabytes", "Terabytes", "Bits", "Kilobits", "Megabits", "Gigabits", "Terabits", + "Percent", "Count", "Bytes/Second", "Kilobytes/Second", "Megabytes/Second", + "Gigabytes/Second", "Terabytes/Second", "Bits/Second", "Kilobits/Second", + "Megabits/Second", "Gigabits/Second", "Terabits/Second", "Count/Second", "None" +] +# fmt: on + + +def _serialize_to_emf( + namespace: str, + dimension_sets: list[dict[str, str]], + metrics: list[tuple[str, str | int | float, CloudWatchUnit | None]], + timestamp: datetime | None = None, +) -> dict[str, t.Any]: + """ + Serializes a list of metrics into CloudWatch Embedded Metric Format + https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Specification.html + + This results in an object like + ```json + { + "_aws": { + "Timestamp": 1680634571444, + "CloudWatchMetrics": [{ + "Namespace": "MyCoolNamespace", + "Dimensions": [["Foo"], ["Foo", "Bar"]], + "Metrics": [{ "Name": "MyCoolMetric", "Unit": "Milliseconds" }] + }] + }, + "Foo": "a", + "Bar": "b", + "MyCoolMetric": 37, + } + ``` + Note how there are three additional top-level keys besides "_aws". + This is because Dimension Values & Metric Values must be referenced not passed + explicitly + + :namespace str: Namespace + :metric_name str: Metric Name + :dimension_sets list[dict[str, str]]: A collection of Dimension Sets (each metric + will be emitted with each dimension set) + :metrics list[tuple[str, str | int | float, str | None]]: Metric Tuple in the format + (metric_name, value, Optional[unit]) + :timestamp datetime | None: Timestamp to use for the metric. If None, the current + time will be used. 
+ :returns: An emf formatted dict + """ + timestamp = timestamp or datetime.now() + epoch_ms = int(timestamp.timestamp() * 1000) + _verify_no_emf_root_collisions( + {metric_name for metric_name, _, _ in metrics}, dimension_sets + ) + + emf_obj: t.Dict[str, t.Any] = {} + + emf_metrics = [] + for metric_name, value, unit in metrics: + emf_obj[metric_name] = value + emf_metric = {"Name": metric_name} + if unit is not None: + emf_metric["Unit"] = unit + emf_metrics.append(emf_metric) + + emf_dimension_sets = [] + for dimension_map in dimension_sets: + for dimension_name, dimension_value in dimension_map.items(): + emf_obj[dimension_name] = dimension_value + emf_dimension_sets.append(list(dimension_map.keys())) + + emf_obj["_aws"] = { + "Timestamp": epoch_ms, + "CloudWatchMetrics": [ + { + "Namespace": namespace, + "Dimensions": emf_dimension_sets, + "Metrics": emf_metrics, + } + ], + } + + return emf_obj + + +def _verify_no_emf_root_collisions( + metric_names: set[str], dimension_sets: list[dict[str, str]] +): + """ + Verify that there are no disallowed collisions between the root keys of the emf + object + + :raises: RuntimeError if names/values collide in ways that preclude them from + being emitted via EMF + """ + # Verify that no dimension names match any metric names + dimension_names = { + dimension_name + for dimension_map in dimension_sets + for dimension_name in dimension_map.keys() + } + + namespace_collisions = metric_names.intersection(dimension_names) + if namespace_collisions: + raise RuntimeError( + f"Cannot overlap dimension names and metric names ({namespace_collisions})" + ) + + # Verify that no dimension names in different dimension sets conflict + dimension_values: t.Dict[str, t.Set[str]] = {} + for dimension_map in dimension_sets: + for dimension_name, dimension_value in dimension_map.items(): + dimension_values.setdefault(dimension_name, set()).add(dimension_value) + dimension_collisions = { + dimension_name + for dimension_name, dimension_value 
in dimension_values.items() + if len(dimension_value) > 1 + } + if dimension_collisions: + raise RuntimeError( + f"Dimension sets with the same name must have the same value " + f"({dimension_collisions})" + ) diff --git a/tests/test_flask_helpers/app_utils.py b/tests/test_flask_helpers/app_utils.py index 7a42034..29505d4 100644 --- a/tests/test_flask_helpers/app_utils.py +++ b/tests/test_flask_helpers/app_utils.py @@ -67,7 +67,7 @@ class Config: ) -def test_action_enumeration( +def mock_action_enumeration_func( auth: AuthState, params: Dict[str, Set] ) -> List[ActionStatus]: statuses = params["statuses"] @@ -95,7 +95,7 @@ def test_action_enumeration( return matches -def test_action_run( +def mock_action_run_func( action_request: ActionRequest, auth: AuthState ) -> ActionCallbackReturn: action_status = ActionStatus( @@ -114,7 +114,7 @@ def test_action_run( return action_status -def test_action_status(action_id: str, auth: AuthState) -> ActionCallbackReturn: +def mock_action_status_func(action_id: str, auth: AuthState) -> ActionCallbackReturn: action_status = simple_backend.get(action_id) if action_status is None: raise ActionNotFound(f"No action with {action_id}") @@ -122,7 +122,7 @@ def test_action_status(action_id: str, auth: AuthState) -> ActionCallbackReturn: return action_status -def test_action_cancel(action_id: str, auth: AuthState) -> ActionCallbackReturn: +def mock_action_cancel_func(action_id: str, auth: AuthState) -> ActionCallbackReturn: action_status = simple_backend.get(action_id) if action_status is None: raise ActionNotFound(f"No action with {action_id}") @@ -138,7 +138,7 @@ def test_action_cancel(action_id: str, auth: AuthState) -> ActionCallbackReturn: return action_status -def test_action_release(action_id: str, auth: AuthState) -> ActionCallbackReturn: +def mock_action_release_func(action_id: str, auth: AuthState) -> ActionCallbackReturn: action_status = simple_backend.get(action_id) if action_status is None: raise ActionNotFound(f"No action 
with {action_id}") @@ -152,7 +152,7 @@ def test_action_release(action_id: str, auth: AuthState) -> ActionCallbackReturn return action_status -def test_action_log(action_id: str, auth: AuthState) -> ActionLogReturn: +def mock_action_log_func(action_id: str, auth: AuthState) -> ActionLogReturn: pagination = request.args.get("pagination") filters = request.args.get("filters") return ActionLogReturn( diff --git a/tests/test_flask_helpers/conftest.py b/tests/test_flask_helpers/conftest.py index 419ebcd..74fd5ef 100644 --- a/tests/test_flask_helpers/conftest.py +++ b/tests/test_flask_helpers/conftest.py @@ -20,12 +20,12 @@ from .app_utils import ( ap_description, - test_action_cancel, - test_action_enumeration, - test_action_log, - test_action_release, - test_action_run, - test_action_status, + mock_action_cancel_func, + mock_action_enumeration_func, + mock_action_log_func, + mock_action_release_func, + mock_action_run_func, + mock_action_status_func, ) @@ -44,12 +44,12 @@ def aptb_app(apt_blueprint_noauth, auth_state): url_prefix="/aptb", provider_description=ap_description, ) - aptb.action_run(test_action_run) - aptb.action_status(test_action_status) - aptb.action_cancel(test_action_cancel) - aptb.action_release(test_action_release) - aptb.action_log(test_action_log) - aptb.action_enumerate(test_action_enumeration) + aptb.action_run(mock_action_run_func) + aptb.action_status(mock_action_status_func) + aptb.action_cancel(mock_action_cancel_func) + aptb.action_release(mock_action_release_func) + aptb.action_log(mock_action_log_func) + aptb.action_enumerate(mock_action_enumeration_func) apt_blueprint_noauth(aptb) app.register_blueprint(aptb) @@ -71,12 +71,12 @@ def add_routes_app(flask_helpers_noauth, auth_state): client_secret="bogus", client_name=None, provider_description=ap_description, - action_run_callback=test_action_run, - action_status_callback=test_action_status, - action_cancel_callback=test_action_cancel, - action_release_callback=test_action_release, - 
action_log_callback=test_action_log, - action_enumeration_callback=test_action_enumeration, + action_run_callback=mock_action_run_func, + action_status_callback=mock_action_status_func, + action_cancel_callback=mock_action_cancel_func, + action_release_callback=mock_action_release_func, + action_log_callback=mock_action_log_func, + action_enumeration_callback=mock_action_enumeration_func, additional_scopes=[ "https://auth.globus.org/scopes/d3a66776-759f-4316-ba55-21725fe37323/secondary_scope" ], diff --git a/tests/test_flask_helpers/request_lifecycle_hooks/__init__.py b/tests/test_flask_helpers/request_lifecycle_hooks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_flask_helpers/request_lifecycle_hooks/test_cloudwatch_metrics.py b/tests/test_flask_helpers/request_lifecycle_hooks/test_cloudwatch_metrics.py new file mode 100644 index 0000000..0047651 --- /dev/null +++ b/tests/test_flask_helpers/request_lifecycle_hooks/test_cloudwatch_metrics.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +import json + +import pytest +from flask import Flask + +from globus_action_provider_tools import ActionRequest, AuthState +from globus_action_provider_tools.flask import ActionProviderBlueprint +from globus_action_provider_tools.flask.exceptions import ( + ActionNotFound, + ActionProviderError, +) +from globus_action_provider_tools.flask.helpers import assign_json_provider +from globus_action_provider_tools.flask.request_lifecycle_hooks import ( + CloudWatchMetricEMFLogger, +) +from tests.test_flask_helpers.app_utils import ap_description, mock_action_run_func + + +def erroring_4xx_run_route(action_request: ActionRequest, auth: AuthState): + raise ActionNotFound("Not Found") + + +def erroring_5xx_run_route(action_request: ActionRequest, auth: AuthState): + raise ActionProviderError("Internal Server Error") + + +@pytest.mark.parametrize( + "run_view_func,expected_2xxs,expected_4xxs,expected_5xxs", + [ + (mock_action_run_func, 1, 0, 0), + 
(erroring_4xx_run_route, 0, 1, 0), + (erroring_5xx_run_route, 0, 0, 1), + ], +) +def test_routes_emit_emf_logs( + apt_blueprint_noauth, + auth_state, + capsys, + run_view_func, + expected_2xxs, + expected_4xxs, + expected_5xxs, +): + app = Flask(__name__) + assign_json_provider(app) + aptb = ActionProviderBlueprint( + name="TrackedActionProvider", + import_name=__name__, + url_prefix="/tracked", + provider_description=ap_description, + request_lifecycle_hooks=[ + CloudWatchMetricEMFLogger( + namespace="ActionProviders", + action_provider_name="TrackedActionProvider", + ) + ], + ) + aptb.action_run(run_view_func) + + apt_blueprint_noauth(aptb) + app.register_blueprint(aptb) + + req = {"request_id": "0", "body": {"echo_string": "This is a test"}} + app.test_client().post("/tracked/run", json=req) + out, _ = capsys.readouterr() + emf_log = json.loads(out) + + assert "_aws" in emf_log + assert emf_log.get("RequestCount") == 1 + assert emf_log.get("2XXs") == expected_2xxs + assert emf_log.get("4XXs") == expected_4xxs + assert emf_log.get("5XXs") == expected_5xxs + assert emf_log.get("RequestLatency") > 0 + assert emf_log.get("ActionProvider") == "TrackedActionProvider" + assert emf_log.get("Route") == "run"