From 4cefda184ac024c6fbcc62b2dc46a921d8d8ab33 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Mon, 28 Oct 2024 16:36:42 +0100 Subject: [PATCH 1/3] adding open telemetry, port 44081 --- lib/charms/grafana_agent/v0/cos_agent.py | 726 ++++++++++++++++++++--- src/charm.py | 15 + src/core/workload.py | 5 + src/managers/config.py | 4 +- 4 files changed, 658 insertions(+), 92 deletions(-) diff --git a/lib/charms/grafana_agent/v0/cos_agent.py b/lib/charms/grafana_agent/v0/cos_agent.py index 9261582f..cc4da25a 100644 --- a/lib/charms/grafana_agent/v0/cos_agent.py +++ b/lib/charms/grafana_agent/v0/cos_agent.py @@ -22,7 +22,7 @@ Using the `COSAgentProvider` object only requires instantiating it, typically in the `__init__` method of your charm (the one which sends telemetry). -The constructor of `COSAgentProvider` has only one required and nine optional parameters: +The constructor of `COSAgentProvider` has only one required and ten optional parameters: ```python def __init__( @@ -36,6 +36,7 @@ def __init__( log_slots: Optional[List[str]] = None, dashboard_dirs: Optional[List[str]] = None, refresh_events: Optional[List] = None, + tracing_protocols: Optional[List[str]] = None, scrape_configs: Optional[Union[List[Dict], Callable]] = None, ): ``` @@ -65,6 +66,8 @@ def __init__( - `refresh_events`: List of events on which to refresh relation data. +- `tracing_protocols`: List of requested tracing protocols that the charm requires to send traces. + - `scrape_configs`: List of standard scrape_configs dicts or a callable that returns the list in case the configs need to be generated dynamically. The contents of this list will be merged with the configs from `metrics_endpoints`. @@ -108,6 +111,7 @@ def __init__(self, *args): log_slots=["my-app:slot"], dashboard_dirs=["./src/dashboards_1", "./src/dashboards_2"], refresh_events=["update-status", "upgrade-charm"], + tracing_protocols=["otlp_http", "otlp_grpc"], scrape_configs=[ { "job_name": "custom_job", @@ -206,21 +210,34 @@ def __init__(self, *args): ``` """ -import base64 +import enum import json import logging -import lzma +import socket from collections import namedtuple from itertools import chain from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Set, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ClassVar, + Dict, + List, + Literal, + MutableMapping, + Optional, + Set, + Tuple, + Union, +) import pydantic -from cosl import JujuTopology +from cosl import GrafanaDashboard, JujuTopology from cosl.rules import AlertRules from ops.charm import RelationChangedEvent from ops.framework import EventBase, EventSource, Object, ObjectEvents -from ops.model import Relation, Unit +from ops.model import ModelError, Relation from ops.testing import CharmType if TYPE_CHECKING: @@ -236,9 +253,9 @@ class _MetricsEndpointDict(TypedDict): LIBID = "dc15fa84cef84ce58155fb84f6c6213a" LIBAPI = 0 -LIBPATCH = 5 +LIBPATCH = 11 -PYDEPS = ["cosl", "pydantic<2"] +PYDEPS = ["cosl", "pydantic"] DEFAULT_RELATION_NAME = "cos-agent" DEFAULT_PEER_RELATION_NAME = "peers" @@ -251,32 +268,207 @@ class _MetricsEndpointDict(TypedDict): SnapEndpoint = namedtuple("SnapEndpoint", "owner, name") -class GrafanaDashboard(str): - """Grafana Dashboard encoded json; lzma-compressed.""" +# Note: MutableMapping is imported from the typing module and not collections.abc +# because subscripting collections.abc.MutableMapping was added in python 3.9, but +# most of our charms are based on 20.04, which has python 3.8. - # TODO Replace this with a custom type when pydantic v2 released (end of 2023 Q1?) - # https://github.com/pydantic/pydantic/issues/4887 - @staticmethod - def _serialize(raw_json: Union[str, bytes]) -> "GrafanaDashboard": - if not isinstance(raw_json, bytes): - raw_json = raw_json.encode("utf-8") - encoded = base64.b64encode(lzma.compress(raw_json)).decode("utf-8") - return GrafanaDashboard(encoded) +_RawDatabag = MutableMapping[str, str] - def _deserialize(self) -> Dict: - try: - raw = lzma.decompress(base64.b64decode(self.encode("utf-8"))).decode() - return json.loads(raw) - except json.decoder.JSONDecodeError as e: - logger.error("Invalid Dashboard format: %s", e) - return {} - def __repr__(self): - """Return string representation of self.""" - return "" +class TransportProtocolType(str, enum.Enum): + """Receiver Type.""" + + http = "http" + grpc = "grpc" + + +receiver_protocol_to_transport_protocol = { + "zipkin": TransportProtocolType.http, + "kafka": TransportProtocolType.http, + "tempo_http": TransportProtocolType.http, + "tempo_grpc": TransportProtocolType.grpc, + "otlp_grpc": TransportProtocolType.grpc, + "otlp_http": TransportProtocolType.http, + "jaeger_thrift_http": TransportProtocolType.http, +} + +_tracing_receivers_ports = { + # OTLP receiver: see + # https://github.com/open-telemetry/opentelemetry-collector/tree/v0.96.0/receiver/otlpreceiver + "otlp_http": 4318, + "otlp_grpc": 4317, + # Jaeger receiver: see + # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/jaegerreceiver + "jaeger_grpc": 14250, + "jaeger_thrift_http": 14268, + # Zipkin receiver: see + # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/zipkinreceiver + "zipkin": 9411, +} + +ReceiverProtocol = Literal["otlp_grpc", "otlp_http", "zipkin", "jaeger_thrift_http", "jaeger_grpc"] + + +class TracingError(Exception): + """Base class for custom errors raised by tracing.""" + + +class NotReadyError(TracingError): + """Raised by the provider wrapper if a requirer hasn't published the required data (yet).""" + + +class ProtocolNotRequestedError(TracingError): + """Raised if the user attempts to obtain an endpoint for a protocol it did not request.""" + + +class DataValidationError(TracingError): + """Raised when data validation fails on IPU relation data.""" + + +class AmbiguousRelationUsageError(TracingError): + """Raised when one wrongly assumes that there can only be one relation on an endpoint.""" + + +# TODO we want to eventually use `DatabagModel` from cosl but it likely needs a move to common package first +if int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore + + class DatabagModel(pydantic.BaseModel): # type: ignore + """Base databag model.""" + + class Config: + """Pydantic config.""" + + # ignore any extra fields in the databag + extra = "ignore" + """Ignore any extra fields in the databag.""" + allow_population_by_field_name = True + """Allow instantiating this class by field name (instead of forcing alias).""" + + _NEST_UNDER = None + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + if cls._NEST_UNDER: + return cls.parse_obj(json.loads(databag[cls._NEST_UNDER])) + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {f.alias for f in cls.__fields__.values()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.parse_raw(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. + """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + + if self._NEST_UNDER: + databag[self._NEST_UNDER] = self.json(by_alias=True) + return databag + + dct = self.dict() + for key, field in self.__fields__.items(): # type: ignore + value = dct[key] + databag[field.alias or key] = json.dumps(value) + + return databag + +else: + from pydantic import ConfigDict + + class DatabagModel(pydantic.BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # ignore any extra fields in the databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + # Custom config key: whether to nest the whole datastructure (as json) + # under a field or spread it out at the toplevel. + _NEST_UNDER=None, # type: ignore + arbitrary_types_allowed=True, + ) + """Pydantic config.""" + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + nest_under = cls.model_config.get("_NEST_UNDER") # type: ignore + if nest_under: + return cls.model_validate(json.loads(databag[nest_under])) # type: ignore + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {(f.alias or n) for n, f in cls.__fields__.items()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.model_validate_json(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. + """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + nest_under = self.model_config.get("_NEST_UNDER") + if nest_under: + databag[nest_under] = self.model_dump_json( # type: ignore + by_alias=True, + # skip keys whose values are default + exclude_defaults=True, + ) + return databag + + dct = self.model_dump() # type: ignore + for key, field in self.model_fields.items(): # type: ignore + value = dct[key] + if value == field.default: + continue + databag[field.alias or key] = json.dumps(value) + + return databag -class CosAgentProviderUnitData(pydantic.BaseModel): +class CosAgentProviderUnitData(DatabagModel): """Unit databag model for `cos-agent` relation.""" # The following entries are the same for all units of the same principal. @@ -285,27 +477,33 @@ class CosAgentProviderUnitData(pydantic.BaseModel): metrics_alert_rules: dict log_alert_rules: dict dashboards: List[GrafanaDashboard] + # subordinate is no longer used but we should keep it until we bump the library to ensure + # we don't break compatibility. + subordinate: Optional[bool] = None # The following entries may vary across units of the same principal app. # this data does not need to be forwarded to the gagent leader metrics_scrape_jobs: List[Dict] log_slots: List[str] + # Requested tracing protocols. + tracing_protocols: Optional[List[str]] = None + # when this whole datastructure is dumped into a databag, it will be nested under this key. # while not strictly necessary (we could have it 'flattened out' into the databag), # this simplifies working with the model. KEY: ClassVar[str] = "config" -class CosAgentPeersUnitData(pydantic.BaseModel): +class CosAgentPeersUnitData(DatabagModel): """Unit databag model for `peers` cos-agent machine charm peer relation.""" # We need the principal unit name and relation metadata to be able to render identifiers # (e.g. topology) on the leader side, after all the data moves into peer data (the grafana # agent leader can only see its own principal, because it is a subordinate charm). - principal_unit_name: str - principal_relation_id: str - principal_relation_name: str + unit_name: str + relation_id: str + relation_name: str # The only data that is forwarded to the leader is data that needs to go into the app databags # of the outgoing o11y relations. @@ -325,7 +523,84 @@ def app_name(self) -> str: TODO: Switch to using `model_post_init` when pydantic v2 is released? https://github.com/pydantic/pydantic/issues/1729#issuecomment-1300576214 """ - return self.principal_unit_name.split("/")[0] + return self.unit_name.split("/")[0] + + +if int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore + + class ProtocolType(pydantic.BaseModel): # type: ignore + """Protocol Type.""" + + class Config: + """Pydantic config.""" + + use_enum_values = True + """Allow serializing enum values.""" + + name: str = pydantic.Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = pydantic.Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) + +else: + + class ProtocolType(pydantic.BaseModel): + """Protocol Type.""" + + model_config = pydantic.ConfigDict( + # Allow serializing enum values. + use_enum_values=True + ) + """Pydantic config.""" + + name: str = pydantic.Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = pydantic.Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) + + +class Receiver(pydantic.BaseModel): + """Specification of an active receiver.""" + + protocol: ProtocolType = pydantic.Field(..., description="Receiver protocol name and type.") + url: str = pydantic.Field( + ..., + description="""URL at which the receiver is reachable. If there's an ingress, it would be the external URL. + Otherwise, it would be the service's fqdn or internal IP. + If the protocol type is grpc, the url will not contain a scheme.""", + examples=[ + "http://traefik_address:2331", + "https://traefik_address:2331", + "http://tempo_public_ip:2331", + "https://tempo_public_ip:2331", + "tempo_public_ip:2331", + ], + ) + + +class CosAgentRequirerUnitData(DatabagModel): # noqa: D101 + """Application databag model for the COS-agent requirer.""" + + receivers: List[Receiver] = pydantic.Field( + ..., + description="List of all receivers enabled on the tracing provider.", + ) class COSAgentProvider(Object): @@ -342,6 +617,7 @@ def __init__( log_slots: Optional[List[str]] = None, dashboard_dirs: Optional[List[str]] = None, refresh_events: Optional[List] = None, + tracing_protocols: Optional[List[str]] = None, *, scrape_configs: Optional[Union[List[dict], Callable]] = None, ): @@ -360,6 +636,7 @@ def __init__( in the form ["snap-name:slot", ...]. dashboard_dirs: Directory where the dashboards are stored. refresh_events: List of events on which to refresh relation data. + tracing_protocols: List of protocols that the charm will be using for sending traces. scrape_configs: List of standard scrape_configs dicts or a callable that returns the list in case the configs need to be generated dynamically. The contents of this list will be merged with the contents of `metrics_endpoints`. @@ -377,6 +654,8 @@ def __init__( self._log_slots = log_slots or [] self._dashboard_dirs = dashboard_dirs self._refresh_events = refresh_events or [self._charm.on.config_changed] + self._tracing_protocols = tracing_protocols + self._is_single_endpoint = charm.meta.relations[relation_name].limit == 1 events = self._charm.on[relation_name] self.framework.observe(events.relation_joined, self._on_refresh) @@ -401,6 +680,7 @@ def _on_refresh(self, event): dashboards=self._dashboards, metrics_scrape_jobs=self._scrape_jobs, log_slots=self._log_slots, + tracing_protocols=self._tracing_protocols, ) relation.data[self._charm.unit][data.KEY] = data.json() except ( @@ -465,6 +745,103 @@ def _dashboards(self) -> List[GrafanaDashboard]: dashboards.append(dashboard) return dashboards + @property + def relations(self) -> List[Relation]: + """The tracing relations associated with this endpoint.""" + return self._charm.model.relations[self._relation_name] + + @property + def _relation(self) -> Optional[Relation]: + """If this wraps a single endpoint, the relation bound to it, if any.""" + if not self._is_single_endpoint: + objname = type(self).__name__ + raise AmbiguousRelationUsageError( + f"This {objname} wraps a {self._relation_name} endpoint that has " + "limit != 1. We can't determine what relation, of the possibly many, you are " + f"referring to. Please pass a relation instance while calling {objname}, " + "or set limit=1 in the charm metadata." + ) + relations = self.relations + return relations[0] if relations else None + + def is_ready(self, relation: Optional[Relation] = None): + """Is this endpoint ready?""" + relation = relation or self._relation + if not relation: + logger.debug(f"no relation on {self._relation_name !r}: tracing not ready") + return False + if relation.data is None: + logger.error(f"relation data is None for {relation}") + return False + if not relation.app: + logger.error(f"{relation} event received but there is no relation.app") + return False + try: + unit = next(iter(relation.units), None) + if not unit: + return False + databag = dict(relation.data[unit]) + CosAgentRequirerUnitData.load(databag) + + except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): + logger.info(f"failed validating relation data for {relation}") + return False + return True + + def get_all_endpoints( + self, relation: Optional[Relation] = None + ) -> Optional[CosAgentRequirerUnitData]: + """Unmarshalled relation data.""" + relation = relation or self._relation + if not relation or not self.is_ready(relation): + return None + unit = next(iter(relation.units), None) + if not unit: + return None + return CosAgentRequirerUnitData.load(relation.data[unit]) # type: ignore + + def _get_tracing_endpoint( + self, relation: Optional[Relation], protocol: ReceiverProtocol + ) -> Optional[str]: + unit_data = self.get_all_endpoints(relation) + if not unit_data: + return None + receivers: List[Receiver] = [i for i in unit_data.receivers if i.protocol.name == protocol] + if not receivers: + logger.error(f"no receiver found with protocol={protocol!r}") + return None + if len(receivers) > 1: + logger.error( + f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}" + ) + return None + + receiver = receivers[0] + return receiver.url + + def get_tracing_endpoint( + self, protocol: ReceiverProtocol, relation: Optional[Relation] = None + ) -> Optional[str]: + """Receiver endpoint for the given protocol.""" + endpoint = self._get_tracing_endpoint(relation or self._relation, protocol=protocol) + if not endpoint: + requested_protocols = set() + relations = [relation] if relation else self.relations + for relation in relations: + try: + databag = CosAgentProviderUnitData.load(relation.data[self._charm.unit]) + except DataValidationError: + continue + + if databag.tracing_protocols: + requested_protocols.update(databag.tracing_protocols) + + if protocol not in requested_protocols: + raise ProtocolNotRequestedError(protocol, relation) + + return None + return endpoint + class COSAgentDataChanged(EventBase): """Event emitted by `COSAgentRequirer` when relation data changes.""" @@ -578,24 +955,63 @@ def _on_relation_data_changed(self, event: RelationChangedEvent): if not (provider_data := self._validated_provider_data(raw)): return - # Copy data from the principal relation to the peer relation, so the leader could + # write enabled receivers to cos-agent relation + try: + self.update_tracing_receivers() + except ModelError: + raise + + # Copy data from the cos_agent relation to the peer relation, so the leader could # follow up. # Save the originating unit name, so it could be used for topology later on by the leader. data = CosAgentPeersUnitData( # peer relation databag model - principal_unit_name=event.unit.name, - principal_relation_id=str(event.relation.id), - principal_relation_name=event.relation.name, + unit_name=event.unit.name, + relation_id=str(event.relation.id), + relation_name=event.relation.name, metrics_alert_rules=provider_data.metrics_alert_rules, log_alert_rules=provider_data.log_alert_rules, dashboards=provider_data.dashboards, ) - self.peer_relation.data[self._charm.unit][data.KEY] = data.json() + self.peer_relation.data[self._charm.unit][ + f"{CosAgentPeersUnitData.KEY}-{event.unit.name}" + ] = data.json() # We can't easily tell if the data that was changed is limited to only the data # that goes into peer relation (in which case, if this is not a leader unit, we wouldn't # need to emit `on.data_changed`), so we're emitting `on.data_changed` either way. self.on.data_changed.emit() # pyright: ignore + def update_tracing_receivers(self): + """Updates the list of exposed tracing receivers in all relations.""" + try: + for relation in self._charm.model.relations[self._relation_name]: + CosAgentRequirerUnitData( + receivers=[ + Receiver( + url=f"{self._get_tracing_receiver_url(protocol)}", + protocol=ProtocolType( + name=protocol, + type=receiver_protocol_to_transport_protocol[protocol], + ), + ) + for protocol in self.requested_tracing_protocols() + ], + ).dump(relation.data[self._charm.unit]) + + except ModelError as e: + # args are bytes + msg = e.args[0] + if isinstance(msg, bytes): + if msg.startswith( + b"ERROR cannot read relation application settings: permission denied" + ): + logger.error( + f"encountered error {e} while attempting to update_relation_data." + f"The relation must be gone." + ) + return + raise + def _validated_provider_data(self, raw) -> Optional[CosAgentProviderUnitData]: try: return CosAgentProviderUnitData(**json.loads(raw)) @@ -608,54 +1024,84 @@ def trigger_refresh(self, _): # FIXME: Figure out what we should do here self.on.data_changed.emit() # pyright: ignore - @property - def _principal_unit(self) -> Optional[Unit]: - """Return the principal unit for a relation. + def _get_requested_protocols(self, relation: Relation): + # Coherence check + units = relation.units + if len(units) > 1: + # should never happen + raise ValueError( + f"unexpected error: subordinate relation {relation} " + f"should have exactly one unit" + ) - Assumes that the relation is of type subordinate. - Relies on the fact that, for subordinate relations, the only remote unit visible to - *this unit* is the principal unit that this unit is attached to. - """ - if relations := self._principal_relations: - # Technically it's a list, but for subordinates there can only be one relation - principal_relation = next(iter(relations)) - if units := principal_relation.units: - # Technically it's a list, but for subordinates there can only be one - return next(iter(units)) + unit = next(iter(units), None) - return None + if not unit: + return None - @property - def _principal_relations(self): - # Technically it's a list, but for subordinates there can only be one. - return self._charm.model.relations[self._relation_name] + if not (raw := relation.data[unit].get(CosAgentProviderUnitData.KEY)): + return None + + if not (provider_data := self._validated_provider_data(raw)): + return None + + return provider_data.tracing_protocols + + def requested_tracing_protocols(self): + """All receiver protocols that have been requested by our related apps.""" + requested_protocols = set() + for relation in self._charm.model.relations[self._relation_name]: + try: + protocols = self._get_requested_protocols(relation) + except NotReadyError: + continue + if protocols: + requested_protocols.update(protocols) + return requested_protocols + + def _get_tracing_receiver_url(self, protocol: str): + scheme = "http" + try: + if self._charm.cert.enabled: # type: ignore + scheme = "https" + # not only Grafana Agent can implement cos_agent. If the charm doesn't have the `cert` attribute + # using our cert_handler, it won't have the `enabled` parameter. In this case, we pass and assume http. + except AttributeError: + pass + # the assumption is that a subordinate charm will always be accessible to its principal charm under its fqdn + if receiver_protocol_to_transport_protocol[protocol] == TransportProtocolType.grpc: + return f"{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}" + return f"{scheme}://{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}" @property - def _principal_unit_data(self) -> Optional[CosAgentProviderUnitData]: - """Return the principal unit's data. + def _remote_data(self) -> List[Tuple[CosAgentProviderUnitData, JujuTopology]]: + """Return a list of remote data from each of the related units. Assumes that the relation is of type subordinate. Relies on the fact that, for subordinate relations, the only remote unit visible to *this unit* is the principal unit that this unit is attached to. """ - if not (relations := self._principal_relations): - return None - - # Technically it's a list, but for subordinates there can only be one relation - principal_relation = next(iter(relations)) + all_data = [] - if not (units := principal_relation.units): - return None + for relation in self._charm.model.relations[self._relation_name]: + if not relation.units: + continue + unit = next(iter(relation.units)) + if not (raw := relation.data[unit].get(CosAgentProviderUnitData.KEY)): + continue + if not (provider_data := self._validated_provider_data(raw)): + continue - # Technically it's a list, but for subordinates there can only be one - unit = next(iter(units)) - if not (raw := principal_relation.data[unit].get(CosAgentProviderUnitData.KEY)): - return None + topology = JujuTopology( + model=self._charm.model.name, + model_uuid=self._charm.model.uuid, + application=unit.app.name, + unit=unit.name, + ) - if not (provider_data := self._validated_provider_data(raw)): - return None + all_data.append((provider_data, topology)) - return provider_data + return all_data def _gather_peer_data(self) -> List[CosAgentPeersUnitData]: """Collect data from the peers. @@ -673,18 +1119,21 @@ def _gather_peer_data(self) -> List[CosAgentPeersUnitData]: app_names: Set[str] = set() for unit in chain((self._charm.unit,), relation.units): - if not relation.data.get(unit) or not ( - raw := relation.data[unit].get(CosAgentPeersUnitData.KEY) - ): - logger.info(f"peer {unit} has not set its primary data yet; skipping for now...") + if not relation.data.get(unit): continue - data = CosAgentPeersUnitData(**json.loads(raw)) - app_name = data.app_name - # Have we already seen this principal app? - if app_name in app_names: - continue - peer_data.append(data) + for unit_name in relation.data.get(unit): # pyright: ignore + if not unit_name.startswith(CosAgentPeersUnitData.KEY): + continue + raw = relation.data[unit].get(unit_name) + if raw is None: + continue + data = CosAgentPeersUnitData(**json.loads(raw)) + # Have we already seen this principal app? + if (app_name := data.app_name) in app_names: + continue + peer_data.append(data) + app_names.add(app_name) return peer_data @@ -720,7 +1169,7 @@ def metrics_alerts(self) -> Dict[str, Any]: def metrics_jobs(self) -> List[Dict]: """Parse the relation data contents and extract the metrics jobs.""" scrape_jobs = [] - if data := self._principal_unit_data: + for data, topology in self._remote_data: for job in data.metrics_scrape_jobs: # In #220, relation schema changed from a simplified dict to the standard # `scrape_configs`. @@ -730,6 +1179,26 @@ def metrics_jobs(self) -> List[Dict]: "job_name": job["job_name"], "metrics_path": job["path"], "static_configs": [{"targets": [f"localhost:{job['port']}"]}], + # We include insecure_skip_verify because we are always scraping localhost. + # Even if we have the certs for the scrape targets, we'd rather specify the scrape + # jobs with localhost rather than the SAN DNS the cert was issued for. + "tls_config": {"insecure_skip_verify": True}, + } + + # Apply labels to the scrape jobs + for static_config in job.get("static_configs", []): + topo_as_dict = topology.as_dict(excluded_keys=["charm_name"]) + static_config["labels"] = { + # Be sure to keep labels from static_config + **static_config.get("labels", {}), + # TODO: We should add a new method in juju_topology.py + # that like `as_dict` method, returns the keys with juju_ prefix + # https://github.com/canonical/cos-lib/issues/18 + **{ + "juju_{}".format(key): value + for key, value in topo_as_dict.items() + if value + }, } scrape_jobs.append(job) @@ -739,8 +1208,18 @@ def metrics_jobs(self) -> List[Dict]: @property def snap_log_endpoints(self) -> List[SnapEndpoint]: """Fetch logging endpoints exposed by related snaps.""" + endpoints = [] + endpoints_with_topology = self.snap_log_endpoints_with_topology + for endpoint, _ in endpoints_with_topology: + endpoints.append(endpoint) + + return endpoints + + @property + def snap_log_endpoints_with_topology(self) -> List[Tuple[SnapEndpoint, JujuTopology]]: + """Fetch logging endpoints and charm topology for each related snap.""" plugs = [] - if data := self._principal_unit_data: + for data, topology in self._remote_data: targets = data.log_slots if targets: for target in targets: @@ -751,15 +1230,16 @@ def snap_log_endpoints(self) -> List[SnapEndpoint]: "endpoints; this should not happen." ) else: - plugs.append(target) + plugs.append((target, topology)) endpoints = [] - for plug in plugs: + for plug, topology in plugs: if ":" not in plug: logger.error(f"invalid plug definition received: {plug}. Ignoring...") else: endpoint = SnapEndpoint(*plug.split(":")) - endpoints.append(endpoint) + endpoints.append((endpoint, topology)) + return endpoints @property @@ -780,7 +1260,7 @@ def logs_alerts(self) -> Dict[str, Any]: model=self._charm.model.name, model_uuid=self._charm.model.uuid, application=app_name, - # For the topology unit, we could use `data.principal_unit_name`, but that unit + # For the topology unit, we could use `data.unit_name`, but that unit # name may not be very stable: `_gather_peer_data` de-duplicates by app name so # the exact unit name that turns up first in the iterator may vary from time to # time. So using the grafana-agent unit name instead. @@ -813,12 +1293,76 @@ def dashboards(self) -> List[Dict[str, str]]: dashboards.append( { - "relation_id": data.principal_relation_id, + "relation_id": data.relation_id, # We have the remote charm name - use it for the identifier - "charm": f"{data.principal_relation_name}-{app_name}", + "charm": f"{data.relation_name}-{app_name}", "content": content, "title": title, } ) return dashboards + + +def charm_tracing_config( + endpoint_requirer: COSAgentProvider, cert_path: Optional[Union[Path, str]] +) -> Tuple[Optional[str], Optional[str]]: + """Utility function to determine the charm_tracing config you will likely want. + + If no endpoint is provided: + disable charm tracing. + If https endpoint is provided but cert_path is not found on disk: + disable charm tracing. + If https endpoint is provided and cert_path is None: + ERROR + Else: + proceed with charm tracing (with or without tls, as appropriate) + + Usage: + If you are using charm_tracing >= v1.9: + >>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm + >>> from lib.charms.tempo_k8s.v0.cos_agent import charm_tracing_config + >>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path") + >>> class MyCharm(...): + >>> _cert_path = "/path/to/cert/on/charm/container.crt" + >>> def __init__(self, ...): + >>> self.cos_agent = COSAgentProvider(...) + >>> self.my_endpoint, self.cert_path = charm_tracing_config( + ... self.cos_agent, self._cert_path) + + If you are using charm_tracing < v1.9: + >>> from lib.charms.tempo_k8s.v1.charm_tracing import trace_charm + >>> from lib.charms.tempo_k8s.v2.tracing import charm_tracing_config + >>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path") + >>> class MyCharm(...): + >>> _cert_path = "/path/to/cert/on/charm/container.crt" + >>> def __init__(self, ...): + >>> self.cos_agent = COSAgentProvider(...) + >>> self.my_endpoint, self.cert_path = charm_tracing_config( + ... self.cos_agent, self._cert_path) + >>> @property + >>> def my_endpoint(self): + >>> return self._my_endpoint + >>> @property + >>> def cert_path(self): + >>> return self._cert_path + + """ + if not endpoint_requirer.is_ready(): + return None, None + + endpoint = endpoint_requirer.get_tracing_endpoint("otlp_http") + if not endpoint: + return None, None + + is_https = endpoint.startswith("https://") + + if is_https: + if cert_path is None: + raise TracingError("Cannot send traces to an https endpoint without a certificate.") + if not Path(cert_path).exists(): + # if endpoint is https BUT we don't have a server_cert yet: + # disable charm tracing until we do to prevent tls errors + return None, None + return endpoint, str(cert_path) + return endpoint, None diff --git a/src/charm.py b/src/charm.py index 339126f8..ecbb9251 100755 --- a/src/charm.py +++ b/src/charm.py @@ -28,12 +28,14 @@ from events.tls import TLSHandler from literals import ( CHARM_KEY, + GROUP, JMX_CC_PORT, JMX_EXPORTER_PORT, LOGS_RULES_DIR, METRICS_RULES_DIR, OS_REQUIREMENTS, SUBSTRATE, + USER, DebugLevel, Status, ) @@ -70,6 +72,13 @@ def __init__(self, *args): metrics_rules_dir=METRICS_RULES_DIR, logs_rules_dir=LOGS_RULES_DIR, log_slots=[f"{self.workload.SNAP_NAME}:{slot}" for slot in self.workload.LOG_SLOTS], + tracing_protocols=[ + "otlp_grpc", + "otlp_http", + "zipkin", + "jaeger_thrift_http", + "jaeger_grpc", + ] ) self.framework.observe(getattr(self.on, "install"), self._on_install) @@ -92,6 +101,12 @@ def _on_install(self, _) -> None: self._set_status(Status.SNAP_NOT_INSTALLED) return + command = ["wget", "https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases/latest/download/opentelemetry-javaagent.jar"] + self.workload.exec(command=command, working_dir=self.workload.paths.conf_path) + + self.workload.exec(["chmod", "-R", "777", f"{self.workload.paths.telemetry_jar}"]) + self.workload.exec(["chown", "-R", f"{USER}:{GROUP}", f"{self.workload.paths.telemetry_jar}"]) + self._set_os_config() def _set_os_config(self) -> None: diff --git a/src/core/workload.py b/src/core/workload.py index 53374f54..5efa3759 100644 --- a/src/core/workload.py +++ b/src/core/workload.py @@ -97,6 +97,11 @@ def jmx_cc_config(self): """The configuration for the CruiseControl JMX exporter.""" return f"{BALANCER.paths['CONF']}/jmx_cruise_control.yaml" + @property + def telemetry_jar(self): + """Telemetry JAR.""" + return f"{self.conf_path}/opentelemetry-javaagent.jar" + @property def cruise_control_properties(self): """The cruisecontrol.properties filepath.""" diff --git a/src/managers/config.py b/src/managers/config.py index fdd0316c..fae6c218 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -163,7 +163,7 @@ def log_level(self) -> str: return "KAFKA_CFG_LOGLEVEL=WARN" return f"KAFKA_CFG_LOGLEVEL={self.config.log_level}" - + @property def kafka_jmx_opts(self) -> str: """The JMX options for configuring the prometheus exporter. @@ -284,6 +284,8 @@ def kafka_opts(self) -> str: f"-Djava.security.auth.login.config={self.workload.paths.zk_jaas}", ] + opts.append(f"-javaagent:{self.workload.paths.telemetry_jar}") + http_proxy = os.environ.get("JUJU_CHARM_HTTP_PROXY") https_proxy = os.environ.get("JUJU_CHARM_HTTPS_PROXY") no_proxy = os.environ.get("JUJU_CHARM_NO_PROXY") From 013e7cd0fc4bcf6ddacf3ad4de55de393ad61748 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Wed, 30 Oct 2024 14:39:27 +0100 Subject: [PATCH 2/3] add tracing config --- src/charm.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/charm.py b/src/charm.py index ecbb9251..1aaa1765 100755 --- a/src/charm.py +++ b/src/charm.py @@ -8,7 +8,7 @@ import time from charms.data_platform_libs.v0.data_models import TypedCharmBase -from charms.grafana_agent.v0.cos_agent import COSAgentProvider +from charms.grafana_agent.v0.cos_agent import COSAgentProvider, charm_tracing_config from charms.operator_libs_linux.v0 import sysctl from charms.rolling_ops.v0.rollingops import RollingOpsManager, RunWithLock from ops import ( @@ -73,14 +73,13 @@ def __init__(self, *args): logs_rules_dir=LOGS_RULES_DIR, log_slots=[f"{self.workload.SNAP_NAME}:{slot}" for slot in self.workload.LOG_SLOTS], tracing_protocols=[ - "otlp_grpc", + # "otlp_grpc", "otlp_http", - "zipkin", - "jaeger_thrift_http", - "jaeger_grpc", ] ) + # self._telemetry_endpoint = charm_tracing_config(endpoint_requirer=self._grafana_agent, cert_path=None) + self.framework.observe(getattr(self.on, "install"), self._on_install) self.framework.observe(getattr(self.on, "remove"), self._on_remove) self.framework.observe(getattr(self.on, "config_changed"), self._on_roles_changed) From 358477a6f7c65c7210298c49b20f780c9dd05b78 Mon Sep 17 00:00:00 2001 From: Raul Zamora Date: Fri, 1 Nov 2024 21:52:19 +0100 Subject: [PATCH 3/3] fix otel params --- src/managers/config.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/managers/config.py b/src/managers/config.py index fae6c218..454cfd28 100644 --- a/src/managers/config.py +++ b/src/managers/config.py @@ -284,7 +284,9 @@ def kafka_opts(self) -> str: f"-Djava.security.auth.login.config={self.workload.paths.zk_jaas}", ] - opts.append(f"-javaagent:{self.workload.paths.telemetry_jar}") + opts.append( + f"-javaagent:{self.workload.paths.telemetry_jar} -Dotel.traces.exporter=otlp -Dotel.service.name={self.state.model.app.name}-{self.state.model.unit.name}" + ) http_proxy = os.environ.get("JUJU_CHARM_HTTP_PROXY") https_proxy = os.environ.get("JUJU_CHARM_HTTPS_PROXY")