From a6534ecbe595968cb3ab33c608bf06e1bfe39fa6 Mon Sep 17 00:00:00 2001 From: Dan Fuchs Date: Sun, 23 Jun 2024 15:27:33 -0500 Subject: [PATCH] Workers and manager to handle GitHub check runs --- pyproject.toml | 1 + src/mobu/config.py | 60 +++ src/mobu/constants.py | 17 +- src/mobu/exceptions.py | 7 + src/mobu/models/repo.py | 25 ++ src/mobu/services/github_ci/__init__.py | 0 src/mobu/services/github_ci/ci_manager.py | 337 +++++++++++++++++ .../services/github_ci/ci_notebook_job.py | 142 +++++++ src/mobu/storage/github.py | 339 +++++++++++++++++ tests/conftest.py | 44 ++- tests/services/__init__.py | 0 tests/services/ci_manager_test.py | 357 ++++++++++++++++++ tests/support/constants.py | 32 ++ tests/support/gafaelfawr.py | 4 +- tests/support/github.py | 316 ++++++++++++++++ 15 files changed, 1678 insertions(+), 3 deletions(-) create mode 100644 src/mobu/models/repo.py create mode 100644 src/mobu/services/github_ci/__init__.py create mode 100644 src/mobu/services/github_ci/ci_manager.py create mode 100644 src/mobu/services/github_ci/ci_notebook_job.py create mode 100644 src/mobu/storage/github.py create mode 100644 tests/services/__init__.py create mode 100644 tests/services/ci_manager_test.py create mode 100644 tests/support/github.py diff --git a/pyproject.toml b/pyproject.toml index 9214039f..60285141 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -161,6 +161,7 @@ ignore = [ "TID252", # if we're going to use relative imports, use them always "TRY003", # good general advice but lint is way too aggressive "TRY301", # sometimes raising exceptions inside try is the best flow + "UP040", # Python 3.12 supports `type` alias kw, but mypy doesn't yet # The following settings should be disabled when using ruff format # per https://docs.astral.sh/ruff/formatter/#conflicting-lint-rules diff --git a/src/mobu/config.py b/src/mobu/config.py index 951757a4..fce30b34 100644 --- a/src/mobu/config.py +++ b/src/mobu/config.py @@ -11,11 +11,65 @@ __all__ = [ "Configuration", + "GitHubCiApp", "GitHubRefreshApp", "config", ] +class GitHubCiApp(BaseSettings): + """Configuration for GitHub CI app functionality.""" + + enabled: bool = Field( + False, + title="Whether to enable the GitHub CI app functionality", + validation_alias="MOBU_GITHUB_CI_APP_ENABLED", + ) + + id: int | None = Field( + None, + title="Github CI app id", + description=( + "Found on the GitHub app's settings page (NOT the installation" + " configuration page). For example:" + " https://github.com/organizations/lsst-sqre/settings/apps/mobu-ci-data-dev-lsst-cloud" + ), + validation_alias="MOBU_GITHUB_CI_APP_ID", + examples=[123456], + ) + + private_key: str | None = Field( + None, + title="Github CI app private key", + description=( + "Generated when the GitHub app was set up. This should NOT be" + " base64 enocded, and will contain newlines. You can find this" + " in 1Password; check the Phalanx mobu values for more details." + ), + validation_alias="MOBU_GITHUB_CI_APP_PRIVATE_KEY", + examples=[ + dedent(""" + -----BEGIN RSA PRIVATE KEY----- + abc123MeowMeow456abc123MeowMeow456abc123MeowMeow456abc123MeowMeo + abc123MeowMeow456abc123MeowMeow456abc123MeowMeow456abc123MeowMeo + abc123MeowMeow456abc123MeowMeow456abc123MeowMeow456abc123MeowMeo + etc, etc + -----END RSA PRIVATE KEY----- + """) + ], + ) + + webhook_secret: str | None = Field( + None, + title="Github CI app webhook secret", + description=( + "Generated when the GitHub app was set up. You can find this" + " in 1Password; check the Phalanx mobu values for more details." 
+ ), + validation_alias="MOBU_GITHUB_CI_APP_WEBHOOK_SECRET", + ) + + class GitHubRefreshApp(BaseSettings): """Configuration for GitHub refresh app functionality.""" @@ -86,7 +140,13 @@ class Configuration(BaseSettings): examples=["gt-vilSCi1ifK_MyuaQgMD2dQ.d6SIJhowv5Hs3GvujOyUig"], ) + github_ci_app: GitHubCiApp = Field(GitHubCiApp()) + + github_config_path: Path | None = Field( None, + title="Path to YAML file defining settings for GitHub app integration", + validation_alias="MOBU_GITHUB_CONFIG_PATH", + examples=["/etc/mobu/github_config.yaml"], ) github_refresh_app: GitHubRefreshApp = Field(GitHubRefreshApp()) diff --git a/src/mobu/constants.py b/src/mobu/constants.py index a9b96366..8c3e8360 100644 --- a/src/mobu/constants.py +++ b/src/mobu/constants.py @@ -3,16 +3,31 @@ from __future__ import annotations from datetime import timedelta +from pathlib import Path __all__ = [ + "GITHUB_CI_SCOPES", + "GITHUB_REPO_CONFIG_PATH", "GITHUB_WEBHOOK_WAIT_SECONDS", - "NOTEBOOK_REPO_URL", "NOTEBOOK_REPO_BRANCH", + "NOTEBOOK_REPO_URL", "TOKEN_LIFETIME", "USERNAME_REGEX", "WEBSOCKET_OPEN_TIMEOUT", ] + +GITHUB_CI_SCOPES = [ + "exec:notebook", + "exec:portal", + "read:image", + "read:tap", +] +"""All NotebookRunner business run via GitHub CI get these scopes.""" + +GITHUB_REPO_CONFIG_PATH = Path("mobu.yaml") +"""The path to a config file with repo-specific configuration.""" + GITHUB_WEBHOOK_WAIT_SECONDS = 1 """GithHub needs some time to actually be in the state in a webhook payload.""" diff --git a/src/mobu/exceptions.py b/src/mobu/exceptions.py index 1890ad64..8f744b4d 100644 --- a/src/mobu/exceptions.py +++ b/src/mobu/exceptions.py @@ -34,6 +34,7 @@ "FlockNotFoundError", "GafaelfawrParseError", "GafaelfawrWebError", + "GitHubFileNotFoundError", "JupyterProtocolError", "JupyterTimeoutError", "JupyterWebError", @@ -347,6 +348,12 @@ def to_slack(self) -> SlackMessage: ) +class GitHubFileNotFoundError(Exception): + """Tried to retrieve contents for a non-existent file in a GitHub + repo. + """ + + class JupyterProtocolError(MobuSlackException): """Some error occurred when talking to JupyterHub or JupyterLab.""" diff --git a/src/mobu/models/repo.py b/src/mobu/models/repo.py new file mode 100644 index 00000000..484abc28 --- /dev/null +++ b/src/mobu/models/repo.py @@ -0,0 +1,25 @@ +"""Models related to GitHub repos for the GitHub CI app functionality.""" + +from pathlib import Path + +from pydantic import BaseModel, ConfigDict, Field + + +class RepoConfig(BaseModel): + """In-repo configuration for mobu behavior. + + This can be placed into a yaml file in the root of a repo to configure + certain mobu behavior. + """ + + exclude_dirs: set[Path] = Field( + set(), + title="Any notebooks in these directories will not be run", + description=( + " These directories are relative to the repo root. Any notebooks" + " in child directories of these directories will also be excluded." 
+ ), + examples=["some-dir", "some-dir/some-other-dir"], + ) + + model_config = ConfigDict(extra="forbid") diff --git a/src/mobu/services/github_ci/__init__.py b/src/mobu/services/github_ci/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mobu/services/github_ci/ci_manager.py b/src/mobu/services/github_ci/ci_manager.py new file mode 100644 index 00000000..c57cb519 --- /dev/null +++ b/src/mobu/services/github_ci/ci_manager.py @@ -0,0 +1,337 @@ +"""Manager for background workers that process work from GitHub checks.""" + +from __future__ import annotations + +import asyncio +from asyncio import Queue +from dataclasses import dataclass, field +from typing import TypeAlias + +from aiojobs import Job, Scheduler +from httpx import AsyncClient +from safir.github import GitHubAppClientFactory +from structlog.stdlib import BoundLogger + +from ...config import config +from ...models.ci_manager import CiManagerSummary, CiWorkerSummary +from ...models.user import User +from ...storage.gafaelfawr import GafaelfawrStorage +from ...storage.github import GitHubStorage +from .ci_notebook_job import CiNotebookJob + +__all__ = ["CiManager"] + + +@dataclass +class CiManagerLifecycle: + marked_remaining: asyncio.Event = field(default_factory=asyncio.Event) + + +@dataclass +class JobLifecycle: + processing: asyncio.Event = field(default_factory=asyncio.Event) + processed: asyncio.Event = field(default_factory=asyncio.Event) + + +@dataclass +class QueuedJob: + job: CiNotebookJob + lifecycle: JobLifecycle + + +QueueItem: TypeAlias = QueuedJob | None + + +class CiManager: + """Manages processing work for GitHub CI checks. + + This should be a process singleton. It is responsible for: + * Creating background workers to process GitHub CI events + * Ensuring they run at the appropriate level of concurrency given the + number of available users + * Ensuring GitHub CI checks are not left in a forever-in-progress state + when mobu shuts down + + Parameters + ---------- + users + A list of static users that are available to run jobs. Each of these + users will get assigned to a worker, and will process one job at a + time. + http_client + Shared HTTP client. + gafaelfawr_storage + Gafaelfawr storage client. + logger + Global logger to use for process-wide (not monkey) logging. + """ + + shutdown_error_msg = "Mobu stopped, try re-running this check." 
+
+    def __init__(
+        self,
+        users: list[User],
+        http_client: AsyncClient,
+        gafaelfawr_storage: GafaelfawrStorage,
+        logger: BoundLogger,
+    ) -> None:
+        self._users = users
+        self._gafaelfawr = gafaelfawr_storage
+        self._http_client = http_client
+        self._logger = logger.bind(ci_manager=True)
+        self._scheduler: Scheduler = Scheduler()
+        self._queue: Queue[QueueItem] = Queue()
+        self._jobs: list[Job] = []
+        self.workers: list[Worker] = []
+
+        # Used for deterministic testing
+        self.lifecycle = CiManagerLifecycle()
+
+        if not config.github_ci_app.id:
+            raise RuntimeError("MOBU_GITHUB_CI_APP_ID was not set")
+        if not config.github_ci_app.webhook_secret:
+            raise RuntimeError("MOBU_GITHUB_CI_APP_WEBHOOK_SECRET was not set")
+        if not config.github_ci_app.private_key:
+            raise RuntimeError("MOBU_GITHUB_CI_APP_PRIVATE_KEY was not set")
+        self._factory = GitHubAppClientFactory(
+            id=config.github_ci_app.id,
+            key=config.github_ci_app.private_key,
+            name="lsst-sqre/mobu CI app",
+            http_client=http_client,
+        )
+
+    async def start(self) -> None:
+        """Start the workers for the CI manager."""
+        self._logger.info("Starting CI manager...")
+        self.workers = [
+            Worker(
+                user=user,
+                queue=self._queue,
+                logger=self._logger,
+            )
+            for user in self._users
+        ]
+        self._jobs = [
+            await self._scheduler.spawn(worker.run())
+            for worker in self.workers
+        ]
+        self._logger.info("CI manager started")
+
+    async def aclose(self) -> None:
+        """Stop the workers and update GitHub CI checks for pending work.
+
+        We initially fail in-progress jobs too because we don't want any GitHub
+        check runs to be forever in-progress if mobu gets killed before they
+        finish. If they do end up finishing in time, then they'll be
+        re-concluded as successful.
+
+        We'd rather have false-negative GitHub checks than forever-in-progress
+        checks, because:
+        * There is no way to re-run in-progress checks from the GitHub UI
+        * There is no way to know for sure from the GitHub UI that a check
+          will never be concluded.
+
+        A failed check can easily be re-run by any user, so this is an
+        acceptable tradeoff, assuming false negatives happen infrequently.
+
+        Scenarios:
+
+        ### Job completes
+
+        1. Mobu is told to shut down
+        2. Mobu tells GitHub that the job has failed due to restart
+        3. Job finishes before mobu is SIGKILLed
+        4. Mobu tells GitHub that the job is successful
+        5. Mobu is SIGKILLed or exits cleanly
+
+        Result: GitHub check run displays success, which is correct.
+        An incorrect failure status will have been displayed for a
+        brief period of time.
+        User action needed: None
+
+        ### Job does not complete
+
+        1. Mobu is told to shut down
+        2. Mobu tells GitHub that the job has failed due to restart
+        3. Mobu is SIGKILLed
+
+        Result: GitHub check run displays failure status, which is correct.
+        User action needed: Re-run check in GitHub UI
+
+        ### Job completes, but mobu is killed before it can tell GitHub
+
+        This situation should be pretty rare.
+        1. Mobu is told to shut down
+        2. Mobu tells GitHub that the job has failed due to restart
+        3. Job finishes before mobu is SIGKILLed
+        4. Mobu is SIGKILLed before it can tell GitHub the job has succeeded
+
+        Result: GitHub check run displays failure status, which is incorrect.
+        User action needed: Re-run check in GitHub UI
+        """
+        self._logger.info("Stopping CI manager...")
+
+        # Tell workers with in-progress jobs to stop after their current job
+        for worker in self.workers:
+            worker.stop()
+
+        # Tell GitHub that all checks for in-progress jobs have failed, in
+        # case we shut down uncleanly
+        awaits = [
+            worker.current_job.check_run.fail(error=self.shutdown_error_msg)
+            for worker in self.workers
+            if worker.current_job is not None
+        ]
+        await asyncio.gather(*awaits)
+
+        # Tell GitHub that all queued checks have failed
+        awaits = []
+        while not self._queue.empty():
+            item = await self._queue.get()
+            if item is not None:
+                awaits.append(
+                    item.job.check_run.fail(error=self.shutdown_error_msg)
+                )
+        await asyncio.gather(*awaits)
+        self.lifecycle.marked_remaining.set()
+
+        # Tell workers listening on a currently empty queue to stop
+        for _ in self.workers:
+            await self._queue.put(None)
+
+        # Wait for workers to finish any in-progress jobs
+        for job in self._jobs:
+            await job.wait()
+
+        await self._scheduler.close()
+        self._logger.info("CI manager stopped")
+
+    async def enqueue(
+        self,
+        installation_id: int,
+        repo_owner: str,
+        repo_name: str,
+        ref: str,
+    ) -> JobLifecycle:
+        """Enqueue a job to run something for a given Git repo and commit.
+
+        Parameters
+        ----------
+        installation_id
+            The GitHub installation ID of the app that generated this work.
+        repo_owner
+            A GitHub organization name.
+        repo_name
+            A GitHub repo name.
+        ref
+            A GitHub commit SHA.
+
+        Returns
+        -------
+        JobLifecycle
+            Only used in unit tests.
+
+            Helpful for creating deterministic ordering scenarios when
+            processing multiple jobs.
+        """
+        storage = await GitHubStorage.create(
+            factory=self._factory,
+            installation_id=installation_id,
+            repo_name=repo_name,
+            repo_owner=repo_owner,
+            ref=ref,
+        )
+
+        check_run = await storage.create_check_run(
+            name=f"Mobu ({config.environment_url})",
+            summary="Waiting for Mobu to run...",
+        )
+
+        job = CiNotebookJob(
+            github_storage=storage,
+            check_run=check_run,
+            http_client=self._http_client,
+            logger=self._logger,
+            gafaelfawr_storage=self._gafaelfawr,
+        )
+        lifecycle = JobLifecycle()
+        await self._queue.put(QueuedJob(job=job, lifecycle=lifecycle))
+        return lifecycle
+
+    def summarize(self) -> CiManagerSummary:
+        return CiManagerSummary.model_validate(
+            {
+                "workers": [worker.summarize() for worker in self.workers],
+                "num_queued": self._queue.qsize(),
+            }
+        )
+
+
+class Worker:
+    """Run mobu work with a particular User.
+
+    Parameters
+    ----------
+    user
+        The user to do the work as.
+    queue
+        A queue to get the work from.
+    logger
+        The context logger.
+    """
+
+    def __init__(
+        self,
+        user: User,
+        queue: Queue[QueueItem],
+        logger: BoundLogger,
+    ) -> None:
+        self._user = user
+        self._queue = queue
+        self._logger = logger.bind(ci_worker=user.username)
+        self._stopping = False
+        self._num_processed = 0
+
+        self.current_job: CiNotebookJob | None = None
+
+    async def run(self) -> None:
+        """Pick up work from a queue until signaled to stop.
+
+        The ``lifecycle`` logic is only used in unit tests to generate
+        deterministic scenarios involving many jobs and workers.
+ """ + self._logger.info("Worker started") + while item := await self._queue.get(): + job = item.job + lifecycle = item.lifecycle + self.current_job = job + lifecycle.processing.set() + self._logger.info( + f"Processing job: {job}, with user: {self._user}" + ) + + await job.run(user=self._user) + + lifecycle.processed.set() + self.current_job = None + self._logger.info(f"Finished job: {job}, with user: {self._user}") + self._num_processed += 1 + if self._stopping: + break + self._logger.info("Worker stopped") + + def stop(self) -> None: + """Don't pick up any more work after finishing the current job.""" + self._stopping = True + + def summarize(self) -> CiWorkerSummary: + """Information about this worker.""" + return CiWorkerSummary.model_validate( + { + "user": self._user, + "num_processed": self._num_processed, + "current_job": self.current_job.summarize() + if self.current_job + else None, + } + ) diff --git a/src/mobu/services/github_ci/ci_notebook_job.py b/src/mobu/services/github_ci/ci_notebook_job.py new file mode 100644 index 00000000..52157975 --- /dev/null +++ b/src/mobu/services/github_ci/ci_notebook_job.py @@ -0,0 +1,142 @@ +"""GitHub CI checks for notebook repos.""" + +from pathlib import Path +from typing import Any + +import yaml +from httpx import AsyncClient +from structlog.stdlib import BoundLogger + +from mobu.constants import GITHUB_CI_SCOPES, GITHUB_REPO_CONFIG_PATH +from mobu.exceptions import GitHubFileNotFoundError +from mobu.models.business.notebookrunner import ( + NotebookRunnerConfig, + NotebookRunnerOptions, +) +from mobu.models.solitary import SolitaryConfig +from mobu.models.user import User +from mobu.services.solitary import Solitary + +from ...models.ci_manager import CiJobSummary +from ...models.repo import RepoConfig +from ...storage.gafaelfawr import GafaelfawrStorage +from ...storage.github import CheckRun, GitHubStorage + + +class CiNotebookJob: + """Runs changed notebooks and updates a GitHub CI check. + + Parameters + ---------- + github_storage: + GitHub storage client. + check_run: + A GitHub storage check run. + http_client: + Shared HTTP client. + gafaelfawr_storage: + Gafaelfawr storage client. + logger: + Context logger. + + """ + + def __init__( + self, + github_storage: GitHubStorage, + check_run: CheckRun, + http_client: AsyncClient, + gafaelfawr_storage: GafaelfawrStorage, + logger: BoundLogger, + ) -> None: + self._github = github_storage + self.check_run = check_run + self._http_client = http_client + self._gafaelfawr = gafaelfawr_storage + self._logger = logger.bind(ci_job_type="NotebookJob") + self._notebooks: list[Path] = [] + + async def run(self, user: User) -> None: + """Run all relevant changed notebooks and report back to GitHub. + + Run only changed notebooks that aren't excluded in the mobu config + file in the repo. If there is no mobu config file, then don't exclude + any changed notebooks. 
+        """
+        # Get mobu config from repo, if it exists
+        exclude_dirs: set[Path] = set()
+        try:
+            raw_config = await self._github.get_file_content(
+                path=GITHUB_REPO_CONFIG_PATH
+            )
+            parsed_config: dict[str, Any] = yaml.safe_load(raw_config)
+            repo_config = RepoConfig.model_validate(parsed_config)
+            exclude_dirs = repo_config.exclude_dirs
+        except GitHubFileNotFoundError:
+            self._logger.debug("Mobu config file not found in repo")
+        except Exception as exc:
+            await self.check_run.fail(
+                error=(
+                    "Error retrieving and parsing config file "
+                    f"{GITHUB_REPO_CONFIG_PATH}: {exc!s}"
+                ),
+            )
+            return
+
+        # Get changed notebook files
+        files = await self._github.get_changed_files()
+
+        self._notebooks = [
+            file
+            for file in files
+            if file.suffix == ".ipynb"
+            and not self._is_excluded(file, exclude_dirs)
+        ]
+
+        # Don't do anything if there are no notebooks to run
+        if not bool(self._notebooks):
+            await self.check_run.succeed(
+                details="No changed notebooks to run.",
+            )
+            return
+
+        # Run notebooks using a Solitary runner
+        summary = "Running these notebooks via Mobu:\n" + "\n".join(
+            [f"* {notebook}" for notebook in self._notebooks]
+        )
+        await self.check_run.start(summary=summary)
+        solitary_config = SolitaryConfig(
+            user=user,
+            scopes=GITHUB_CI_SCOPES,
+            business=NotebookRunnerConfig(
+                type="NotebookRunner",
+                options=NotebookRunnerOptions(
+                    max_executions=len(self._notebooks),
+                    repo_ref=self._github.ref,
+                    repo_url=f"https://github.com/{self._github.repo_owner}/{self._github.repo_name}.git",
+                    notebooks_to_run=self._notebooks,
+                ),
+            ),
+        )
+        solitary = Solitary(
+            solitary_config=solitary_config,
+            gafaelfawr_storage=self._gafaelfawr,
+            http_client=self._http_client,
+            logger=self._logger,
+        )
+
+        result = await solitary.run()
+        if result.success:
+            await self.check_run.succeed()
+        else:
+            await self.check_run.fail(error=result.error or "Unknown Error")
+
+    def _is_excluded(self, notebook: Path, exclude_dirs: set[Path]) -> bool:
+        """Exclude a notebook if any of its parent directories are excluded."""
+        return bool(set(notebook.parents) & exclude_dirs)
+
+    def summarize(self) -> CiJobSummary:
+        """Information about this job."""
+        return CiJobSummary.model_validate(
+            {"commit_url": self._github.commit_url}
+        )
diff --git a/src/mobu/storage/github.py b/src/mobu/storage/github.py
new file mode 100644
index 00000000..f6dd0733
--- /dev/null
+++ b/src/mobu/storage/github.py
@@ -0,0 +1,340 @@
+"""Tools for interacting with the GitHub REST API."""
+
+from datetime import UTC, datetime
+from enum import StrEnum
+from pathlib import Path
+from typing import Self
+from urllib.parse import urlencode
+
+from gidgethub import HTTPException
+from gidgethub.httpx import GitHubAPI
+from pydantic import (
+    AwareDatetime,
+    Base64Str,
+    BaseModel,
+    Field,
+    field_serializer,
+)
+from safir.github import GitHubAppClientFactory
+from safir.github.models import GitHubCheckRunConclusion, GitHubCheckRunStatus
+
+from ..exceptions import GitHubFileNotFoundError
+
+__all__ = ["GitHubStorage", "CheckRun"]
+
+
+class _CheckRunRequestOutput(BaseModel):
+    title: str | None = Field(None)
+    summary: str | None = Field(None)
+    text: str | None = Field(None)
+
+
+class _CheckRunRequest(BaseModel):
+    name: str | None = Field(None)
+    head_sha: str = Field()
+    status: GitHubCheckRunStatus | None = Field(None)
+    conclusion: GitHubCheckRunConclusion | None = Field(None)
+    started_at: AwareDatetime | None = Field(None)
+    completed_at: AwareDatetime | None = Field(None)
+    output: _CheckRunRequestOutput | None = Field(None)
+
+    @field_serializer("started_at", "completed_at")
+    def serialize_datetime(self, dt: datetime | None) -> str | None:
+        if dt is not None:
+            return dt.astimezone(UTC).isoformat()
+        return None
+
+
+class _CheckRunResponse(BaseModel):
+    id: int = Field()
+
+
+class _FileStatus(StrEnum):
+    added = "added"
+    removed = "removed"
+    modified = "modified"
+    renamed = "renamed"
+    copied = "copied"
+    changed = "changed"
+    unchanged = "unchanged"
+
+
+class _ChangedFileResponse(BaseModel):
+    filename: Path = Field()
+    status: _FileStatus = Field()
+
+
+class _FileContentsResponse(BaseModel):
+    content: Base64Str = Field()
+
+
+class GitHubStorage:
+    """Tools to interact with the GitHub API.
+
+    All interactions are scoped to a particular repo and ref.
+
+    Parameters
+    ----------
+    client
+        An auth'd GitHub API client.
+    repo_owner
+        A GitHub organization.
+    repo_name
+        A GitHub repo.
+    ref
+        A Git ref.
+    """
+
+    def __init__(
+        self,
+        client: GitHubAPI,
+        repo_owner: str,
+        repo_name: str,
+        ref: str,
+    ) -> None:
+        self.client = client
+        self.repo_owner = repo_owner
+        self.repo_name = repo_name
+        self.ref = ref
+        self.commit_url = f"https://github.com/{self.repo_owner}/{self.repo_name}/commit/{self.ref}"
+        self._api_path = f"/repos/{repo_owner}/{repo_name}"
+
+    @classmethod
+    async def create(
+        cls,
+        factory: GitHubAppClientFactory,
+        installation_id: int,
+        repo_owner: str,
+        repo_name: str,
+        ref: str,
+    ) -> Self:
+        """Create an auth'd GitHub client and construct an instance.
+
+        Parameters
+        ----------
+        factory
+            A GitHub client factory with credentials.
+        installation_id
+            The ID of an installed GitHub app.
+        repo_owner
+            A GitHub organization.
+        repo_name
+            A GitHub repo.
+        ref
+            A GitHub ref.
+        """
+        client = await factory.create_installation_client(
+            installation_id=installation_id,
+        )
+        return cls(
+            client=client, repo_name=repo_name, repo_owner=repo_owner, ref=ref
+        )
+
+    async def get_changed_files(self) -> list[Path]:
+        """Get a list of files whose contents have changed.
+
+        Returns
+        -------
+        list[Path]
+            List of paths relative to the repo root.
+        """
+        path = f"{self._api_path}/commits/{self.ref}"
+        files = [
+            _ChangedFileResponse.model_validate(info)
+            async for info in self.client.getiter(path, iterable_key="files")
+        ]
+        return [
+            file.filename
+            for file in files
+            if file.status
+            in (
+                _FileStatus.modified,
+                _FileStatus.changed,
+                _FileStatus.added,
+            )
+        ]
+
+    async def get_file_content(self, path: Path) -> str:
+        """Get the contents of a file in a GitHub repo.
+
+        Raises ``GitHubFileNotFoundError`` if the file doesn't exist.
+
+        Parameters
+        ----------
+        path
+            Path of the file relative to the repo root.
+
+        Returns
+        -------
+        str
+            The contents of the file.
+        """
+        qs = urlencode({"ref": self.ref})
+        api_path = f"{self._api_path}/contents/{path}?{qs}"
+        try:
+            res = await self.client.getitem(
+                api_path, url_vars={"ref": self.ref}
+            )
+        except HTTPException as exc:
+            if exc.status_code == 404:
+                raise GitHubFileNotFoundError from None
+            raise
+        file = _FileContentsResponse.model_validate(res)
+        return file.content
+
+    async def create_check_run(
+        self,
+        name: str,
+        summary: str,
+        details: str | None = None,
+    ) -> "CheckRun":
+        """Create a check run and return an object to manage it.
+
+        Parameters
+        ----------
+        name
+            The name of the check run. This will also be the output title.
+        summary
+            The output summary.
+        details
+            The output details.
+
+        Returns
+        -------
+        CheckRun
+            An object to manage GitHub check run status.
+        """
+        path = f"/repos/{self.repo_owner}/{self.repo_name}/check-runs"
+        data = _CheckRunRequest(
+            head_sha=self.ref,
+            name=name,
+            status=GitHubCheckRunStatus.queued,
+            started_at=datetime.now(UTC),
+            output=_CheckRunRequestOutput(
+                title=name,
+                summary=summary,
+                text=details,
+            ),
+        ).model_dump(exclude_unset=True, exclude_none=True)
+
+        res = await self.client.post(path, data=data)
+        check_run = _CheckRunResponse.model_validate(res)
+
+        return CheckRun(
+            github_storage=self,
+            id=check_run.id,
+            name=name,
+            summary=summary,
+            details=details,
+        )
+
+
+class CheckRun:
+    """Manage GitHub check runs via the GitHub API.
+
+    Parameters
+    ----------
+    github_storage
+        Used to interact with the GitHub API.
+    id
+        The GitHub API check run ID.
+    name
+        The name of the check run. This will also be the output title.
+    summary
+        The output summary.
+    details
+        The output details.
+    """
+
+    def __init__(
+        self,
+        github_storage: GitHubStorage,
+        id: int,
+        name: str,
+        summary: str,
+        details: str | None = None,
+    ) -> None:
+        self._client = github_storage.client
+        self._ref = github_storage.ref
+        self._name = name
+        self._summary = summary
+        self._text = details
+        self._api_path = (
+            f"/repos/{github_storage.repo_owner}/"
+            f"{github_storage.repo_name}/check-runs/{id}"
+        )
+
+    async def fail(self, error: str) -> None:
+        """Conclude a GitHub check run as a failure.
+
+        Parameters
+        ----------
+        error
+            The error that occurred.
+        """
+        await self._update(
+            conclusion=GitHubCheckRunConclusion.failure,
+            text=error,
+        )
+
+    async def succeed(self, details: str | None = None) -> None:
+        """Conclude a GitHub check run as a success.
+
+        Parameters
+        ----------
+        details
+            Output details to display.
+        """
+        await self._update(
+            conclusion=GitHubCheckRunConclusion.success,
+            text=details,
+        )
+
+    async def start(
+        self,
+        summary: str | None = None,
+        details: str | None = None,
+    ) -> None:
+        """Mark a GitHub check run as in progress.
+
+        Parameters
+        ----------
+        summary
+            The output summary to display.
+        details
+            Output details to display.
+ """ + await self._update( + status=GitHubCheckRunStatus.in_progress, + summary=summary, + text=details, + ) + + async def _update( + self, + status: GitHubCheckRunStatus | None = None, + conclusion: GitHubCheckRunConclusion | None = None, + summary: str | None = None, + text: str | None = None, + ) -> None: + """Update a check run, update internal state if necessary.""" + self._summary = summary or self._summary + self._text = text or self._text + + now = datetime.now(UTC) + request = _CheckRunRequest( + head_sha=self._ref, + status=status, + conclusion=conclusion, + completed_at=now if conclusion is not None else None, + output=_CheckRunRequestOutput( + title=self._name, + summary=self._summary, + text=self._text, + ), + ) + data = request.model_dump(exclude_unset=True, exclude_none=True) + await self._client.patch( + self._api_path, + data=data, + ) diff --git a/tests/conftest.py b/tests/conftest.py index e74f5e5c..f38dc38d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,8 @@ from collections.abc import AsyncIterator, Iterator from contextlib import asynccontextmanager +from pathlib import Path +from textwrap import dedent from unittest.mock import DEFAULT, patch import pytest @@ -16,14 +18,17 @@ from safir.testing.slack import MockSlackWebhook, mock_slack_webhook from mobu import main -from mobu.config import config +from mobu.config import GitHubCiApp, GitHubRefreshApp, config from mobu.services.business.gitlfs import GitLFSBusiness from .support.constants import ( TEST_BASE_URL, + TEST_GITHUB_CI_APP_PRIVATE_KEY, + TEST_GITHUB_CI_APP_SECRET, TEST_GITHUB_REFRESH_APP_SECRET, ) from .support.gafaelfawr import make_gafaelfawr_token +from .support.github import GitHubMocker from .support.gitlfs import ( no_git_lfs_data, uninstall_git_lfs, @@ -56,6 +61,33 @@ def _configure() -> Iterator[None]: @pytest.fixture +def _enable_github_ci_app(tmp_path: Path) -> Iterator[None]: + """Enable the GitHub CI app functionality. + """ + github_config = tmp_path / "github_config.yaml" + github_config.write_text( + dedent(""" + users: + - username: bot-mobu-unittest-1 + - username: bot-mobu-unittest-2 + accepted_github_orgs: + - org1 + - org2 + - lsst-sqre + """) + ) + config.github_ci_app.id = 1 + config.github_ci_app.enabled = True + config.github_ci_app.webhook_secret = TEST_GITHUB_CI_APP_SECRET + config.github_ci_app.private_key = TEST_GITHUB_CI_APP_PRIVATE_KEY + config.github_config_path = github_config + + yield + + config.github_ci_app = GitHubCiApp() + config.github_config_path = None + + @pytest.fixture def _enable_github_refresh_app(tmp_path: Path) -> Iterator[None]: """Enable the GitHub Refresh app routes. 
@@ -66,6 +98,9 @@ def _enable_github_refresh_app(tmp_path: Path) -> Iterator[None]:
     github_config = tmp_path / "github_config.yaml"
     github_config.write_text(
         dedent("""
+            users:
+              - username: bot-mobu-unittest-1
+              - username: bot-mobu-unittest-2
             accepted_github_orgs:
               - org1
               - org2
@@ -186,3 +221,10 @@ def _no_monkey_business() -> Iterator[None]:
         "mobu.services.flock.Monkey", start=DEFAULT, stop=DEFAULT
     ):
         yield
+
+
+@pytest.fixture
+def github_mocker() -> Iterator[GitHubMocker]:
+    github_mocker = GitHubMocker()
+    with github_mocker.router:
+        yield github_mocker
diff --git a/tests/services/__init__.py b/tests/services/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/services/ci_manager_test.py b/tests/services/ci_manager_test.py
new file mode 100644
index 00000000..3520baf0
--- /dev/null
+++ b/tests/services/ci_manager_test.py
@@ -0,0 +1,357 @@
+"""Tests for CiManager."""
+
+import asyncio
+
+import pytest
+import respx
+import structlog
+from httpx import AsyncClient
+from pydantic import HttpUrl
+from pytest_mock import MockerFixture
+
+from mobu.models.ci_manager import (
+    CiJobSummary,
+    CiManagerSummary,
+    CiWorkerSummary,
+)
+from mobu.models.user import User
+from mobu.services.business.base import Business
+from mobu.services.github_ci.ci_manager import CiManager
+from mobu.storage.gafaelfawr import GafaelfawrStorage
+
+from ..support.gafaelfawr import mock_gafaelfawr
+from ..support.github import GitHubMocker, MockJob
+
+
+def create_ci_manager(respx_mock: respx.Router) -> CiManager:
+    """Create a CiManager with appropriately mocked dependencies."""
+    mock_gafaelfawr(
+        respx_mock,
+        scopes=[
+            "exec:notebook",
+            "exec:portal",
+            "read:image",
+            "read:tap",
+        ],
+    )
+    http_client = AsyncClient()
+    logger = structlog.get_logger()
+    gafaelfawr = GafaelfawrStorage(http_client=http_client, logger=logger)
+
+    return CiManager(
+        http_client=http_client,
+        gafaelfawr_storage=gafaelfawr,
+        logger=logger,
+        users=[User(username="user1"), User(username="user2")],
+    )
+
+
+async def setup_and_run_jobs(
+    jobs: list[MockJob],
+    github_mocker: GitHubMocker,
+    respx_mock: respx.Router,
+    mocker: MockerFixture,
+) -> None:
+    """Create a CiManager, enqueue all jobs, wait for them all to be
+    processed, and wait for the manager to shut down.
+ """ + mocker.patch.object( + Business, "run_once", new=github_mocker.get_mock_run_function() + ) + + ci_manager = create_ci_manager(respx_mock) + await ci_manager.start() + events: list[asyncio.Event] = [] + for job in jobs: + lifecycle = await ci_manager.enqueue( + installation_id=job.installation_id, + repo_owner=job.repo_owner, + repo_name=job.repo_name, + ref=job.ref, + ) + events.append(lifecycle.processed) + await asyncio.gather(*[event.wait() for event in events]) + await ci_manager.aclose() + + +@pytest.mark.asyncio +@pytest.mark.usefixtures( + "_enable_github_ci_app", +) +async def test_stops_on_empty_queue( + respx_mock: respx.Router, +) -> None: + ci_manager = create_ci_manager(respx_mock) + await ci_manager.start() + expected_summary = CiManagerSummary( + workers=[ + CiWorkerSummary( + user=User(username="user1", uidnumber=None, gidnumber=None), + num_processed=0, + current_job=None, + ), + CiWorkerSummary( + user=User(username="user2", uidnumber=None, gidnumber=None), + num_processed=0, + current_job=None, + ), + ], + num_queued=0, + ) + assert ci_manager.summarize() == expected_summary + await asyncio.wait_for(ci_manager.aclose(), timeout=0.5) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures( + "_enable_github_ci_app", +) +async def test_no_changed_files( + respx_mock: respx.Router, + github_mocker: GitHubMocker, + mocker: MockerFixture, +) -> None: + jobs = [ + github_mocker.job_no_changed_files(id="ref1"), + ] + + await setup_and_run_jobs( + jobs=jobs, + github_mocker=github_mocker, + mocker=mocker, + respx_mock=respx_mock, + ) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures( + "_enable_github_ci_app", +) +async def test_invalid_config( + respx_mock: respx.Router, + github_mocker: GitHubMocker, + mocker: MockerFixture, +) -> None: + jobs = [ + github_mocker.job_invalid_config(id="ref1"), + ] + + await setup_and_run_jobs( + jobs=jobs, + github_mocker=github_mocker, + mocker=mocker, + respx_mock=respx_mock, + ) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures( + "_enable_github_ci_app", +) +async def test_missing_config( + respx_mock: respx.Router, + github_mocker: GitHubMocker, + mocker: MockerFixture, +) -> None: + jobs = [ + github_mocker.job_missing_config(id="ref1", should_fail=False), + ] + + await setup_and_run_jobs( + jobs=jobs, + github_mocker=github_mocker, + mocker=mocker, + respx_mock=respx_mock, + ) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures( + "_enable_github_ci_app", +) +async def test_runs_jobs( + respx_mock: respx.Router, + github_mocker: GitHubMocker, + mocker: MockerFixture, +) -> None: + jobs = [ + github_mocker.job_processed_completely(id="ref1", should_fail=False), + github_mocker.job_processed_completely(id="ref2", should_fail=False), + github_mocker.job_processed_completely(id="ref3", should_fail=True), + github_mocker.job_processed_completely(id="ref4", should_fail=True), + ] + + mocker.patch.object( + Business, "run_once", new=github_mocker.get_mock_run_function() + ) + + await setup_and_run_jobs( + jobs=jobs, + github_mocker=github_mocker, + mocker=mocker, + respx_mock=respx_mock, + ) + + +@pytest.mark.asyncio +@pytest.mark.usefixtures( + "_enable_github_ci_app", +) +async def test_shutdown( + respx_mock: respx.Router, + github_mocker: GitHubMocker, + mocker: MockerFixture, +) -> None: + """Test that all queued jobs conclude in GitHub during shutdown.""" + completed_jobs = [ + github_mocker.job_processed_completely( + id="complete_1", should_fail=True + ), + github_mocker.job_processed_completely( + id="complete_2", 
should_fail=False
+        ),
+    ]
+    in_progress_jobs = [
+        github_mocker.job_processed_while_shutting_down(
+            id="in_progress_1", should_fail=True
+        ),
+        github_mocker.job_processed_while_shutting_down(
+            id="in_progress_2", should_fail=False
+        ),
+    ]
+    queued_jobs = [
+        github_mocker.job_queued_while_shutting_down(id="queued_1"),
+        github_mocker.job_queued_while_shutting_down(id="queued_2"),
+    ]
+
+    mocker.patch.object(
+        Business,
+        "run_once",
+        new=github_mocker.get_mock_run_function(blocking_jobs=True),
+    )
+
+    ci_manager = create_ci_manager(respx_mock)
+    await ci_manager.start()
+
+    completed_lifecycles = [
+        await ci_manager.enqueue(
+            installation_id=job.installation_id,
+            repo_owner=job.repo_owner,
+            repo_name=job.repo_name,
+            ref=job.ref,
+        )
+        for job in completed_jobs
+    ]
+
+    in_progress_lifecycles = [
+        await ci_manager.enqueue(
+            installation_id=job.installation_id,
+            repo_owner=job.repo_owner,
+            repo_name=job.repo_name,
+            ref=job.ref,
+        )
+        for job in in_progress_jobs
+    ]
+
+    queued_lifecycles = [
+        await ci_manager.enqueue(
+            installation_id=job.installation_id,
+            repo_owner=job.repo_owner,
+            repo_name=job.repo_name,
+            ref=job.ref,
+        )
+        for job in queued_jobs
+    ]
+
+    # Wait for the first two jobs to start processing
+    await asyncio.gather(
+        *[lifecycle.processing.wait() for lifecycle in completed_lifecycles]
+    )
+
+    # Wait for the first two jobs to complete
+    for job in completed_jobs:
+        job.proceed_event.set()
+    await asyncio.gather(
+        *[lifecycle.processed.wait() for lifecycle in completed_lifecycles]
+    )
+
+    # Wait for the next two jobs to start processing
+    await asyncio.gather(
+        *[lifecycle.processing.wait() for lifecycle in in_progress_lifecycles]
+    )
+
+    # We'll check this later
+    summary = ci_manager.summarize()
+
+    # Tell the CI manager to stop and let the next two jobs proceed only when
+    # we know the shutdown process has reached a certain point
+    task = asyncio.create_task(ci_manager.aclose())
+    await ci_manager.lifecycle.marked_remaining.wait()
+
+    for job in in_progress_jobs:
+        job.proceed_event.set()
+    await task
+
+    # Make sure the jobs that began processing finished processing
+    assert all(
+        lifecycle.processed.is_set() for lifecycle in in_progress_lifecycles
+    )
+
+    # Make sure the queued jobs were never processed
+    assert not any(
+        lifecycle.processed.is_set() for lifecycle in queued_lifecycles
+    )
+    assert not any(
+        lifecycle.processing.is_set() for lifecycle in queued_lifecycles
+    )
+
+    # Check the point-in-time summary. It's possible for either worker to
+    # have either in_progress job.
+ expected_summary1 = CiManagerSummary( + workers=[ + CiWorkerSummary( + user=User(username="user1", uidnumber=None, gidnumber=None), + num_processed=1, + current_job=CiJobSummary( + commit_url=HttpUrl( + "https://github.com/repo_owner_in_progress_1/repo_name_in_progress_1/commit/ref_in_progress_1" + ) + ), + ), + CiWorkerSummary( + user=User(username="user2", uidnumber=None, gidnumber=None), + num_processed=1, + current_job=CiJobSummary( + commit_url=HttpUrl( + "https://github.com/repo_owner_in_progress_2/repo_name_in_progress_2/commit/ref_in_progress_2" + ) + ), + ), + ], + num_queued=2, + ) + + expected_summary2 = CiManagerSummary( + workers=[ + CiWorkerSummary( + user=User(username="user1", uidnumber=None, gidnumber=None), + num_processed=1, + current_job=CiJobSummary( + commit_url=HttpUrl( + "https://github.com/repo_owner_in_progress_2/repo_name_in_progress_2/commit/ref_in_progress_2" + ) + ), + ), + CiWorkerSummary( + user=User(username="user2", uidnumber=None, gidnumber=None), + num_processed=1, + current_job=CiJobSummary( + commit_url=HttpUrl( + "https://github.com/repo_owner_in_progress_1/repo_name_in_progress_1/commit/ref_in_progress_1" + ) + ), + ), + ], + num_queued=2, + ) + assert summary in (expected_summary1, expected_summary2) diff --git a/tests/support/constants.py b/tests/support/constants.py index 386cf6a9..d93b18b1 100644 --- a/tests/support/constants.py +++ b/tests/support/constants.py @@ -6,3 +6,35 @@ TEST_GITHUB_REFRESH_APP_SECRET = "some-webhook-secret" """Webhook secret used for hashing test github refresh app webhook payloads.""" +TEST_GITHUB_CI_APP_SECRET = "some-ci-webhook-secret" +"""Webhook secret used for hashing test github ci app webhook payloads.""" + +TEST_GITHUB_CI_APP_PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEA1HgzBfJv2cOjQryCwe8NEelriOTNFWKZUivevUrRhlqcmZJd +CvuCJRr+xCN+OmO8qwgJJR98feNujxVg+J9Ls3/UOA4HcF9nYH6aqVXELAE8Hk/A +Lvxi96ms1DDuAvQGaYZ+lANxlvxeQFOZSbjkz/9mh8aLeGKwqJLp3p+OhUBQpwvA +UAPg82+OUtgTW3nSljjeFr14B8qAneGSc/wl0ni++1SRZUXFSovzcqQOkla3W27r +rLfrD6LXgj/TsDs4vD1PnIm1zcVenKT7TfYI17bsG/O/Wecwz2Nl19pL7gDosNru +F3ogJWNq1Lyn/ijPQnkPLpZHyhvuiycYcI3DiQIDAQABAoIBAQCt9uzwBZ0HVGQs +lGULnUu6SsC9iXlR9TVMTpdFrij4NODb7Tc5cs0QzJWkytrjvB4Se7XhK3KnMLyp +cvu/Fc7J3fRJIVN98t+V5pOD6rGAxlIPD4Vv8z6lQcw8wQNgb6WAaZriXh93XJNf +YBO2hSj0FU5CBZLUsxmqLQBIQ6RR/OUGAvThShouE9K4N0vKB2UPOCu5U+d5zS3W +44Q5uatxYiSHBTYIZDN4u27Nfo5WA+GTvFyeNsO6tNNWlYfRHSBtnm6SZDY/5i4J +fxP2JY0waM81KRvuHTazY571lHM/TTvFDRUX5nvHIu7GToBKahfVLf26NJuTZYXR +5c09GAXBAoGBAO7a9M/dvS6eDhyESYyCjP6w61jD7UYJ1fudaYFrDeqnaQ857Pz4 +BcKx3KMmLFiDvuMgnVVj8RToBGfMV0zP7sDnuFRJnWYcOeU8e2sWGbZmWGWzv0SD ++AhppSZThU4mJ8aa/tgsepCHkJnfoX+3wN7S9NfGhM8GDGxTHJwBpxINAoGBAOO4 +ZVtn9QEblmCX/Q5ejInl43Y9nRsfTy9lB9Lp1cyWCJ3eep6lzT60K3OZGVOuSgKQ +vZ/aClMCMbqsAAG4fKBjREA6p7k4/qaMApHQum8APCh9WPsKLaavxko8ZDc41kZt +hgKyUs2XOhW/BLjmzqwGryidvOfszDwhH7rNVmRtAoGBALYGdvrSaRHVsbtZtRM3 +imuuOCx1Y6U0abZOx9Cw3PIukongAxLlkL5G/XX36WOrQxWkDUK930OnbXQM7ZrD ++5dW/8p8L09Zw2VHKmb5eK7gYA1hZim4yJTgrdL/Y1+jBDz+cagcfWsXZMNfAZxr +VLh628x0pVF/sof67pqVR9UhAoGBAMcQiLoQ9GJVhW1HMBYBnQVnCyJv1gjBo+0g +emhrtVQ0y6+FrtdExVjNEzboXPWD5Hq9oKY+aswJnQM8HH1kkr16SU2EeN437pQU +zKI/PtqN8AjNGp3JVgLioYp/pHOJofbLA10UGcJTMpmT9ELWsVA8P55X1a1AmYDu +y9f2bFE5AoGAdjo95mB0LVYikNPa+NgyDwLotLqrueb9IviMmn6zKHCwiOXReqXD +X9slB8RA15uv56bmN04O//NyVFcgJ2ef169GZHiRFIgIy0Pl8LYkMhCYKKhyqM7g +xN+SqGqDTKDC22j00S7jcvCaa1qadn1qbdfukZ4NXv7E2d/LO0Y2Kkc= +-----END RSA PRIVATE KEY----- + +""" diff --git a/tests/support/gafaelfawr.py b/tests/support/gafaelfawr.py index 
32472e9b..05b7a71e 100644 --- a/tests/support/gafaelfawr.py +++ b/tests/support/gafaelfawr.py @@ -40,12 +40,14 @@ def mock_gafaelfawr( gid: int | None = None, *, any_uid: bool = False, + scopes: list[str] | None = None, ) -> None: """Mock out the call to Gafaelfawr to create a user token. Optionally verifies that the username and UID provided to Gafaelfawr are correct. """ + scopes = scopes or ["exec:notebook"] admin_token = config.gafaelfawr_token assert admin_token assert admin_token.startswith("gt-") @@ -55,7 +57,7 @@ def handler(request: Request) -> Response: expected = { "username": username if username else ANY, "token_type": "service", - "scopes": ["exec:notebook"], + "scopes": scopes, "expires": ANY, "name": "Mobu Test User", } diff --git a/tests/support/github.py b/tests/support/github.py new file mode 100644 index 00000000..7b4e2f28 --- /dev/null +++ b/tests/support/github.py @@ -0,0 +1,316 @@ +"""Functions and constants for mocking out GitHub API behavior.""" + +import asyncio +from base64 import b64encode +from collections.abc import Callable +from textwrap import dedent + +import respx + +from mobu.constants import GITHUB_REPO_CONFIG_PATH +from mobu.services.business.base import Business +from mobu.services.github_ci.ci_manager import CiManager + +__all__ = ["GitHubMocker", "MockJob"] + + +class MockJob: + """State and config for some fake work to be done by CiManager in tests.""" + + def __init__(self, id: str, *, should_fail: bool = False) -> None: + self.id = id + self.installation_id = 123 + self.should_fail = should_fail + self.proceed_event = asyncio.Event() + self.path_prefix = f"/repos/{self.repo_owner}/{self.repo_name}" + + @property + def repo_owner(self) -> str: + return f"repo_owner_{self.id}" + + @property + def repo_name(self) -> str: + return f"repo_name_{self.id}" + + @property + def ref(self) -> str: + return f"ref_{self.id}" + + +class GitHubMocker: + """A big bucket of mocks and state to mock GitHub API behavior. + + Create an instance of this class and then call functions to mock GitHub API + responses and job functionality based on desired behavior. + This is very stateful, and asserts that an exact set of HTTP calls are + made--no more, and no less. + The general pattern: + 1. Call a bunch of job_* methods to mock behavior for those usecases + 2. Hang on to the ``MockJob``s returned from those calls + 3. Patch ``Business.run_once`` with ``get_mock_run_function`` + 4. Start a CiManager, and ``enqueue`` jobs with the info from the MockJob + values + 5. 
Do whatever you need to do to cause the situations :)
+       This may include waiting and triggering various ``asyncio.Event``s
+    """
+
+    def __init__(self) -> None:
+        self._blocking_jobs = False
+        self.jobs: list[MockJob] = []
+
+        self.router = respx.mock(
+            base_url="https://api.github.com",
+            assert_all_mocked=True,
+            assert_all_called=True,
+        )
+
+        # Mock the endpoint that gives us a token
+        self.router.post(
+            url__regex=r"/app/installations/(?P<installation_id>\d+)/access_tokens",
+        ).respond(
+            json={
+                "token": "whatever",
+                "expires_at": "whenever",
+            }
+        )
+
+    def job_invalid_config(self, id: str) -> MockJob:
+        """Causes an invalid in-repo config file."""
+        job = MockJob(id=id)
+        self.jobs.append(job)
+
+        self._mock_get_repo_config_content(job, valid=False)
+
+        # Create a check run with a `queued` status
+        self.router.post(
+            path=f"{job.path_prefix}/check-runs",
+        ).respond(json={"id": "1"})
+
+        # The check fails due to an invalid config
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1",
+            json__conclusion="failure",
+        )
+
+        return job
+
+    def job_missing_config(self, id: str, *, should_fail: bool) -> MockJob:
+        """Causes a missing in-repo config file."""
+        job = MockJob(id=id, should_fail=should_fail)
+        self.jobs.append(job)
+
+        self._mock_get_changed_files(job)
+
+        # Missing config file in the repo should be fine
+        self._mock_get_repo_config_content(job, missing=True)
+
+        # Create a check run with a `queued` status
+        self.router.post(
+            path=f"{job.path_prefix}/check-runs",
+        ).respond(json={"id": "1"})
+
+        # Eventually mark the check run 'in progress'
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1", json__status="in_progress"
+        )
+
+        # Mark the check run failed or succeeded
+        conclusion = "failure" if job.should_fail else "success"
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1",
+            json__conclusion=conclusion,
+        )
+
+        return job
+
+    def job_no_changed_files(self, id: str) -> MockJob:
+        """Causes a GitHub API response indicating no notebooks have
+        changed.
+        """
+        job = MockJob(id=id)
+        self.jobs.append(job)
+
+        self._mock_get_changed_files(job, has_changed_files=False)
+        self._mock_get_repo_config_content(job)
+
+        # Create a check run with a `queued` status
+        self.router.post(
+            path=f"{job.path_prefix}/check-runs",
+        ).respond(json={"id": "1"})
+
+        # The check never starts because there are no changed files
+
+        # The check always succeeds
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1",
+            json__conclusion="success",
+        )
+
+        return job
+
+    def job_processed_completely(
+        self, id: str, *, should_fail: bool
+    ) -> MockJob:
+        """Mock the GitHub API for a job that processes completely with
+        no interruption.
+        """
+        job = MockJob(id=id, should_fail=should_fail)
+        self.jobs.append(job)
+
+        self._mock_get_changed_files(job)
+        self._mock_get_repo_config_content(job)
+
+        # Create a check run with a `queued` status
+        self.router.post(
+            path=f"{job.path_prefix}/check-runs",
+        ).respond(json={"id": "1"})
+
+        # Eventually mark the check run 'in progress'
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1", json__status="in_progress"
+        )
+
+        # Mark the check run failed or succeeded
+        conclusion = "failure" if job.should_fail else "success"
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1",
+            json__conclusion=conclusion,
+        )
+
+        return job
+
+    def job_processed_while_shutting_down(
+        self, id: str, *, should_fail: bool
+    ) -> MockJob:
+        """Mock the GitHub API for a job that processes completely, but is
+        in progress when mobu shuts down.
+        """
+        job = MockJob(id=id, should_fail=should_fail)
+        self.jobs.append(job)
+
+        self._mock_get_changed_files(job)
+        self._mock_get_repo_config_content(job)
+
+        # Create a check run with a `queued` status
+        self.router.post(
+            path=f"{job.path_prefix}/check-runs",
+        ).respond(json={"id": "1"})
+
+        # Eventually mark the check run 'in progress'
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1", json__status="in_progress"
+        )
+
+        # The check will be marked as failed due to Mobu shutdown...
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1",
+            json__conclusion="failure",
+            json__output__text=CiManager.shutdown_error_msg,
+        )
+
+        # But it should eventually complete
+        conclusion = "failure" if job.should_fail else "success"
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1",
+            json__conclusion=conclusion,
+        )
+
+        return job
+
+    def job_queued_while_shutting_down(self, id: str) -> MockJob:
+        """Mock the GitHub API for a job that gets queued but is never
+        processed.
+        """
+        job = MockJob(id=id)
+        self.jobs.append(job)
+
+        # Create a check run with a `queued` status
+        self.router.post(
+            path=f"{job.path_prefix}/check-runs",
+        ).respond(json={"id": "1"})
+
+        # The check should never be marked as in progress
+
+        # The check should never be updated based on completion of the job
+
+        # The check will be marked as failed due to Mobu shutdown
+        self.router.patch(
+            path=f"{job.path_prefix}/check-runs/1",
+            json__conclusion="failure",
+            json__output__text=CiManager.shutdown_error_msg,
+        )
+
+        return job
+
+    def enable_blocking_jobs(self) -> None:
+        """Make it so that you must set an asyncio.Event in your test for the
+        job to progress.
+        """
+        self._blocking_jobs = True
+
+    def get_mock_run_function(
+        self, *, blocking_jobs: bool = False
+    ) -> Callable:
+        """Patch this in to mobu.services.business.base.Business.run_once.
+ + It will: + * Raise an exception while trying to run any job that was + configured to fail + * Optionally require an asyncio.Event to be set in order to progress + """ + + async def run_once(host_self: Business) -> None: + ref = host_self.options.repo_ref + job = next(job for job in self.jobs if job.ref == ref) + if blocking_jobs: + await job.proceed_event.wait() + if job.should_fail: + raise RuntimeError("Blowing up on purpose!") + + return run_once + + def _mock_get_changed_files( + self, job: MockJob, *, has_changed_files: bool = True + ) -> None: + """Mock different responses from the commits API.""" + if has_changed_files: + changes = [ + {"filename": "notebook_changed1.ipynb", "status": "modified"}, + {"filename": "notebook_deleted.ipynb", "status": "removed"}, + {"filename": "not_a_notebook.txt", "status": "modified"}, + {"filename": "notebook_changed2.ipynb", "status": "modified"}, + ] + else: + changes = [ + {"filename": "notebook_deleted.ipynb", "status": "removed"}, + ] + + self.router.get(path=f"{job.path_prefix}/commits/{job.ref}").respond( + json={"files": changes} + ) + + def _mock_get_repo_config_content( + self, job: MockJob, *, valid: bool = True, missing: bool = False + ) -> None: + """Mock different responses from the contents API.""" + if valid: + content = dedent(""" + exclude_dirs: + - exclude_me + """) + else: + content = dedent(""" + nope: + - exclude_me + """) + + route = self.router.get( + path=f"{job.path_prefix}/contents/{GITHUB_REPO_CONFIG_PATH}", + params={"ref": job.ref}, + ) + if missing: + route.respond(404) + else: + route.respond( + json={"content": b64encode(content.encode()).decode()} + )