diff --git a/container/compose.yml b/container/compose.yml index f1328afef..6e17cee5b 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -102,3 +102,10 @@ services: KARAPACE_REGISTRY_PORT: 8081 KARAPACE_ADMIN_METADATA_MAX_AGE: 0 KARAPACE_LOG_LEVEL: WARNING + + prometheus: + image: prom/prometheus + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml + ports: + - 9090:9090 diff --git a/container/prometheus.yml b/container/prometheus.yml new file mode 100644 index 000000000..20731bb4b --- /dev/null +++ b/container/prometheus.yml @@ -0,0 +1,18 @@ +global: + scrape_interval: 10s # How frequently to scrape targets by default. + scrape_timeout: 5s # How long until a scrape request times out. + evaluation_interval: 60s # How frequently to evaluate rules. + +# A scrape configuration +scrape_configs: + - job_name: karapace-registry + metrics_path: /metrics + static_configs: + - targets: + - karapace-registry:8081 + + - job_name: karapace-rest + metrics_path: /metrics + static_configs: + - targets: + - karapace-rest:8082 diff --git a/karapace/client.py b/karapace/client.py index d699e6a73..dae79b244 100644 --- a/karapace/client.py +++ b/karapace/client.py @@ -91,6 +91,7 @@ async def get( headers: Optional[Headers] = None, auth: Optional[BasicAuth] = None, params: Optional[Mapping[str, str]] = None, + json_response: bool = True, ) -> Result: path = self.path_for(path) if not headers: @@ -105,8 +106,8 @@ async def get( params=params, ) as res: # required for forcing the response body conversion to json despite missing valid Accept headers - json_result = await res.json(content_type=None) - return Result(res.status, json_result, headers=res.headers) + result = await res.json(content_type=None) if json_response else await res.text() + return Result(res.status, result, headers=res.headers) async def delete( self, diff --git a/karapace/instrumentation/__init__.py b/karapace/instrumentation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/karapace/instrumentation/prometheus.py b/karapace/instrumentation/prometheus.py new file mode 100644 index 000000000..4e478fdc7 --- /dev/null +++ b/karapace/instrumentation/prometheus.py @@ -0,0 +1,104 @@ +""" +karapace - prometheus instrumentation + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +# mypy: disable-error-code="call-overload" + +from __future__ import annotations + +from aiohttp.web import middleware, Request, Response +from karapace.rapu import RestApp +from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, Histogram +from typing import Awaitable, Callable, Final + +import logging +import time + +LOG = logging.getLogger(__name__) + + +class PrometheusInstrumentation: + METRICS_ENDPOINT_PATH: Final[str] = "/metrics" + START_TIME_REQUEST_KEY: Final[str] = "start_time" + + registry: Final[CollectorRegistry] = CollectorRegistry() + + karapace_http_requests_total: Final[Counter] = Counter( + registry=registry, + name="karapace_http_requests_total", + documentation="Total Request Count for HTTP/TCP Protocol", + labelnames=("method", "path", "status"), + ) + + karapace_http_requests_duration_seconds: Final[Histogram] = Histogram( + registry=registry, + name="karapace_http_requests_duration_seconds", + documentation="Request Duration for HTTP/TCP Protocol", + labelnames=("method", "path"), + ) + + karapace_http_requests_in_progress: Final[Gauge] = Gauge( + registry=registry, + name="karapace_http_requests_in_progress", + documentation="In-progress requests for HTTP/TCP Protocol", + labelnames=("method", "path"), + ) + + @classmethod + def setup_metrics(cls, *, app: RestApp) -> None: + LOG.info("Setting up prometheus metrics") + app.route( + cls.METRICS_ENDPOINT_PATH, + callback=cls.serve_metrics, + method="GET", + schema_request=False, + with_request=False, + json_body=False, + auth=None, + ) + app.app.middlewares.insert(0, cls.http_request_metrics_middleware) # type: ignore[arg-type] + + # disable-error-code="call-overload" is used at the top of this file to allow mypy checks. + # the issue is in the type difference (Counter, Gauge, etc) of the arguments which we are + # passing to `__setitem__()`, but we need to keep these objects in the `app.app` dict. + app.app[cls.karapace_http_requests_total] = cls.karapace_http_requests_total + app.app[cls.karapace_http_requests_duration_seconds] = cls.karapace_http_requests_duration_seconds + app.app[cls.karapace_http_requests_in_progress] = cls.karapace_http_requests_in_progress + + @classmethod + async def serve_metrics(cls) -> bytes: + return generate_latest(cls.registry) + + @classmethod + @middleware + async def http_request_metrics_middleware( + cls, + request: Request, + handler: Callable[[Request], Awaitable[Response]], + ) -> Response: + request[cls.START_TIME_REQUEST_KEY] = time.time() + + # Extract request labels + path = request.path + method = request.method + + # Increment requests in progress before handler + request.app[cls.karapace_http_requests_in_progress].labels(method, path).inc() + + # Call request handler + response: Response = await handler(request) + + # Instrument request duration + request.app[cls.karapace_http_requests_duration_seconds].labels(method, path).observe( + time.time() - request[cls.START_TIME_REQUEST_KEY] + ) + + # Instrument total requests + request.app[cls.karapace_http_requests_total].labels(method, path, response.status).inc() + + # Decrement requests in progress after handler + request.app[cls.karapace_http_requests_in_progress].labels(method, path).dec() + + return response diff --git a/karapace/karapace_all.py b/karapace/karapace_all.py index 353c5021a..240da1008 100644 --- a/karapace/karapace_all.py +++ b/karapace/karapace_all.py @@ -6,6 +6,7 @@ from contextlib import closing from karapace import version as karapace_version from karapace.config import read_config +from karapace.instrumentation.prometheus import PrometheusInstrumentation from karapace.kafka_rest_apis import KafkaRest from karapace.rapu import RestApp from karapace.schema_registry_apis import KarapaceSchemaRegistryController @@ -62,8 +63,8 @@ def main() -> int: logging.log(logging.DEBUG, "Config %r", config_without_secrets) try: - # `close` will be called by the callback `close_by_app` set by `KarapaceBase` - app.run() + PrometheusInstrumentation.setup_metrics(app=app) + app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase` except Exception as ex: # pylint: disable-broad-except app.stats.unexpected_exception(ex=ex, where="karapace") raise diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 45d07a105..0ec33d6ed 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -159,6 +159,8 @@ pkgutil-resolve-name==1.3.10 # jsonschema pluggy==1.5.0 # via pytest +prometheus-client==0.20.0 + # via -r requirements.txt protobuf==3.20.3 # via -r requirements.txt psutil==5.9.8 diff --git a/requirements/requirements.in b/requirements/requirements.in index 39b039494..2bd3ccfe7 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -19,6 +19,7 @@ ujson<6 watchfiles<1 xxhash~=3.3 zstandard +prometheus-client==0.20.0 # Patched dependencies # diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 6bf9bdd13..8192bda35 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -67,6 +67,8 @@ packaging==24.0 # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema +prometheus-client==0.20.0 + # via -r requirements.in protobuf==3.20.3 # via -r requirements.in pygments==2.18.0 diff --git a/tests/integration/instrumentation/__init__.py b/tests/integration/instrumentation/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/instrumentation/test_prometheus.py b/tests/integration/instrumentation/test_prometheus.py new file mode 100644 index 000000000..cdac52ca8 --- /dev/null +++ b/tests/integration/instrumentation/test_prometheus.py @@ -0,0 +1,30 @@ +""" +karapace - prometheus instrumentation tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from http import HTTPStatus +from karapace.client import Client, Result +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from prometheus_client.parser import text_string_to_metric_families + + +async def test_metrics_endpoint(registry_async_client: Client) -> None: + result: Result = await registry_async_client.get( + PrometheusInstrumentation.METRICS_ENDPOINT_PATH, + json_response=False, + ) + assert result.status_code == HTTPStatus.OK.value + + +async def test_metrics_endpoint_parsed_response(registry_async_client: Client) -> None: + result: Result = await registry_async_client.get( + PrometheusInstrumentation.METRICS_ENDPOINT_PATH, + json_response=False, + ) + metrics = [family.name for family in text_string_to_metric_families(result.json_result)] + assert "karapace_http_requests" in metrics + assert "karapace_http_requests_duration_seconds" in metrics + assert "karapace_http_requests_in_progress" in metrics diff --git a/tests/unit/instrumentation/test_prometheus.py b/tests/unit/instrumentation/test_prometheus.py new file mode 100644 index 000000000..c4b3da8d9 --- /dev/null +++ b/tests/unit/instrumentation/test_prometheus.py @@ -0,0 +1,120 @@ +""" +karapace - prometheus instrumentation tests + +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from _pytest.logging import LogCaptureFixture +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from karapace.rapu import RestApp +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram +from unittest.mock import AsyncMock, call, MagicMock, patch + +import aiohttp.web +import logging +import pytest + + +class TestPrometheusInstrumentation: + @pytest.fixture + def prometheus(self) -> PrometheusInstrumentation: + return PrometheusInstrumentation() + + def test_constants(self, prometheus: PrometheusInstrumentation) -> None: + assert prometheus.START_TIME_REQUEST_KEY == "start_time" + assert isinstance(prometheus.registry, CollectorRegistry) + + def test_metric_types(self, prometheus: PrometheusInstrumentation) -> None: + assert isinstance(prometheus.karapace_http_requests_total, Counter) + assert isinstance(prometheus.karapace_http_requests_duration_seconds, Histogram) + assert isinstance(prometheus.karapace_http_requests_in_progress, Gauge) + + def test_metric_values(self, prometheus: PrometheusInstrumentation) -> None: + # `_total` suffix is stripped off the metric name for `Counters`, but needed for clarity. + assert repr(prometheus.karapace_http_requests_total) == "prometheus_client.metrics.Counter(karapace_http_requests)" + assert ( + repr(prometheus.karapace_http_requests_duration_seconds) + == "prometheus_client.metrics.Histogram(karapace_http_requests_duration_seconds)" + ) + assert ( + repr(prometheus.karapace_http_requests_in_progress) + == "prometheus_client.metrics.Gauge(karapace_http_requests_in_progress)" + ) + + def test_setup_metrics(self, caplog: LogCaptureFixture, prometheus: PrometheusInstrumentation) -> None: + app = AsyncMock(spec=RestApp, app=AsyncMock(spec=aiohttp.web.Application)) + + with caplog.at_level(logging.INFO, logger="karapace.instrumentation.prometheus"): + prometheus.setup_metrics(app=app) + + app.route.assert_called_once_with( + prometheus.METRICS_ENDPOINT_PATH, + callback=prometheus.serve_metrics, + method="GET", + schema_request=False, + with_request=False, + json_body=False, + auth=None, + ) + app.app.middlewares.insert.assert_called_once_with(0, prometheus.http_request_metrics_middleware) + app.app.__setitem__.assert_has_calls( + [ + call(prometheus.karapace_http_requests_total, prometheus.karapace_http_requests_total), + call( + prometheus.karapace_http_requests_duration_seconds, + prometheus.karapace_http_requests_duration_seconds, + ), + call(prometheus.karapace_http_requests_in_progress, prometheus.karapace_http_requests_in_progress), + ] + ) + for log in caplog.records: + assert log.name == "karapace.instrumentation.prometheus" + assert log.levelname == "INFO" + assert log.message == "Setting up prometheus metrics" + + @patch("karapace.instrumentation.prometheus.generate_latest") + async def test_serve_metrics(self, generate_latest: MagicMock, prometheus: PrometheusInstrumentation) -> None: + await prometheus.serve_metrics() + generate_latest.assert_called_once_with(prometheus.registry) + + @patch("karapace.instrumentation.prometheus.time") + async def test_http_request_metrics_middleware( + self, + mock_time: MagicMock, + prometheus: PrometheusInstrumentation, + ) -> None: + mock_time.time.return_value = 10 + request = AsyncMock( + spec=aiohttp.web.Request, app=AsyncMock(spec=aiohttp.web.Application), path="/path", method="GET" + ) + handler = AsyncMock(spec=aiohttp.web.RequestHandler, return_value=MagicMock(status=200)) + + await prometheus.http_request_metrics_middleware(request=request, handler=handler) + + assert handler.assert_awaited_once # extra assert is to ignore pylint [pointless-statement] + request.__setitem__.assert_called_once_with(prometheus.START_TIME_REQUEST_KEY, 10) + request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls( + [ + call("GET", "/path"), + call().inc(), + ] + ) + request.app[prometheus.karapace_http_requests_duration_seconds].labels.assert_has_calls( + [ + call("GET", "/path"), + call().observe(request.__getitem__.return_value.__rsub__.return_value), + ] + ) + request.app[prometheus.karapace_http_requests_total].labels.assert_has_calls( + [ + call("GET", "/path", 200), + call().inc(), + ] + ) + request.app[prometheus.karapace_http_requests_in_progress].labels.assert_has_calls( + [ + call("GET", "/path"), + call().dec(), + ] + )