Skip to content

Commit

Permalink
DM-47389: Publish app metrics events
Browse files Browse the repository at this point in the history
  • Loading branch information
fajpunk committed Dec 17, 2024
1 parent 1e726ab commit 4601389
Show file tree
Hide file tree
Showing 40 changed files with 1,556 additions and 748 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: check-toml

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.0
rev: v0.8.3
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/20241217_101939_danfuchs_app_metrics_for_real.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- Publish [application metrics](https://safir.lsst.io/user-guide/metrics/index.html)
11 changes: 8 additions & 3 deletions docs/development/github.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ You can use https://smee.io to proxy GitHub webhook requests from the data-dev M
#. Configure the [data-dev GitHub Mobu CI app](https://github.com/organizations/lsst-sqre/settings/apps/mobu-ci-data-dev-lsst-cloud) to send webooks to the smee URL.
#. Run your local mobu against the ``idfdev`` env, as described :doc:`here <idfdev>`.
#. Point your local mobu at a local `Safir <https://github.com/lsst-sqre/safir>`__ .
#. Point your local mobu at a local or remote git `Safir <https://github.com/lsst-sqre/safir>`__ .

.. code-block:: diff
Expand All @@ -36,11 +36,16 @@ You can use https://smee.io to proxy GitHub webhook requests from the data-dev M
pyyaml
-safir>=5.0.0
+# safir>=5.0.0
+-e /home/danfuchs/src/safir
# For testing against a local safir
+-e /home/danfuchs/src/safir/safir
# For testing against a remote git safir
+safir @ git+https://github.com/lsst-sqre/safir@<branch>#subdirectory=safir
shortuuid
structlog
#. Patch your local Safir to handle the malformed requests that smee.io sends.
#. Patch your local or remote git Safir to handle the malformed requests that smee.io sends.
The requests sent by the smee proxy have ``:port`` suffixes in the ``X-Forwarded-For`` values.
Safir doesn't handle this (and I don't think it's Safir's fault; I _think_ the port should be in ``X-Forwarded-Port``), so you have to change Safir:

Expand Down
1 change: 1 addition & 0 deletions requirements/dev.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-c main.txt

# Testing
anys
asgi-lifespan
coverage[toml]
documenteer[guide]
Expand Down
420 changes: 218 additions & 202 deletions requirements/dev.txt

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions requirements/main.in
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ pydantic-settings>=2.6.1
pyvo<1.6
pyyaml
rubin-nublado-client>=8.0.3
safir>=6.5.1
safir>=9.0.1
shortuuid
structlog
websockets

# Uncomment this, change the branch, comment out safir above, and run make
# update-deps-no-hashes to test against an unreleased version of Safir.
# safir @ git+https://github.com/lsst-sqre/safir@tickets/DM-38272
#safir @ git+https://github.com/lsst-sqre/safir@tickets/DM-38272#subdirectory=safir
# Similar for rubin-nublado-client:
#rubin-nublado-client@git+https://github.com/lsst-sqre/nublado@tickets/DM-45702#subdirectory=client
661 changes: 300 additions & 361 deletions requirements/main.txt

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions ruff-shared.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ ignore = [
"S607", # using PATH is not a security vulnerability
"SIM102", # sometimes the formatting of nested if statements is clearer
"SIM117", # sometimes nested with contexts are clearer
"TCH001", # we decided to not maintain separate TYPE_CHECKING blocks
"TCH002", # we decided to not maintain separate TYPE_CHECKING blocks
"TCH003", # we decided to not maintain separate TYPE_CHECKING blocks
"TC001", # we decided to not maintain separate TYPE_CHECKING blocks
"TC002", # we decided to not maintain separate TYPE_CHECKING blocks
"TC003", # we decided to not maintain separate TYPE_CHECKING blocks
"TD003", # we don't require issues be created for TODOs
"TID252", # if we're going to use relative imports, use them always
"TRY003", # good general advice but lint is way too aggressive
Expand Down
6 changes: 6 additions & 0 deletions src/mobu/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pydantic.alias_generators import to_camel
from pydantic_settings import BaseSettings, SettingsConfigDict
from safir.logging import LogLevel, Profile
from safir.metrics import MetricsConfiguration, metrics_configuration_factory

from mobu.models.flock import FlockConfig

Expand Down Expand Up @@ -237,6 +238,11 @@ class Configuration(BaseSettings):
title="Log level of the application's logger",
)

metrics: MetricsConfiguration = Field(
default_factory=metrics_configuration_factory,
title="Metrics configuration",
)

github_ci_app: GitHubCiAppConfig | None = Field(
None,
title="GitHub CI app config",
Expand Down
9 changes: 7 additions & 2 deletions src/mobu/dependencies/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
from safir.dependencies.gafaelfawr import auth_logger_dependency
from safir.dependencies.http_client import http_client_dependency
from safir.dependencies.logger import logger_dependency
from safir.metrics import EventManager
from structlog.stdlib import BoundLogger

from ..events import Events
from ..factory import Factory, ProcessContext
from ..services.manager import FlockManager

Expand Down Expand Up @@ -87,12 +89,15 @@ def process_context(self) -> ProcessContext:
raise RuntimeError("ContextDependency not initialized")
return self._process_context

async def initialize(self) -> None:
async def initialize(self, event_manager: EventManager) -> None:
"""Initialize the process-wide shared context."""
if self._process_context:
await self._process_context.aclose()
http_client = await http_client_dependency()
self._process_context = ProcessContext(http_client)
events = Events()
self._process_context = ProcessContext(http_client, events)
event_manager.logger = self.process_context.logger
await events.initialize(event_manager)

async def aclose(self) -> None:
"""Clean up the per-process configuration."""
Expand Down
1 change: 1 addition & 0 deletions src/mobu/dependencies/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def initialize(
github_private_key=github_private_key,
scopes=scopes,
http_client=base_context.process_context.http_client,
events=base_context.process_context.events,
gafaelfawr_storage=base_context.process_context.gafaelfawr,
logger=base_context.process_context.logger,
)
Expand Down
103 changes: 103 additions & 0 deletions src/mobu/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""App metrics events."""

from datetime import timedelta
from typing import override

from safir.dependencies.metrics import EventMaker
from safir.metrics import EventManager, EventPayload


class EventBase(EventPayload):
"""Attributes on every mobu event."""

flock: str | None
business: str
username: str


class NotebookBase(EventBase):
"""Attributes for all notebook-related events."""

notebook: str
repo: str
repo_ref: str
repo_hash: str


class NotebookExecution(NotebookBase):
"""Reported after a notebook is finished executing."""

duration: timedelta | None
success: bool


class NotebookCellExecution(NotebookBase):
"""Reported after a notebook cell is finished executing."""

duration: timedelta | None
cell_id: str
success: bool


class NubladoPythonExecution(EventBase):
"""Reported after a nublado python execution."""

duration: timedelta | None
success: bool
code: str


class NubladoSpawnLab(EventBase):
"""Reported for every attempt to spawn a lab."""

duration: timedelta
success: bool


class NubladoDeleteLab(EventBase):
"""Reported for every attempt to delete a lab."""

duration: timedelta
success: bool


class GitLfsCheck(EventBase):
"""Reported from Git LFS businesses."""

success: bool
duration: timedelta | None = None


class TapQuery(EventBase):
"""Reported when a TAP query is executed."""

success: bool
duration: timedelta | None
sync: bool


class Events(EventMaker):
"""Container for app metrics event publishers."""

@override
async def initialize(self, manager: EventManager) -> None:
self.tap_query = await manager.create_publisher("tap_query", TapQuery)
self.git_lfs_check = await manager.create_publisher(
"git_lfs_check", GitLfsCheck
)
self.notebook_execution = await manager.create_publisher(
"notebook_execution", NotebookExecution
)
self.notebook_cell_execution = await manager.create_publisher(
"notebook_cell_execution", NotebookCellExecution
)
self.nublado_python_execution = await manager.create_publisher(
"nublado_python_execution", NubladoPythonExecution
)
self.nublado_spawn_lab = await manager.create_publisher(
"nublado_spawn_lab", NubladoSpawnLab
)

self.nublado_delete_lab = await manager.create_publisher(
"nublado_delete_", NubladoDeleteLab
)
15 changes: 13 additions & 2 deletions src/mobu/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from structlog.stdlib import BoundLogger

from .dependencies.config import config_dependency
from .events import Events
from .models.solitary import SolitaryConfig
from .services.manager import FlockManager
from .services.solitary import Solitary
Expand All @@ -26,22 +27,31 @@ class ProcessContext:
----------
http_client
Shared HTTP client.
events
Event publishers.
Attributes
----------
http_client
Shared HTTP client.
manager
Manager for all running flocks.
events
Object with attributes for all metrics event publishers.
"""

def __init__(self, http_client: AsyncClient) -> None:
def __init__(self, http_client: AsyncClient, events: Events) -> None:
self.http_client = http_client
self.logger = structlog.get_logger("mobu")
self.gafaelfawr = GafaelfawrStorage(self.http_client, self.logger)
self.events = events
self.manager = FlockManager(
self.gafaelfawr, self.http_client, self.logger
gafaelfawr_storage=self.gafaelfawr,
http_client=self.http_client,
logger=self.logger,
events=self.events,
)
self.events = events

async def aclose(self) -> None:
"""Clean up a process context.
Expand Down Expand Up @@ -104,6 +114,7 @@ def create_solitary(self, solitary_config: SolitaryConfig) -> Solitary:
self._context.http_client, self._logger
),
http_client=self._context.http_client,
events=self._context.events,
logger=self._logger,
)

Expand Down
5 changes: 4 additions & 1 deletion src/mobu/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
if not config.gafaelfawr_token:
raise RuntimeError("MOBU_GAFAELFAWR_TOKEN was not set")

await context_dependency.initialize()
event_manager = config.metrics.make_manager()
await event_manager.initialize()
await context_dependency.initialize(event_manager)

await context_dependency.process_context.manager.autostart()

status_interval = timedelta(days=1)
Expand Down
30 changes: 29 additions & 1 deletion src/mobu/services/business/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from collections.abc import AsyncIterable, AsyncIterator
from datetime import timedelta
from enum import Enum
from typing import Generic, TypeVar
from typing import Generic, TypedDict, TypeVar

from httpx import AsyncClient
from safir.datetime import current_datetime
from structlog.stdlib import BoundLogger

from ...asyncio import wait_first
from ...events import Events
from ...models.business.base import BusinessData, BusinessOptions
from ...models.user import AuthenticatedUser
from ..timings import Timings
Expand All @@ -25,6 +26,12 @@
__all__ = ["Business"]


class CommonEventAttrs(TypedDict):
flock: str | None
username: str
business: str


class BusinessCommand(Enum):
"""Commands sent over the internal control queue."""

Expand Down Expand Up @@ -57,8 +64,12 @@ class Business(Generic[T], metaclass=ABCMeta):
User with their authentication token to use to run the business.
http_client
Shared HTTP client.
events
Event publishers.
logger
Logger to use to report the results of business.
flock
Flock that is running this business, if it is running in a flock.
Attributes
----------
Expand All @@ -68,6 +79,8 @@ class Business(Generic[T], metaclass=ABCMeta):
User with their authentication token to use to run the business.
http_client
Shared HTTP client.
events
Event publishers.
logger
Logger to use to report the results of business. This will generally
be attached to a file rather than the main logger.
Expand All @@ -79,25 +92,32 @@ class Business(Generic[T], metaclass=ABCMeta):
Execution timings.
stopping
Whether `stop` has been called and further execution should stop.
flock
Flock that is running this business, if it is running in a flock.
"""

def __init__(
self,
*,
options: T,
user: AuthenticatedUser,
http_client: AsyncClient,
events: Events,
logger: BoundLogger,
flock: str | None,
) -> None:
self.options = options
self.user = user
self.http_client = http_client
self.events = events
self.logger = logger
self.success_count = 0
self.failure_count = 0
self.timings = Timings()
self.control: Queue[BusinessCommand] = Queue()
self.stopping = False
self.refreshing = False
self.flock = flock

# Methods that should be overridden by child classes if needed.

Expand Down Expand Up @@ -307,6 +327,14 @@ def dump(self) -> BusinessData:
timings=self.timings.dump(),
)

def common_event_attrs(self) -> CommonEventAttrs:
"""Attributes that are on every published event."""
return {
"flock": self.flock,
"username": self.user.username,
"business": self.__class__.__name__,
}

async def _pause_no_return(self, interval: timedelta) -> None:
"""Pause for up to an interval, handling commands.
Expand Down
Loading

0 comments on commit 4601389

Please sign in to comment.