From 7ca9ffd6d103f4f8feb1b503b63f20f3f818c50b Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 18 Jun 2024 11:56:00 +0200 Subject: [PATCH 1/4] docker,prometheus: added prometheus service and config [EC-299] --- container/compose.yml | 7 +++++++ container/prometheus.yml | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 container/prometheus.yml diff --git a/container/compose.yml b/container/compose.yml index f1328afef..6e17cee5b 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -102,3 +102,10 @@ services: KARAPACE_REGISTRY_PORT: 8081 KARAPACE_ADMIN_METADATA_MAX_AGE: 0 KARAPACE_LOG_LEVEL: WARNING + + prometheus: + image: prom/prometheus + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + ports: + - 9090:9090 diff --git a/container/prometheus.yml b/container/prometheus.yml new file mode 100644 index 000000000..20731bb4b --- /dev/null +++ b/container/prometheus.yml @@ -0,0 +1,18 @@ +global: + scrape_interval: 10s # How frequently to scrape targets by default. + scrape_timeout: 5s # How long until a scrape request times out. + evaluation_interval: 60s # How frequently to evaluate rules. + +# A scrape configuration +scrape_configs: + - job_name: karapace-registry + metrics_path: /metrics + static_configs: + - targets: + - karapace-registry:8081 + + - job_name: karapace-rest + metrics_path: /metrics + static_configs: + - targets: + - karapace-rest:8082 From 089e759d5442105a24f6409540fc6ee2c3a4fe85 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 18 Jun 2024 11:56:55 +0200 Subject: [PATCH 2/4] requirements: added `prometheus_client` dependency [EC-299] --- requirements/requirements-dev.txt | 2 ++ requirements/requirements.in | 1 + requirements/requirements.txt | 2 ++ 3 files changed, 5 insertions(+) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 45d07a105..0ec33d6ed 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -159,6 +159,8 @@ pkgutil-resolve-name==1.3.10 # jsonschema pluggy==1.5.0 # via pytest +prometheus-client==0.20.0 + # via -r requirements.txt protobuf==3.20.3 # via -r requirements.txt psutil==5.9.8 diff --git a/requirements/requirements.in b/requirements/requirements.in index 39b039494..2bd3ccfe7 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -19,6 +19,7 @@ ujson<6 watchfiles<1 xxhash~=3.3 zstandard +prometheus-client==0.20.0 # Patched dependencies # diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 6bf9bdd13..8192bda35 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -67,6 +67,8 @@ packaging==24.0 # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema +prometheus-client==0.20.0 + # via -r requirements.in protobuf==3.20.3 # via -r requirements.in pygments==2.18.0 From 28c37db59afb4ed504371056ca897b5f890d325d Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 18 Jun 2024 13:25:55 +0200 Subject: [PATCH 3/4] prometheus: added prometheus instrumentation - we add the `PrometheusInstrumentation` class to house available metrics - we use a middleware to automatically instrument the HTTP requests mertrics, i.e. total, in progress, latency, etc. - we add unit tests [EC-299] --- karapace/instrumentation/__init__.py | 0 karapace/instrumentation/prometheus.py | 99 +++++++++++++++ karapace/karapace_all.py | 2 + tests/unit/instrumentation/test_prometheus.py | 119 ++++++++++++++++++ 4 files changed, 220 insertions(+) create mode 100644 karapace/instrumentation/__init__.py create mode 100644 karapace/instrumentation/prometheus.py create mode 100644 tests/unit/instrumentation/test_prometheus.py diff --git a/karapace/instrumentation/__init__.py b/karapace/instrumentation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/karapace/instrumentation/prometheus.py b/karapace/instrumentation/prometheus.py new file mode 100644 index 000000000..d4b168d35 --- /dev/null +++ b/karapace/instrumentation/prometheus.py @@ -0,0 +1,99 @@ +""" +karapace - prometheus instrumentation + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from __future__ import annotations + +from aiohttp.web import middleware, Request, RequestHandler, Response +from karapace.rapu import RestApp +from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, Histogram +from typing import ClassVar + +import logging +import time + +LOG = logging.getLogger(__name__) + + +class PrometheusInstrumentation: + METRICS_ENDPOINT_PATH: ClassVar[str] = "/metrics" + START_TIME_REQUEST_KEY: ClassVar[str] = "start_time" + + registry: ClassVar[CollectorRegistry] = CollectorRegistry() + + karapace_http_requests_total: ClassVar[Counter] = Counter( + registry=registry, + name="karapace_http_requests_total", + documentation="Total Request Count for HTTP/TCP Protocol", + labelnames=("method", "path", "status"), + ) + + karapace_http_requests_latency_seconds: ClassVar[Histogram] = Histogram( + registry=registry, + name="karapace_http_requests_latency_seconds", + documentation="Request Duration for HTTP/TCP Protocol", + labelnames=("method", "path"), + ) + + karapace_http_requests_in_progress: ClassVar[Gauge] = Gauge( + registry=registry, + name="karapace_http_requests_in_progress", + documentation="Request Duration for HTTP/TCP Protocol", + labelnames=("method", "path"), + ) + + @classmethod + def setup_metrics(cls: PrometheusInstrumentation, *, app: RestApp) -> None: + LOG.info("Setting up prometheus metrics") + app.route( + cls.METRICS_ENDPOINT_PATH, + callback=cls.serve_metrics, + method="GET", + schema_request=False, + with_request=False, + json_body=False, + auth=None, + ) + app.app.middlewares.insert(0, cls.http_request_metrics_middleware) + app.app[cls.karapace_http_requests_total] = cls.karapace_http_requests_total + app.app[cls.karapace_http_requests_latency_seconds] = cls.karapace_http_requests_latency_seconds + app.app[cls.karapace_http_requests_in_progress] = cls.karapace_http_requests_in_progress + + @classmethod + async def serve_metrics(cls: PrometheusInstrumentation) -> bytes: + return generate_latest(cls.registry) + + @classmethod + @middleware + async def http_request_metrics_middleware( + cls: PrometheusInstrumentation, + request: Request, + handler: RequestHandler, + ) -> None: + request[cls.START_TIME_REQUEST_KEY] = time.time() + + # Extract request labels + path = request.path + method = request.method + + # Increment requests in progress before handler + request.app[cls.karapace_http_requests_in_progress].labels(method, path).inc() + + # Call request handler + response: Response = await handler(request) + + # Instrument request latency + request.app[cls.karapace_http_requests_latency_seconds].labels(method, path).observe( + time.time() - request[cls.START_TIME_REQUEST_KEY] + ) + + # Instrument total requests + request.app[cls.karapace_http_requests_total].labels(method, path, response.status).inc() + + # Decrement requests in progress after handler + request.app[cls.karapace_http_requests_in_progress].labels(method, path).dec() + + return response diff --git a/karapace/karapace_all.py b/karapace/karapace_all.py index 353c5021a..620928428 100644 --- a/karapace/karapace_all.py +++ b/karapace/karapace_all.py @@ -6,6 +6,7 @@ from contextlib import closing from karapace import version as karapace_version from karapace.config import read_config +from karapace.instrumentation.prometheus import PrometheusInstrumentation from karapace.kafka_rest_apis import KafkaRest from karapace.rapu import RestApp from karapace.schema_registry_apis import KarapaceSchemaRegistryController @@ -63,6 +64,7 @@ def main() -> int: try: # `close` will be called by the callback `close_by_app` set by `KarapaceBase` + PrometheusInstrumentation.setup_metrics(app=app) app.run() except Exception as ex: # pylint: disable-broad-except app.stats.unexpected_exception(ex=ex, where="karapace") diff --git a/tests/unit/instrumentation/test_prometheus.py b/tests/unit/instrumentation/test_prometheus.py new file mode 100644 index 000000000..7fcbf4f19 --- /dev/null +++ b/tests/unit/instrumentation/test_prometheus.py @@ -0,0 +1,119 @@ +""" +karapace - prometheus instrumentation tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from _pytest.logging import LogCaptureFixture +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from karapace.rapu import RestApp +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +from unittest.mock import AsyncMock, call, MagicMock, patch + +import aiohttp.web +import logging +import pytest + + +class TestPrometheusInstrumentation: + @pytest.fixture + def prometheus(self) -> PrometheusInstrumentation: + return PrometheusInstrumentation() + + def test_constants(self, prometheus: PrometheusInstrumentation) -> None: + assert prometheus.START_TIME_REQUEST_KEY == "start_time" + assert isinstance(prometheus.registry, CollectorRegistry) + + def test_metric_types(self, prometheus: PrometheusInstrumentation) -> None: + assert isinstance(prometheus.karapace_http_requests_total, Counter) + assert isinstance(prometheus.karapace_http_requests_latency_seconds, Histogram) + assert isinstance(prometheus.karapace_http_requests_in_progress, Gauge) + + def test_metric_values(self, prometheus: PrometheusInstrumentation) -> None: + # `_total` suffix is stripped off the metric name for `Counters`, but needed for clarity. + assert repr(prometheus.karapace_http_requests_total) == "prometheus_client.metrics.Counter(karapace_http_requests)" + assert ( + repr(prometheus.karapace_http_requests_latency_seconds) + == "prometheus_client.metrics.Histogram(karapace_http_requests_latency_seconds)" + ) + assert ( + repr(prometheus.karapace_http_requests_in_progress) + == "prometheus_client.metrics.Gauge(karapace_http_requests_in_progress)" + ) + + def test_setup_metrics(self, caplog: LogCaptureFixture, prometheus: PrometheusInstrumentation) -> None: + app = AsyncMock(spec=RestApp, app=AsyncMock(spec=aiohttp.web.Application)) + + with caplog.at_level(logging.INFO, logger="karapace.instrumentation.prometheus"): + prometheus.setup_metrics(app=app) + + app.route.assert_called_once_with( + prometheus.METRICS_ENDPOINT_PATH, + callback=prometheus.serve_metrics, + method="GET", + schema_request=False, + with_request=False, + json_body=False, + auth=None, + ) + app.app.middlewares.insert.assert_called_once_with(0, prometheus.http_request_metrics_middleware) + app.app.__setitem__.assert_has_calls( + [ + call(prometheus.karapace_http_requests_total, prometheus.karapace_http_requests_total), + call( + prometheus.karapace_http_requests_latency_seconds, + prometheus.karapace_http_requests_latency_seconds, + ), + call(prometheus.karapace_http_requests_in_progress, prometheus.karapace_http_requests_in_progress), + ] + ) + for log in caplog.records: + assert log.name == "karapace.instrumentation.prometheus" + assert log.levelname == "INFO" + assert log.message == "Setting up prometheus metrics" + + @patch("karapace.instrumentation.prometheus.generate_latest") + async def test_serve_metrics(self, generate_latest: MagicMock, prometheus: PrometheusInstrumentation) -> None: + await prometheus.serve_metrics() + generate_latest.assert_called_once_with(prometheus.registry) + + @patch("karapace.instrumentation.prometheus.time") + async def test_http_request_metrics_middleware( + self, + mock_time: MagicMock, + prometheus: PrometheusInstrumentation, + ) -> None: + mock_time.time.return_value = 10 + request = AsyncMock( + spec=aiohttp.web.Request, app=AsyncMock(spec=aiohttp.web.Application), path="/path", method="GET" + ) + handler = AsyncMock(spec=aiohttp.web.RequestHandler, return_value=MagicMock(status=200)) + + await prometheus.http_request_metrics_middleware(request=request, handler=handler) + + request.__setitem__.assert_called_once_with(prometheus.START_TIME_REQUEST_KEY, 10) + request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls( + [ + call("GET", "/path"), + call().inc(), + ] + ) + request.app[prometheus.karapace_http_requests_latency_seconds].labels.assert_has_calls( + [ + call("GET", "/path"), + call().observe(request.__getitem__.return_value.__rsub__.return_value), + ] + ) + request.app[prometheus.karapace_http_requests_total].labels.assert_has_calls( + [ + call("GET", "/path", 200), + call().inc(), + ] + ) + request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls( + [ + call("GET", "/path"), + call().dec(), + ] + ) From addaaa21f9dbc4e61f2aac635099a1c4450681d1 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 18 Jun 2024 14:04:40 +0200 Subject: [PATCH 4/4] tests: added integration tests for prometheus [EC-299] --- karapace/client.py | 5 ++- karapace/instrumentation/prometheus.py | 43 +++++++++++-------- karapace/karapace_all.py | 3 +- tests/integration/instrumentation/__init__.py | 0 .../instrumentation/test_prometheus.py | 30 +++++++++++++ tests/unit/instrumentation/test_prometheus.py | 13 +++--- 6 files changed, 65 insertions(+), 29 deletions(-) create mode 100644 tests/integration/instrumentation/__init__.py create mode 100644 tests/integration/instrumentation/test_prometheus.py diff --git a/karapace/client.py b/karapace/client.py index d699e6a73..dae79b244 100644 --- a/karapace/client.py +++ b/karapace/client.py @@ -91,6 +91,7 @@ async def get( headers: Optional[Headers] = None, auth: Optional[BasicAuth] = None, params: Optional[Mapping[str, str]] = None, + json_response: bool = True, ) -> Result: path = self.path_for(path) if not headers: @@ -105,8 +106,8 @@ async def get( params=params, ) as res: # required for forcing the response body conversion to json despite missing valid Accept headers - json_result = await res.json(content_type=None) - return Result(res.status, json_result, headers=res.headers) + result = await res.json(content_type=None) if json_response else await res.text() + return Result(res.status, result, headers=res.headers) async def delete( self, diff --git a/karapace/instrumentation/prometheus.py b/karapace/instrumentation/prometheus.py index d4b168d35..4e478fdc7 100644 --- a/karapace/instrumentation/prometheus.py +++ b/karapace/instrumentation/prometheus.py @@ -4,13 +4,14 @@ Copyright (c) 2024 Aiven Ltd See LICENSE for details """ +# mypy: disable-error-code="call-overload" from __future__ import annotations -from aiohttp.web import middleware, Request, RequestHandler, Response +from aiohttp.web import middleware, Request, Response from karapace.rapu import RestApp from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, Histogram -from typing import ClassVar +from typing import Awaitable, Callable, Final import logging import time @@ -19,34 +20,34 @@ class PrometheusInstrumentation: - METRICS_ENDPOINT_PATH: ClassVar[str] = "/metrics" - START_TIME_REQUEST_KEY: ClassVar[str] = "start_time" + METRICS_ENDPOINT_PATH: Final[str] = "/metrics" + START_TIME_REQUEST_KEY: Final[str] = "start_time" - registry: ClassVar[CollectorRegistry] = CollectorRegistry() + registry: Final[CollectorRegistry] = CollectorRegistry() - karapace_http_requests_total: ClassVar[Counter] = Counter( + karapace_http_requests_total: Final[Counter] = Counter( registry=registry, name="karapace_http_requests_total", documentation="Total Request Count for HTTP/TCP Protocol", labelnames=("method", "path", "status"), ) - karapace_http_requests_latency_seconds: ClassVar[Histogram] = Histogram( + karapace_http_requests_duration_seconds: Final[Histogram] = Histogram( registry=registry, - name="karapace_http_requests_latency_seconds", + name="karapace_http_requests_duration_seconds", documentation="Request Duration for HTTP/TCP Protocol", labelnames=("method", "path"), ) - karapace_http_requests_in_progress: ClassVar[Gauge] = Gauge( + karapace_http_requests_in_progress: Final[Gauge] = Gauge( registry=registry, name="karapace_http_requests_in_progress", - documentation="Request Duration for HTTP/TCP Protocol", + documentation="In-progress requests for HTTP/TCP Protocol", labelnames=("method", "path"), ) @classmethod - def setup_metrics(cls: PrometheusInstrumentation, *, app: RestApp) -> None: + def setup_metrics(cls, *, app: RestApp) -> None: LOG.info("Setting up prometheus metrics") app.route( cls.METRICS_ENDPOINT_PATH, @@ -57,22 +58,26 @@ def setup_metrics(cls: PrometheusInstrumentation, *, app: RestApp) -> None: json_body=False, auth=None, ) - app.app.middlewares.insert(0, cls.http_request_metrics_middleware) + app.app.middlewares.insert(0, cls.http_request_metrics_middleware) # type: ignore[arg-type] + + # disable-error-code="call-overload" is used at the top of this file to allow mypy checks. + # the issue is in the type difference (Counter, Gauge, etc) of the arguments which we are + # passing to `__setitem__()`, but we need to keep these objects in the `app.app` dict. app.app[cls.karapace_http_requests_total] = cls.karapace_http_requests_total - app.app[cls.karapace_http_requests_latency_seconds] = cls.karapace_http_requests_latency_seconds + app.app[cls.karapace_http_requests_duration_seconds] = cls.karapace_http_requests_duration_seconds app.app[cls.karapace_http_requests_in_progress] = cls.karapace_http_requests_in_progress @classmethod - async def serve_metrics(cls: PrometheusInstrumentation) -> bytes: + async def serve_metrics(cls) -> bytes: return generate_latest(cls.registry) @classmethod @middleware async def http_request_metrics_middleware( - cls: PrometheusInstrumentation, + cls, request: Request, - handler: RequestHandler, - ) -> None: + handler: Callable[[Request], Awaitable[Response]], + ) -> Response: request[cls.START_TIME_REQUEST_KEY] = time.time() # Extract request labels @@ -85,8 +90,8 @@ async def http_request_metrics_middleware( # Call request handler response: Response = await handler(request) - # Instrument request latency - request.app[cls.karapace_http_requests_latency_seconds].labels(method, path).observe( + # Instrument request duration + request.app[cls.karapace_http_requests_duration_seconds].labels(method, path).observe( time.time() - request[cls.START_TIME_REQUEST_KEY] ) diff --git a/karapace/karapace_all.py b/karapace/karapace_all.py index 620928428..240da1008 100644 --- a/karapace/karapace_all.py +++ b/karapace/karapace_all.py @@ -63,9 +63,8 @@ def main() -> int: logging.log(logging.DEBUG, "Config %r", config_without_secrets) try: - # `close` will be called by the callback `close_by_app` set by `KarapaceBase` PrometheusInstrumentation.setup_metrics(app=app) - app.run() + app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase` except Exception as ex: # pylint: disable-broad-except app.stats.unexpected_exception(ex=ex, where="karapace") raise diff --git a/tests/integration/instrumentation/__init__.py b/tests/integration/instrumentation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/instrumentation/test_prometheus.py b/tests/integration/instrumentation/test_prometheus.py new file mode 100644 index 000000000..cdac52ca8 --- /dev/null +++ b/tests/integration/instrumentation/test_prometheus.py @@ -0,0 +1,30 @@ +""" +karapace - prometheus instrumentation tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from http import HTTPStatus +from karapace.client import Client, Result +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from prometheus_client.parser import text_string_to_metric_families + + +async def test_metrics_endpoint(registry_async_client: Client) -> None: + result: Result = await registry_async_client.get( + PrometheusInstrumentation.METRICS_ENDPOINT_PATH, + json_response=False, + ) + assert result.status_code == HTTPStatus.OK.value + + +async def test_metrics_endpoint_parsed_response(registry_async_client: Client) -> None: + result: Result = await registry_async_client.get( + PrometheusInstrumentation.METRICS_ENDPOINT_PATH, + json_response=False, + ) + metrics = [family.name for family in text_string_to_metric_families(result.json_result)] + assert "karapace_http_requests" in metrics + assert "karapace_http_requests_duration_seconds" in metrics + assert "karapace_http_requests_in_progress" in metrics diff --git a/tests/unit/instrumentation/test_prometheus.py b/tests/unit/instrumentation/test_prometheus.py index 7fcbf4f19..c4b3da8d9 100644 --- a/tests/unit/instrumentation/test_prometheus.py +++ b/tests/unit/instrumentation/test_prometheus.py @@ -27,15 +27,15 @@ def test_constants(self, prometheus: PrometheusInstrumentation) -> None: def test_metric_types(self, prometheus: PrometheusInstrumentation) -> None: assert isinstance(prometheus.karapace_http_requests_total, Counter) - assert isinstance(prometheus.karapace_http_requests_latency_seconds, Histogram) + assert isinstance(prometheus.karapace_http_requests_duration_seconds, Histogram) assert isinstance(prometheus.karapace_http_requests_in_progress, Gauge) def test_metric_values(self, prometheus: PrometheusInstrumentation) -> None: # `_total` suffix is stripped off the metric name for `Counters`, but needed for clarity. assert repr(prometheus.karapace_http_requests_total) == "prometheus_client.metrics.Counter(karapace_http_requests)" assert ( - repr(prometheus.karapace_http_requests_latency_seconds) - == "prometheus_client.metrics.Histogram(karapace_http_requests_latency_seconds)" + repr(prometheus.karapace_http_requests_duration_seconds) + == "prometheus_client.metrics.Histogram(karapace_http_requests_duration_seconds)" ) assert ( repr(prometheus.karapace_http_requests_in_progress) @@ -62,8 +62,8 @@ def test_setup_metrics(self, caplog: LogCaptureFixture, prometheus: PrometheusIn [ call(prometheus.karapace_http_requests_total, prometheus.karapace_http_requests_total), call( - prometheus.karapace_http_requests_latency_seconds, - prometheus.karapace_http_requests_latency_seconds, + prometheus.karapace_http_requests_duration_seconds, + prometheus.karapace_http_requests_duration_seconds, ), call(prometheus.karapace_http_requests_in_progress, prometheus.karapace_http_requests_in_progress), ] @@ -92,6 +92,7 @@ async def test_http_request_metrics_middleware( await prometheus.http_request_metrics_middleware(request=request, handler=handler) + assert handler.assert_awaited_once # extra assert is to ignore pylint [pointless-statement] request.__setitem__.assert_called_once_with(prometheus.START_TIME_REQUEST_KEY, 10) request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls( [ @@ -99,7 +100,7 @@ async def test_http_request_metrics_middleware( call().inc(), ] ) - request.app[prometheus.karapace_http_requests_latency_seconds].labels.assert_has_calls( + request.app[prometheus.karapace_http_requests_duration_seconds].labels.assert_has_calls( [ call("GET", "/path"), call().observe(request.__getitem__.return_value.__rsub__.return_value),