From ff9797e9ce67874372a97fbe5fc557a0a4da2642 Mon Sep 17 00:00:00 2001 From: Bruno Grande Date: Tue, 2 May 2023 10:50:12 -0700 Subject: [PATCH 1/5] Add `resume` & `session_id` to `LaunchInfo` model --- src/orca/services/nextflowtower/models.py | 80 +++++++++++++-------- tests/services/nextflowtower/test_models.py | 36 ++++++++++ 2 files changed, 87 insertions(+), 29 deletions(-) diff --git a/src/orca/services/nextflowtower/models.py b/src/orca/services/nextflowtower/models.py index ae735eb..ea0cf49 100644 --- a/src/orca/services/nextflowtower/models.py +++ b/src/orca/services/nextflowtower/models.py @@ -4,6 +4,7 @@ from enum import Enum from typing import Any, Iterable, Optional +from pydantic import root_validator from pydantic.dataclasses import dataclass from typing_extensions import Self @@ -102,6 +103,27 @@ class LaunchInfo: user_secrets: list[str] = field(default_factory=list) workspace_secrets: list[str] = field(default_factory=list) label_ids: list[int] = field(default_factory=list) + resume: bool = False + session_id: Optional[str] = None + + @root_validator() + def check_resume_and_session_id(cls, values: dict[str, Any]): + """Make sure that resume and session_id are in sync. + + Args: + values: Dictionary of attributes. + + Raises: + ValueError: If resume is enabled and a session ID + is not provided. + + Returns: + Unmodified dictionary of attributes. + """ + if values["resume"] and values["session_id"] is None: + message = "Resume can only be enabled with a session ID." + raise ValueError(message) + return values def fill_in(self, attr: str, value: Any): """Fill in any missing values. @@ -148,36 +170,36 @@ def to_dict(self) -> dict[str, Any]: Returns: JSON representation. """ - output = { - "launch": { - "computeEnvId": self.get("compute_env_id"), - "configProfiles": dedup(self.profiles), - "configText": self.nextflow_config, - "dateCreated": None, - "entryName": None, - "headJobCpus": None, - "headJobMemoryMb": None, - "id": None, - "labelIds": dedup(self.label_ids), - "mainScript": None, - "optimizationId": None, - "paramsText": json.dumps(self.params), - "pipeline": self.get("pipeline"), - "postRunScript": None, - "preRunScript": self.pre_run_script, - "pullLatest": False, - "resume": False, - "revision": self.revision, - "runName": self.run_name, - "schemaName": None, - "stubRun": False, - "towerConfig": None, - "userSecrets": dedup(self.user_secrets), - "workDir": self.get("work_dir"), - "workspaceSecrets": dedup(self.workspace_secrets), - } + launch = { + "computeEnvId": self.get("compute_env_id"), + "configProfiles": dedup(self.profiles), + "configText": self.nextflow_config, + "dateCreated": None, + "entryName": None, + "headJobCpus": None, + "headJobMemoryMb": None, + "id": None, + "labelIds": dedup(self.label_ids), + "mainScript": None, + "optimizationId": None, + "paramsText": json.dumps(self.params), + "pipeline": self.get("pipeline"), + "postRunScript": None, + "preRunScript": self.pre_run_script, + "pullLatest": False, + "revision": self.revision, + "runName": self.run_name, + "schemaName": None, + "stubRun": False, + "towerConfig": None, + "userSecrets": dedup(self.user_secrets), + "workDir": self.get("work_dir"), + "workspaceSecrets": dedup(self.workspace_secrets), } - return output + if self.resume: + launch["resume"] = self.resume + launch["sessionId"] = self.get("session_id") + return {"launch": launch} @dataclass(kw_only=False) diff --git a/tests/services/nextflowtower/test_models.py b/tests/services/nextflowtower/test_models.py index 7cfe204..c474ce7 100644 --- a/tests/services/nextflowtower/test_models.py +++ b/tests/services/nextflowtower/test_models.py @@ -3,6 +3,18 @@ from orca.services.nextflowtower.models import LaunchInfo +@pytest.fixture +def launch_info(): + yield LaunchInfo( + compute_env_id="5ykJF", + pipeline="foo/bar", + revision="1.1.0", + profiles=["test"], + params={"foo": "bar"}, + work_dir="s3://foo/work", + ) + + def test_that_getting_an_launch_info_attribute_works(): launch_info = LaunchInfo(pipeline="foo") assert launch_info.get("pipeline") == "foo" @@ -32,3 +44,27 @@ def test_for_an_error_when_adding_in_with_nonlist_launch_info_attribute(): launch_info = LaunchInfo(pipeline="foo") with pytest.raises(ValueError): launch_info.add_in("pipeline", [4, 5, 6]) + + +def test_that_launch_info_can_be_created_with_resume_enabled(): + launch_info = LaunchInfo(resume=True, session_id="foo") + assert launch_info + + +def test_that_launch_info_can_be_serialized_with_resume_enabled(launch_info): + launch_info.resume = True + launch_info.session_id = "foo" + json = launch_info.to_dict() + assert "resume" in json["launch"] + assert "sessionId" in json["launch"] + + +def test_that_launch_info_can_be_serialized_with_resume_disabled(launch_info): + json = launch_info.to_dict() + assert "resume" not in json["launch"] + assert "sessionId" not in json["launch"] + + +def test_for_an_error_when_enabling_resume_without_session_id(): + with pytest.raises(ValueError): + LaunchInfo(resume=True) From 811bd8dccc3e220484e6e0abf110681dd0537332 Mon Sep 17 00:00:00 2001 From: Bruno Grande Date: Tue, 2 May 2023 19:59:41 -0700 Subject: [PATCH 2/5] Implement `list_workflows()` and refactor models --- src/orca/services/nextflowtower/client.py | 48 ++++- src/orca/services/nextflowtower/models.py | 190 ++++++++++------- src/orca/services/nextflowtower/ops.py | 43 ++-- tests/services/nextflowtower/responses.py | 223 ++++++++++++++++++++ tests/services/nextflowtower/test_client.py | 16 +- tests/services/nextflowtower/test_enums.py | 22 +- tests/services/nextflowtower/test_models.py | 4 +- tests/services/nextflowtower/test_ops.py | 10 +- 8 files changed, 437 insertions(+), 119 deletions(-) diff --git a/src/orca/services/nextflowtower/client.py b/src/orca/services/nextflowtower/client.py index 4192d36..2690e93 100644 --- a/src/orca/services/nextflowtower/client.py +++ b/src/orca/services/nextflowtower/client.py @@ -318,22 +318,50 @@ def launch_workflow( """ path = "/workflow/launch" params = self.generate_params(workspace_id) - payload = launch_info.to_dict() + payload = launch_info.to_json() json = self.post(path, params=params, json=payload) return self.unwrap(json, "workflowId") - def get_workflow(self, workspace_id: int, workflow_id: str) -> dict: - """Gets available information about a workflow run + def get_workflow( + self, + workflow_id: str, + workspace_id: Optional[int] = None, + ) -> models.Workflow: + """Get information about a workflow run. Attributes: - workspace_id (int): The ID number of the workspace the workflow - exists within. - workflow_id (str): The ID number for a workflow run to get - information about. + workflow_id: The ID number for a workflow run to get + information about. + workspace_id: The ID number of the workspace the workflow + exists within. Defaults to None. Returns: - response (dict): Dictionary containing information about the workflow run + Workflow instance. """ path = f"/workflow/{workflow_id}" - json = self.get(path=path, params={"workspaceId": workspace_id}) - return json + params = self.generate_params(workspace_id) + json = self.get(path=path, params=params) + unwrapped = self.unwrap(json, "workflow") + return models.Workflow.from_json(unwrapped) + + def list_workflows( + self, + search_filter: Optional[str] = None, + workspace_id: Optional[int] = None, + ) -> list[models.Workflow]: + """List available workflows that match search filter. + + Attributes: + search_filter: A Nextflow Tower search query, as you would + compose it in the runs search bar. Defaults to None. + workspace_id: The ID number of the workspace the workflow + exists within. Defaults to None. + + Returns: + List of workflow instances. + """ + path = "/workflow" + params = self.generate_params(workspace_id, search=search_filter) + json = self.get(path=path, params=params) + items = self.unwrap(json, "workflows") + return [models.Workflow.from_json(item["workflow"]) for item in items] diff --git a/src/orca/services/nextflowtower/models.py b/src/orca/services/nextflowtower/models.py index ea0cf49..9b70057 100644 --- a/src/orca/services/nextflowtower/models.py +++ b/src/orca/services/nextflowtower/models.py @@ -1,21 +1,22 @@ -import json -from dataclasses import field +import json as json_module +from dataclasses import field, fields from datetime import datetime from enum import Enum -from typing import Any, Iterable, Optional +from typing import Any, ClassVar, Iterable, Optional from pydantic import root_validator from pydantic.dataclasses import dataclass from typing_extensions import Self -from orca.services.nextflowtower.utils import dedup, parse_datetime +from orca.services.nextflowtower.utils import dedup -class TaskStatus(Enum): - """enum containing all possible status values for - Nextflow Tower runs. terminal_states set which - statuses result in a run being determined to be - "complete" +class WorkflowStatus(Enum): + """Valid values for the status of a Tower workflow. + + Attributes: + terminate_states: List of status values for a workflow + that is no longer in progress. """ SUBMITTED = "SUBMITTED" @@ -29,66 +30,106 @@ class TaskStatus(Enum): @dataclass(kw_only=False) -class User: - """Nextflow Tower user.""" +class BaseTowerModel: + """Base model for Nextflow Tower models. - id: int - username: str - email: str + Attributes: + _key_mapping: Mapping between Python and API key names. + Only discordant names need to be listed. + """ + + _key_mapping: ClassVar[dict[str, str]] @classmethod - def from_json(cls, response: dict[str, Any]) -> Self: - """Create user from API JSON response. + def from_json(cls, json: dict[str, Any], **kwargs: Any) -> Self: + """Create instance from API JSON response. + + Args: + json: API JSON response. + **kwargs: Special values. Returns: - User instance. + Class instance. """ - return cls(response["id"], response["userName"], response["email"]) + cls_kwargs = dict() + + # First: populate with special values + cls_kwargs.update(kwargs) + + # Second: populate with values with discordant key names + key_mapping = getattr(cls, "_key_mapping", {}) + for python_name, api_name in key_mapping.items(): + if python_name not in cls_kwargs: + value = None + target = json + for api_name_part in api_name.split("."): + if api_name_part in target: + value = target[api_name_part] + target = value + if value is not None: + cls_kwargs[python_name] = value + + # Third: populate with remaining dataclass fields + for cls_field in fields(cls): + if cls_field.name not in cls_kwargs and cls_field.name in json: + cls_kwargs[cls_field.name] = json[cls_field.name] + + return cls(**cls_kwargs) + + +@dataclass(kw_only=False) +class User(BaseTowerModel): + """Nextflow Tower user.""" + + id: int + username: str + email: str + + _key_mapping = {"username": "userName"} @dataclass(kw_only=False) -class Organization: +class Organization(BaseTowerModel): """Nextflow Tower organization.""" id: int name: str - @classmethod - def from_json(cls, response: dict[str, Any]) -> Self: - """Create organization from API JSON response. - - Returns: - Organization instance. - """ - return cls(response["orgId"], response["orgName"]) + _key_mapping = {"id": "orgId", "name": "orgName"} @dataclass(kw_only=False) -class Workspace: +class Workspace(BaseTowerModel): """Nextflow Tower workspace.""" id: int name: str org: Organization + _key_mapping = {"id": "workspaceId", "name": "workspaceName"} + @property def full_name(self) -> str: """Fully-qualified workspace name (with organization name).""" return f"{self.org.name}/{self.name}".lower() @classmethod - def from_json(cls, response: dict[str, Any]) -> Self: - """Create workspace from API JSON response. + def from_json(cls, json: dict[str, Any], **kwargs: Any) -> Self: + """Create instance from API JSON response. + + Args: + json: API JSON response. + **kwargs: Special values. Returns: - Workspace instance. + Class instance. """ - org = Organization.from_json(response) - return cls(response["workspaceId"], response["workspaceName"], org) + org = Organization.from_json(json) + return super().from_json(json, org=org, **kwargs) @dataclass(kw_only=False) -class LaunchInfo: +class LaunchInfo(BaseTowerModel): """Nextflow Tower workflow launch specification.""" pipeline: Optional[str] = None @@ -164,7 +205,7 @@ def get(self, name: str) -> Any: raise ValueError(message) return getattr(self, name) - def to_dict(self) -> dict[str, Any]: + def to_json(self) -> dict[str, Any]: """Generate JSON representation of a launch specification. Returns: @@ -182,7 +223,7 @@ def to_dict(self) -> dict[str, Any]: "labelIds": dedup(self.label_ids), "mainScript": None, "optimizationId": None, - "paramsText": json.dumps(self.params), + "paramsText": json_module.dumps(self.params), "pipeline": self.get("pipeline"), "postRunScript": None, "preRunScript": self.pre_run_script, @@ -203,7 +244,7 @@ def to_dict(self) -> dict[str, Any]: @dataclass(kw_only=False) -class Label: +class Label(BaseTowerModel): """Nextflow Tower workflow run label.""" id: int @@ -211,40 +252,17 @@ class Label: value: Optional[str] resource: bool - @classmethod - def from_json(cls, response: dict[str, Any]) -> Self: - """Create label from API JSON response. - - Returns: - Label instance. - """ - return cls(**response) - @dataclass(kw_only=False) -class ComputeEnvSummary: +class ComputeEnvSummary(BaseTowerModel): """Nextflow Tower compute environment summary.""" id: str name: str status: str work_dir: str - raw: dict - @classmethod - def from_json(cls, response: dict[str, Any]) -> Self: - """Create compute environment from API JSON response. - - Returns: - Compute environment instance. - """ - return cls( - response["id"], - response["name"], - response["status"], - response["workDir"], - response, - ) + _key_mapping = {"work_dir": "workDir"} @dataclass(kw_only=False) @@ -255,20 +273,42 @@ class ComputeEnv(ComputeEnvSummary): pre_run_script: str labels: list[Label] + _key_mapping = { + "date_created": "dateCreated", + "work_dir": "config.workDir", + "pre_run_script": "config.preRunScript", + } + @classmethod - def from_json(cls, response: dict[str, Any]) -> Self: - """Create compute environment from API JSON response. + def from_json(cls, json: dict[str, Any], **kwargs: Any) -> Self: + """Create instance from API JSON response. + + Args: + json: API JSON response. + **kwargs: Special values. Returns: - Compute environment instance. + Class instance. """ - return cls( - id=response["id"], - name=response["name"], - status=response["status"], - work_dir=response["config"]["workDir"], - date_created=parse_datetime(response["dateCreated"]), - pre_run_script=response["config"]["preRunScript"], - labels=[Label.from_json(label) for label in response["labels"]], - raw=response, - ) + labels = [Label.from_json(label) for label in json["labels"]] + return super().from_json(json, labels=labels, **kwargs) + + +@dataclass(kw_only=False) +class Workflow(BaseTowerModel): + """Nextflow Tower workflow run details.""" + + id: str + complete: Optional[datetime] + run_name: str + session_id: str + username: str + projectName: str + status: WorkflowStatus + + _key_mapping = { + "run_name": "runName", + "session_id": "sessionId", + "username": "userName", + "project_name": "projectName", + } diff --git a/src/orca/services/nextflowtower/ops.py b/src/orca/services/nextflowtower/ops.py index a5b01bb..cef62e6 100644 --- a/src/orca/services/nextflowtower/ops.py +++ b/src/orca/services/nextflowtower/ops.py @@ -1,5 +1,5 @@ from functools import cached_property -from typing import ClassVar, Optional, cast +from typing import ClassVar, Optional from pydantic.dataclasses import dataclass @@ -8,7 +8,7 @@ from orca.services.nextflowtower.client import NextflowTowerClient from orca.services.nextflowtower.client_factory import NextflowTowerClientFactory from orca.services.nextflowtower.config import NextflowTowerConfig -from orca.services.nextflowtower.models import LaunchInfo, TaskStatus +from orca.services.nextflowtower.models import LaunchInfo, Workflow, WorkflowStatus @dataclass(kw_only=False) @@ -132,19 +132,36 @@ def launch_workflow( return self.client.launch_workflow(launch_info, self.workspace_id) # TODO: Consider switching return value to a namedtuple - def get_workflow_status(self, workflow_id: str) -> tuple[TaskStatus, bool]: - """Gets status of workflow run + def get_workflow_status(self, workflow_id: str) -> tuple[WorkflowStatus, bool]: + """Retrieve status of a workflow run. Args: - workflow_id (str): The ID number for a workflow run to get information about + workflow_id: Workflow run ID. Returns: - tuple: Tuple containing 1. status (str) and - 2. Whether the workflow is done (boolean) + Workflow status and whether the workflow is done. """ - response = self.client.get_workflow( - workspace_id=self.workspace_id, workflow_id=workflow_id - ) - task_status = cast(TaskStatus, response["workflow"]["status"]) - is_done = task_status in TaskStatus.terminal_states.value - return task_status, is_done + workflow = self.client.get_workflow(workflow_id, self.workspace_id) + is_done = workflow.status.value in WorkflowStatus.terminal_states.value + return workflow.status, is_done + + def list_workflows( + self, + search_filter: str = "", + only_orca_launches: bool = True, + ) -> list[Workflow]: + """List available workflows that match search filter. + + Attributes: + search_filter: A Nextflow Tower search query, as you would + compose it in the runs search bar. Defaults to nothing. + only_orca_launches: Whether to filter list of workflows for + those that were launched by Orca. Defaults to True. + + Returns: + List of workflow instances. + """ + if only_orca_launches is None: + search_filter = f"{search_filter} label:{self.launch_label}" + workflows = self.client.list_workflows(search_filter, self.workspace_id) + return workflows diff --git a/tests/services/nextflowtower/responses.py b/tests/services/nextflowtower/responses.py index 187c283..fb093bc 100644 --- a/tests/services/nextflowtower/responses.py +++ b/tests/services/nextflowtower/responses.py @@ -301,3 +301,226 @@ "ownerId": 28, } } + + +list_workflows = { + "workflows": [ + { + "workflow": { + "id": "1QYjg", + "ownerId": 28, + "submit": "2023-05-02T16:31:16Z", + "start": "2023-05-02T16:40:32Z", + "complete": "2023-05-02T17:11:56Z", + "dateCreated": "2023-05-02T16:31:16Z", + "lastUpdated": "2023-05-02T17:11:58Z", + "runName": "hungry_cori", + "sessionId": "18f304ea", + "profile": "test", + "workDir": "s3://orca-service-test-project-tower-scratch/work", + "commitId": "5671b", + "userName": "orca-service-account", + "scriptId": "f421d", + "revision": "3.11.2", + "commandLine": "nextflow run nf-core/rnaseq ...", + "projectName": "nf-core/rnaseq", + "scriptName": "main.nf", + "launchId": "5q0uw", + "status": "SUCCEEDED", + "configFiles": [ + "/.nextflow/assets/nf-core/rnaseq/nextflow.config", + "/nextflow.config", + ], + "params": {}, + "configText": "foo", + "manifest": { + "nextflowVersion": "!>=22.10.1", + "defaultBranch": "master", + "version": "3.11.2", + "homePage": "https://github.com/nf-core/rnaseq", + "gitmodules": None, + "description": "RNA sequencing analysis pipeline ...", + "name": "nf-core/rnaseq", + "mainScript": "main.nf", + "author": "Harshil Patel, Phil Ewels, Rickard Hammarén", + }, + "nextflow": { + "version": "22.10.6", + "build": "5843", + "timestamp": "2023-01-23T23:20:00Z", + }, + "stats": {}, + "errorMessage": None, + "errorReport": None, + "deleted": None, + "peakLoadCpus": None, + "peakLoadTasks": None, + "peakLoadMemory": None, + "projectDir": "/.nextflow/assets/nf-core/rnaseq", + "homeDir": "/root", + "container": "", + "repository": "https://github.com/nf-core/rnaseq", + "containerEngine": None, + "scriptFile": "/.nextflow/assets/nf-core/rnaseq/main.nf", + "launchDir": "/", + "duration": 1895231, + "exitStatus": 0, + "resume": False, + "success": True, + }, + "progress": None, + "orgId": 23810, + "orgName": "Sage-Bionetworks", + "workspaceId": 17703, + "workspaceName": "orca-service-test-project", + "labels": None, + "starred": False, + "optimized": None, + }, + { + "workflow": { + "id": "3a6Fn", + "ownerId": 28, + "submit": "2023-05-01T18:38:10Z", + "start": "2023-05-01T18:46:03Z", + "complete": "2023-05-01T19:16:08Z", + "dateCreated": "2023-05-01T18:38:10Z", + "lastUpdated": "2023-05-01T19:16:10Z", + "runName": "boring_curie", + "sessionId": "2d729ac7", + "profile": "test", + "workDir": "s3://orca-service-test-project-tower-scratch/work", + "commitId": "5671b", + "userName": "orca-service-account", + "scriptId": "f421d", + "revision": "3.11.2", + "commandLine": "nextflow run nf-core/rnaseq -name boring_curie ...", + "projectName": "nf-core/rnaseq", + "scriptName": "main.nf", + "launchId": "2ZAjK", + "status": "SUCCEEDED", + "configFiles": [ + "/.nextflow/assets/nf-core/rnaseq/nextflow.config", + "/nextflow.config", + ], + "params": {}, + "configText": "foo", + "manifest": { + "nextflowVersion": "!>=22.10.1", + "defaultBranch": "master", + "version": "3.11.2", + "homePage": "https://github.com/nf-core/rnaseq", + "gitmodules": None, + "description": "RNA sequencing analysis pipeline for ...", + "name": "nf-core/rnaseq", + "mainScript": "main.nf", + "author": "Harshil Patel, Phil Ewels, Rickard Hammarén", + }, + "nextflow": { + "version": "22.10.6", + "build": "5843", + "timestamp": "2023-01-23T23:20:00Z", + }, + "stats": {}, + "errorMessage": None, + "errorReport": None, + "deleted": None, + "peakLoadCpus": None, + "peakLoadTasks": None, + "peakLoadMemory": None, + "projectDir": "/.nextflow/assets/nf-core/rnaseq", + "homeDir": "/root", + "container": "", + "repository": "https://github.com/nf-core/rnaseq", + "containerEngine": None, + "scriptFile": "/.nextflow/assets/nf-core/rnaseq/main.nf", + "launchDir": "/", + "duration": 1816350, + "exitStatus": 0, + "resume": False, + "success": True, + }, + "progress": None, + "orgId": 23810, + "orgName": "Sage-Bionetworks", + "workspaceId": 17703, + "workspaceName": "orca-service-test-project", + "labels": None, + "starred": False, + "optimized": None, + }, + { + "workflow": { + "id": "4XdxI", + "ownerId": 28, + "submit": "2023-04-28T22:33:26Z", + "start": "2023-04-28T22:39:53Z", + "complete": "2023-04-28T23:05:47Z", + "dateCreated": "2023-04-28T22:33:26Z", + "lastUpdated": "2023-04-28T23:05:50Z", + "runName": "gloomy_aryabhata", + "sessionId": "ced632a8", + "profile": "test", + "workDir": "s3://orca-service-test-project-tower-scratch/work", + "commitId": "5671b", + "userName": "orca-service-account", + "scriptId": "f421d", + "revision": "3.11.2", + "commandLine": "nextflow run nf-core/rnaseq -name gloomy_aryabhata ...", + "projectName": "nf-core/rnaseq", + "scriptName": "main.nf", + "launchId": "Qli6s", + "status": "SUCCEEDED", + "configFiles": [ + "/.nextflow/assets/nf-core/rnaseq/nextflow.config", + "/nextflow.config", + ], + "params": {}, + "configText": "foo", + "manifest": { + "nextflowVersion": "!>=22.10.1", + "defaultBranch": "master", + "version": "3.11.2", + "homePage": "https://github.com/nf-core/rnaseq", + "gitmodules": None, + "description": "RNA sequencing analysis pipeline for ...", + "name": "nf-core/rnaseq", + "mainScript": "main.nf", + "author": "Harshil Patel, Phil Ewels, Rickard Hammarén", + }, + "nextflow": { + "version": "22.10.6", + "build": "5843", + "timestamp": "2023-01-23T23:20:00Z", + }, + "stats": {}, + "errorMessage": None, + "errorReport": None, + "deleted": None, + "peakLoadCpus": None, + "peakLoadTasks": None, + "peakLoadMemory": None, + "projectDir": "/.nextflow/assets/nf-core/rnaseq", + "homeDir": "/root", + "container": "", + "repository": "https://github.com/nf-core/rnaseq", + "containerEngine": None, + "scriptFile": "/.nextflow/assets/nf-core/rnaseq/main.nf", + "launchDir": "/", + "duration": 1571074, + "exitStatus": 0, + "resume": False, + "success": True, + }, + "progress": None, + "orgId": 23810, + "orgName": "Sage-Bionetworks", + "workspaceId": 17703, + "workspaceName": "orca-service-test-project", + "labels": None, + "starred": False, + "optimized": None, + }, + ], + "totalSize": 3, +} diff --git a/tests/services/nextflowtower/test_client.py b/tests/services/nextflowtower/test_client.py index 3e9a88a..f2c2d23 100644 --- a/tests/services/nextflowtower/test_client.py +++ b/tests/services/nextflowtower/test_client.py @@ -133,9 +133,17 @@ def test_that_launch_workflow_works(client, mocker, get_response): def test_that_get_workflow_returns_expected_response(client, mocker, get_response): - expected = get_response("get_workflow") - mock = mocker.patch.object(client, "get") - mock.return_value = expected + response = get_response("get_workflow") + mock = mocker.patch.object(client, "request_json") + mock.return_value = response actual = client.get_workflow(workspace_id=98765, workflow_id="123456789") mock.assert_called_once() - assert actual == expected + assert actual == models.Workflow.from_json(response["workflow"]) + + +def test_that_list_workflows_works(client, mocker, get_response): + mock = mocker.patch.object(client, "request_json") + mock.return_value = get_response("list_workflows") + result = client.list_workflows() + mock.assert_called() + assert len(result) == 3 diff --git a/tests/services/nextflowtower/test_enums.py b/tests/services/nextflowtower/test_enums.py index bece47f..7c69a82 100644 --- a/tests/services/nextflowtower/test_enums.py +++ b/tests/services/nextflowtower/test_enums.py @@ -1,17 +1,17 @@ -from orca.services.nextflowtower.models import TaskStatus +from orca.services.nextflowtower.models import WorkflowStatus def test_that_TaskStatus_contant_values_are_correct(): - assert TaskStatus.SUBMITTED.value == "SUBMITTED" - assert TaskStatus.RUNNING.value == "RUNNING" - assert TaskStatus.SUCCEEDED.value == "SUCCEEDED" - assert TaskStatus.FAILED.value == "FAILED" - assert TaskStatus.CANCELLED.value == "CANCELLED" - assert TaskStatus.UNKNOWN.value == "UNKNOWN" + assert WorkflowStatus.SUBMITTED.value == "SUBMITTED" + assert WorkflowStatus.RUNNING.value == "RUNNING" + assert WorkflowStatus.SUCCEEDED.value == "SUCCEEDED" + assert WorkflowStatus.FAILED.value == "FAILED" + assert WorkflowStatus.CANCELLED.value == "CANCELLED" + assert WorkflowStatus.UNKNOWN.value == "UNKNOWN" def test_that_TaskStatus_terminal_states_are_in_terminal_states_list(): - assert TaskStatus.SUCCEEDED.value in TaskStatus.terminal_states.value - assert TaskStatus.FAILED.value in TaskStatus.terminal_states.value - assert TaskStatus.CANCELLED.value in TaskStatus.terminal_states.value - assert TaskStatus.UNKNOWN.value in TaskStatus.terminal_states.value + assert WorkflowStatus.SUCCEEDED.value in WorkflowStatus.terminal_states.value + assert WorkflowStatus.FAILED.value in WorkflowStatus.terminal_states.value + assert WorkflowStatus.CANCELLED.value in WorkflowStatus.terminal_states.value + assert WorkflowStatus.UNKNOWN.value in WorkflowStatus.terminal_states.value diff --git a/tests/services/nextflowtower/test_models.py b/tests/services/nextflowtower/test_models.py index c474ce7..f56cf76 100644 --- a/tests/services/nextflowtower/test_models.py +++ b/tests/services/nextflowtower/test_models.py @@ -54,13 +54,13 @@ def test_that_launch_info_can_be_created_with_resume_enabled(): def test_that_launch_info_can_be_serialized_with_resume_enabled(launch_info): launch_info.resume = True launch_info.session_id = "foo" - json = launch_info.to_dict() + json = launch_info.to_json() assert "resume" in json["launch"] assert "sessionId" in json["launch"] def test_that_launch_info_can_be_serialized_with_resume_disabled(launch_info): - json = launch_info.to_dict() + json = launch_info.to_json() assert "resume" not in json["launch"] assert "sessionId" not in json["launch"] diff --git a/tests/services/nextflowtower/test_ops.py b/tests/services/nextflowtower/test_ops.py index 4f8cd16..906559e 100644 --- a/tests/services/nextflowtower/test_ops.py +++ b/tests/services/nextflowtower/test_ops.py @@ -142,11 +142,12 @@ def test_that_get_workflow_status_returns_expected_tuple_workflow_is_complete( mocker, get_response, mocked_ops ): response = get_response("get_workflow") + expected = models.Workflow.from_json(response["workflow"]) mock = mocker.patch.object(mocked_ops, "client") - mock.get_workflow.return_value = response + mock.get_workflow.return_value = expected result = mocked_ops.get_workflow_status(workflow_id="123456789") mock.get_workflow.assert_called_once() - assert result == ("SUCCEEDED", True) + assert result == (models.WorkflowStatus("SUCCEEDED"), True) def test_that_get_workflow_status_returns_expected_tuple_workflow_is_not_complete( @@ -155,8 +156,9 @@ def test_that_get_workflow_status_returns_expected_tuple_workflow_is_not_complet response = get_response("get_workflow") response["workflow"]["complete"] = None response["workflow"]["status"] = "SUBMITTED" + expected = models.Workflow.from_json(response["workflow"]) mock = mocker.patch.object(mocked_ops, "client") - mock.get_workflow.return_value = response + mock.get_workflow.return_value = expected result = mocked_ops.get_workflow_status(workflow_id="123456789") mock.get_workflow.assert_called_once() - assert result == ("SUBMITTED", False) + assert result == (models.WorkflowStatus("SUBMITTED"), False) From 5ba526631e128353717dd8f34bd3890d0bdc4277 Mon Sep 17 00:00:00 2001 From: Bruno Grande Date: Wed, 3 May 2023 15:42:14 -0700 Subject: [PATCH 3/5] Make Tower `launch_workflow()` op idempotent --- src/orca/services/nextflowtower/client.py | 6 +- src/orca/services/nextflowtower/models.py | 76 +++++----- src/orca/services/nextflowtower/ops.py | 88 ++++++++++- src/orca/services/nextflowtower/utils.py | 53 ++++++- tests/services/nextflowtower/conftest.py | 1 + tests/services/nextflowtower/responses.py | 93 ++++++------ tests/services/nextflowtower/test_client.py | 3 +- .../nextflowtower/test_integration.py | 20 ++- tests/services/nextflowtower/test_ops.py | 141 +++++++++++++++++- tests/services/nextflowtower/test_utils.py | 44 ++++++ 10 files changed, 424 insertions(+), 101 deletions(-) diff --git a/src/orca/services/nextflowtower/client.py b/src/orca/services/nextflowtower/client.py index 2690e93..5d14c1f 100644 --- a/src/orca/services/nextflowtower/client.py +++ b/src/orca/services/nextflowtower/client.py @@ -83,7 +83,11 @@ def request_json(self, method: str, path: str, **kwargs) -> dict[str, Any]: A dictionary from deserializing the JSON response. """ response = self.request(method, path, **kwargs) - response.raise_for_status() + try: + response.raise_for_status() + except HTTPError as e: + # Add extra context if possible + raise HTTPError(response.text) from e return response.json() def request_paged(self, method: str, path: str, **kwargs) -> dict[str, Any]: diff --git a/src/orca/services/nextflowtower/models.py b/src/orca/services/nextflowtower/models.py index 9b70057..3920519 100644 --- a/src/orca/services/nextflowtower/models.py +++ b/src/orca/services/nextflowtower/models.py @@ -1,5 +1,5 @@ import json as json_module -from dataclasses import field, fields +from dataclasses import KW_ONLY, field, fields from datetime import datetime from enum import Enum from typing import Any, ClassVar, Iterable, Optional @@ -8,7 +8,7 @@ from pydantic.dataclasses import dataclass from typing_extensions import Self -from orca.services.nextflowtower.utils import dedup +from orca.services.nextflowtower.utils import dedup, get_nested class WorkflowStatus(Enum): @@ -38,6 +38,9 @@ class BaseTowerModel: Only discordant names need to be listed. """ + _: KW_ONLY + raw: Optional[dict[str, Any]] = field(default=None, repr=False, compare=False) + _key_mapping: ClassVar[dict[str, str]] @classmethod @@ -51,31 +54,37 @@ def from_json(cls, json: dict[str, Any], **kwargs: Any) -> Self: Returns: Class instance. """ - cls_kwargs = dict() - - # First: populate with special values - cls_kwargs.update(kwargs) + cls_kwargs = {"raw": json} - # Second: populate with values with discordant key names + # Populate with values with discordant key names key_mapping = getattr(cls, "_key_mapping", {}) for python_name, api_name in key_mapping.items(): - if python_name not in cls_kwargs: - value = None - target = json - for api_name_part in api_name.split("."): - if api_name_part in target: - value = target[api_name_part] - target = value - if value is not None: - cls_kwargs[python_name] = value - - # Third: populate with remaining dataclass fields + cls_kwargs[python_name] = get_nested(json, api_name) + + # Populate with remaining dataclass fields for cls_field in fields(cls): if cls_field.name not in cls_kwargs and cls_field.name in json: cls_kwargs[cls_field.name] = json[cls_field.name] + # Populate (and override) with special values + cls_kwargs.update(kwargs) + return cls(**cls_kwargs) + def get(self, name: str) -> Any: + """Retrieve attribute value, which cannot be None. + + Args: + name: Atribute name. + + Returns: + Attribute value (not None). + """ + if getattr(self, name, None) is None: + message = f"Attribute '{name}' must be set (not None) by this point." + raise ValueError(message) + return getattr(self, name) + @dataclass(kw_only=False) class User(BaseTowerModel): @@ -136,10 +145,10 @@ class LaunchInfo(BaseTowerModel): compute_env_id: Optional[str] = None work_dir: Optional[str] = None revision: Optional[str] = None - params: Optional[dict] = None nextflow_config: Optional[str] = None run_name: Optional[str] = None pre_run_script: Optional[str] = None + params: Optional[dict] = None profiles: list[str] = field(default_factory=list) user_secrets: list[str] = field(default_factory=list) workspace_secrets: list[str] = field(default_factory=list) @@ -167,7 +176,7 @@ def check_resume_and_session_id(cls, values: dict[str, Any]): return values def fill_in(self, attr: str, value: Any): - """Fill in any missing values. + """Fill in any missing or falsy values. Args: attr: Attribute name. @@ -191,20 +200,6 @@ def add_in(self, attr: str, values: Iterable[Any]): updated_values = dedup(updated_values) setattr(self, attr, updated_values) - def get(self, name: str) -> Any: - """Retrieve attribute value, which cannot be None. - - Args: - name: Atribute name. - - Returns: - Attribute value (not None). - """ - if getattr(self, name, None) is None: - message = f"Attribute '{name}' must be set (not None) by this point." - raise ValueError(message) - return getattr(self, name) - def to_json(self) -> dict[str, Any]: """Generate JSON representation of a launch specification. @@ -223,7 +218,7 @@ def to_json(self) -> dict[str, Any]: "labelIds": dedup(self.label_ids), "mainScript": None, "optimizationId": None, - "paramsText": json_module.dumps(self.params), + "paramsText": json_module.dumps(self.params) if self.params else "", "pipeline": self.get("pipeline"), "postRunScript": None, "preRunScript": self.pre_run_script, @@ -300,15 +295,26 @@ class Workflow(BaseTowerModel): id: str complete: Optional[datetime] + submit: Optional[datetime] run_name: str session_id: str username: str projectName: str + work_dir: str status: WorkflowStatus + params: Optional[dict[str, Any]] + commit_id: Optional[str] _key_mapping = { "run_name": "runName", "session_id": "sessionId", "username": "userName", "project_name": "projectName", + "work_dir": "workDir", + "commit_id": "commitId", } + + @property + def is_done(self) -> bool: + """Whether the workflow is done running.""" + return self.status.value in WorkflowStatus.terminal_states.value diff --git a/src/orca/services/nextflowtower/ops.py b/src/orca/services/nextflowtower/ops.py index cef62e6..1a5911a 100644 --- a/src/orca/services/nextflowtower/ops.py +++ b/src/orca/services/nextflowtower/ops.py @@ -9,6 +9,7 @@ from orca.services.nextflowtower.client_factory import NextflowTowerClientFactory from orca.services.nextflowtower.config import NextflowTowerConfig from orca.services.nextflowtower.models import LaunchInfo, Workflow, WorkflowStatus +from orca.services.nextflowtower.utils import increment_suffix @dataclass(kw_only=False) @@ -104,6 +105,7 @@ def launch_workflow( self, launch_info: LaunchInfo, compute_env_filter: Optional[str] = None, + ignore_previous_runs: bool = False, ) -> str: """Launch a workflow using the latest matching compute env. @@ -111,10 +113,32 @@ def launch_workflow( launch_info: Workflow launch information. compute_env_filter: Filter for matching compute environments. Default to None. + ignore_previous_runs: Whether to ignore previous + workflow runs with the same attributes. Note + that enabling this might result in duplicate + workflow runs. Returns: Workflow run ID. """ + # Make sure that essential attributes are set + if launch_info.pipeline is None or launch_info.run_name is None: + message = "LaunchInfo 'run_name' and 'pipeline' attributes must be set." + raise ValueError(message) + + # Update launch_info if there are previous workflow runs + if not ignore_previous_runs: + latest_run = self.get_latest_previous_workflow(launch_info) + if latest_run: + # Return ID for latest run if ongoing, succeeded, or cancelled + skip_statuses = {"SUCCEEDED", "CANCELLED"} + if not latest_run.is_done or latest_run.status.value in skip_statuses: + return latest_run.id + launch_info.fill_in("resume", True) + launch_info.fill_in("session_id", latest_run.session_id) + launch_info.run_name = increment_suffix(launch_info.run_name) + + # Get relevant compute environment and its resource tags compute_env_id = self.get_latest_compute_env(compute_env_filter) compute_env = self.client.get_compute_env(compute_env_id, self.workspace_id) label_ids = [label.id for label in compute_env.labels] @@ -123,8 +147,9 @@ def launch_workflow( query_label_id = self.create_label(self.launch_label) label_ids.append(query_label_id) + # TODO: Fill in revision using '/pipelines/info' endpoint # Update launch_info with compute_env defaults and label ID - launch_info.fill_in("compute_env_id", compute_env_id) + launch_info.fill_in("compute_env_id", compute_env.id) launch_info.fill_in("work_dir", compute_env.work_dir) launch_info.fill_in("pre_run_script", compute_env.pre_run_script) launch_info.add_in("label_ids", label_ids) @@ -145,11 +170,7 @@ def get_workflow_status(self, workflow_id: str) -> tuple[WorkflowStatus, bool]: is_done = workflow.status.value in WorkflowStatus.terminal_states.value return workflow.status, is_done - def list_workflows( - self, - search_filter: str = "", - only_orca_launches: bool = True, - ) -> list[Workflow]: + def list_workflows(self, search_filter: str = "") -> list[Workflow]: """List available workflows that match search filter. Attributes: @@ -161,7 +182,60 @@ def list_workflows( Returns: List of workflow instances. """ - if only_orca_launches is None: + if self.launch_label is not None: search_filter = f"{search_filter} label:{self.launch_label}" workflows = self.client.list_workflows(search_filter, self.workspace_id) return workflows + + def list_previous_workflows(self, launch_info: LaunchInfo) -> list[Workflow]: + """Retrieve the list of previously launched workflows. + + Args: + launch_info: Workflow launch information. + + Returns: + List of previously launched workflows. + """ + workflows = self.list_workflows() + + previous_workflows = list() + for workflow in workflows: + if workflow.projectName != launch_info.pipeline: + continue + + # TODO: Rename `run_name` to `unique_id` (or similar) + prefix = launch_info.run_name + if prefix and not workflow.run_name.startswith(prefix): + continue + + previous_workflows.append(workflow) + + return previous_workflows + + def get_latest_previous_workflow( + self, + launch_info: LaunchInfo, + ) -> Optional[Workflow]: + """Retrieve the latest run among previously launched workflows. + + Args: + launch_info: Workflow launch information. + + Returns: + Latest run among previously launched workflows. + """ + previous_runs = self.list_previous_workflows(launch_info) + if len(previous_runs) == 0: + return None + + # First check and return any ongoing runs + ongoing_runs = [run for run in previous_runs if not run.is_done] + if len(ongoing_runs) > 1: # pragma: no cover + message = f"Multiple ongoing workflow runs: {ongoing_runs}" + raise ValueError(message) + elif len(ongoing_runs) == 1: + return ongoing_runs[0] + + # Otherwise, return latest based on submission timestamp + sorted_runs = sorted(previous_runs, key=lambda x: x.get("submit")) + return sorted_runs[-1] diff --git a/src/orca/services/nextflowtower/utils.py b/src/orca/services/nextflowtower/utils.py index c626e39..a36c4b7 100644 --- a/src/orca/services/nextflowtower/utils.py +++ b/src/orca/services/nextflowtower/utils.py @@ -1,6 +1,6 @@ from collections.abc import Collection from datetime import datetime, timezone -from typing import TypeVar +from typing import Any, TypeVar T = TypeVar("T", int, str) @@ -29,3 +29,54 @@ def dedup(items: Collection[T]) -> list[T]: Deduplicated collection or None. """ return list(set(items)) + + +def increment_suffix(text: str, separator: str = "_") -> str: + """Increment integer suffix for a string. + + Args: + text: Text (already integer-suffixed or not). + separator: Separator between text and suffix. + Defaults to underscore. + + Returns: + Incremented suffixed text. + """ + prefix, sep, suffix = text.rpartition(separator) + # "foo".rpartition("_") -> ('', '', 'foo') + if sep == "": + return f"{text}{separator}2" + # "foo_".rpartition("_") -> ('foo', '_', '') + elif suffix == "": + return f"{text}2" + # "foo_1".rpartition("_") -> ('foo', '_', '1') + elif suffix.isdigit(): + suffix_int = int(suffix) + incremented = suffix_int + 1 + return f"{prefix}{sep}{incremented}" + # "foo_bar".rpartition("_") -> ('foo', '_', 'bar') + else: + return f"{text}{separator}2" + + +def get_nested(dictionary: dict[str, Any], keys: str) -> Any: + """Retrieve nested value in a dictionary. + + Args: + dictionary: Dictionary (nested or not). + keys: Period-delimited list of keys + (one per dictionary level). + + Raises: + ValueError: If the keys don't resolve to a value. + + Returns: + Nested value corresponding to the list of keys. + """ + target = dictionary + for api_name_part in keys.split("."): + if api_name_part not in target: + message = f"Keys ({keys}) don't resolve to a value in: {dictionary}" + raise ValueError(message) + target = target[api_name_part] + return target diff --git a/tests/services/nextflowtower/conftest.py b/tests/services/nextflowtower/conftest.py index ebc27bc..35adc0e 100644 --- a/tests/services/nextflowtower/conftest.py +++ b/tests/services/nextflowtower/conftest.py @@ -27,6 +27,7 @@ def ops(config): yield NextflowTowerOps(config) +# TODO: Mock `client` using a property mock @pytest.fixture def mocked_ops(config, client, mocker): mocker.patch.object(NextflowTowerOps, "client", return_value=client) diff --git a/tests/services/nextflowtower/responses.py b/tests/services/nextflowtower/responses.py index fb093bc..e4151d3 100644 --- a/tests/services/nextflowtower/responses.py +++ b/tests/services/nextflowtower/responses.py @@ -303,35 +303,38 @@ } +# The following list of workflows includes a standalone workflow and a +# pair of workflows, one having been relaunched from the other. +# Some long fields (params, configText, stats) have been truncated. list_workflows = { "workflows": [ { "workflow": { - "id": "1QYjg", - "ownerId": 28, - "submit": "2023-05-02T16:31:16Z", - "start": "2023-05-02T16:40:32Z", - "complete": "2023-05-02T17:11:56Z", - "dateCreated": "2023-05-02T16:31:16Z", - "lastUpdated": "2023-05-02T17:11:58Z", - "runName": "hungry_cori", + "id": "4qqHV", + "ownerId": 2, + "submit": "2023-05-03T17:12:28Z", + "start": "2023-05-03T17:19:40Z", + "complete": None, + "dateCreated": "2023-05-03T17:12:28Z", + "lastUpdated": "2023-05-03T17:19:40Z", + "runName": "hungry_cori_2", "sessionId": "18f304ea", "profile": "test", "workDir": "s3://orca-service-test-project-tower-scratch/work", - "commitId": "5671b", - "userName": "orca-service-account", + "commitId": "5671b65af97fe78a2f9b4d05d850304918b1b86e", + "userName": "bgrande", "scriptId": "f421d", "revision": "3.11.2", - "commandLine": "nextflow run nf-core/rnaseq ...", + "commandLine": "nextflow run nf-core/rnaseq -name hungry_cori_2 ...", "projectName": "nf-core/rnaseq", "scriptName": "main.nf", - "launchId": "5q0uw", - "status": "SUCCEEDED", + "launchId": "weG6r", + "status": "RUNNING", "configFiles": [ "/.nextflow/assets/nf-core/rnaseq/nextflow.config", "/nextflow.config", ], - "params": {}, + "params": {"outdir": "foo"}, "configText": "foo", "manifest": { "nextflowVersion": "!>=22.10.1", @@ -339,7 +342,7 @@ "version": "3.11.2", "homePage": "https://github.com/nf-core/rnaseq", "gitmodules": None, - "description": "RNA sequencing analysis pipeline ...", + "description": "RNA sequencing analysis pipeline for ...", "name": "nf-core/rnaseq", "mainScript": "main.nf", "author": "Harshil Patel, Phil Ewels, Rickard Hammarén", @@ -349,7 +352,7 @@ "build": "5843", "timestamp": "2023-01-23T23:20:00Z", }, - "stats": {}, + "stats": None, "errorMessage": None, "errorReport": None, "deleted": None, @@ -363,10 +366,10 @@ "containerEngine": None, "scriptFile": "/.nextflow/assets/nf-core/rnaseq/main.nf", "launchDir": "/", - "duration": 1895231, - "exitStatus": 0, - "resume": False, - "success": True, + "duration": None, + "exitStatus": None, + "resume": True, + "success": None, }, "progress": None, "orgId": 23810, @@ -379,31 +382,31 @@ }, { "workflow": { - "id": "3a6Fn", + "id": "1QYjg", "ownerId": 28, - "submit": "2023-05-01T18:38:10Z", - "start": "2023-05-01T18:46:03Z", - "complete": "2023-05-01T19:16:08Z", - "dateCreated": "2023-05-01T18:38:10Z", - "lastUpdated": "2023-05-01T19:16:10Z", - "runName": "boring_curie", - "sessionId": "2d729ac7", + "submit": "2023-05-02T16:31:16Z", + "start": "2023-05-02T16:40:32Z", + "complete": "2023-05-02T17:11:56Z", + "dateCreated": "2023-05-02T16:31:16Z", + "lastUpdated": "2023-05-02T17:11:58Z", + "runName": "hungry_cori", + "sessionId": "18f304ea", "profile": "test", "workDir": "s3://orca-service-test-project-tower-scratch/work", "commitId": "5671b", "userName": "orca-service-account", "scriptId": "f421d", "revision": "3.11.2", - "commandLine": "nextflow run nf-core/rnaseq -name boring_curie ...", + "commandLine": "nextflow run nf-core/rnaseq ...", "projectName": "nf-core/rnaseq", "scriptName": "main.nf", - "launchId": "2ZAjK", + "launchId": "5q0uw", "status": "SUCCEEDED", "configFiles": [ "/.nextflow/assets/nf-core/rnaseq/nextflow.config", "/nextflow.config", ], - "params": {}, + "params": {"outdir": "foo"}, "configText": "foo", "manifest": { "nextflowVersion": "!>=22.10.1", @@ -411,7 +414,7 @@ "version": "3.11.2", "homePage": "https://github.com/nf-core/rnaseq", "gitmodules": None, - "description": "RNA sequencing analysis pipeline for ...", + "description": "RNA sequencing analysis pipeline ...", "name": "nf-core/rnaseq", "mainScript": "main.nf", "author": "Harshil Patel, Phil Ewels, Rickard Hammarén", @@ -435,7 +438,7 @@ "containerEngine": None, "scriptFile": "/.nextflow/assets/nf-core/rnaseq/main.nf", "launchDir": "/", - "duration": 1816350, + "duration": 1895231, "exitStatus": 0, "resume": False, "success": True, @@ -451,31 +454,31 @@ }, { "workflow": { - "id": "4XdxI", + "id": "3a6Fn", "ownerId": 28, - "submit": "2023-04-28T22:33:26Z", - "start": "2023-04-28T22:39:53Z", - "complete": "2023-04-28T23:05:47Z", - "dateCreated": "2023-04-28T22:33:26Z", - "lastUpdated": "2023-04-28T23:05:50Z", - "runName": "gloomy_aryabhata", - "sessionId": "ced632a8", + "submit": "2023-05-01T18:38:10Z", + "start": "2023-05-01T18:46:03Z", + "complete": "2023-05-01T19:16:08Z", + "dateCreated": "2023-05-01T18:38:10Z", + "lastUpdated": "2023-05-01T19:16:10Z", + "runName": "boring_curie", + "sessionId": "2d729ac7", "profile": "test", "workDir": "s3://orca-service-test-project-tower-scratch/work", "commitId": "5671b", "userName": "orca-service-account", "scriptId": "f421d", "revision": "3.11.2", - "commandLine": "nextflow run nf-core/rnaseq -name gloomy_aryabhata ...", + "commandLine": "nextflow run nf-core/rnaseq -name boring_curie ...", "projectName": "nf-core/rnaseq", "scriptName": "main.nf", - "launchId": "Qli6s", + "launchId": "2ZAjK", "status": "SUCCEEDED", "configFiles": [ "/.nextflow/assets/nf-core/rnaseq/nextflow.config", "/nextflow.config", ], - "params": {}, + "params": {"outdir": "foo"}, "configText": "foo", "manifest": { "nextflowVersion": "!>=22.10.1", @@ -507,7 +510,7 @@ "containerEngine": None, "scriptFile": "/.nextflow/assets/nf-core/rnaseq/main.nf", "launchDir": "/", - "duration": 1571074, + "duration": 1816350, "exitStatus": 0, "resume": False, "success": True, diff --git a/tests/services/nextflowtower/test_client.py b/tests/services/nextflowtower/test_client.py index f2c2d23..fd26c6e 100644 --- a/tests/services/nextflowtower/test_client.py +++ b/tests/services/nextflowtower/test_client.py @@ -41,7 +41,7 @@ def test_that_get_user_info_works(client, mocker, get_response): assert actual == models.User.from_json(expected["user"]) -def test_that_get_user_info_fails_with_nonstandard_response(client, mocker): +def test_that_get_user_info_fails_with_400_response(client, mocker): mock = mocker.patch.object(client, "request_json") mock.return_value = {"message": "foobar"} with pytest.raises(HTTPError): @@ -125,6 +125,7 @@ def test_that_launch_workflow_works(client, mocker, get_response): launch_spec = models.LaunchInfo( compute_env_id="foo", pipeline="bar", + run_name="foobar", work_dir="s3://path", profiles=["test"], ) diff --git a/tests/services/nextflowtower/test_integration.py b/tests/services/nextflowtower/test_integration.py index 1e64972..2d1b6d7 100644 --- a/tests/services/nextflowtower/test_integration.py +++ b/tests/services/nextflowtower/test_integration.py @@ -34,15 +34,21 @@ def test_that_a_valid_client_can_be_constructed_and_tested(client): assert client.list_user_workspaces() -@pytest.mark.cost @pytest.mark.integration def test_that_a_workflow_can_be_launched(ops): - scratch_bucket = "s3://orca-service-test-project-tower-scratch/" launch_info = models.LaunchInfo( - pipeline="nf-core/rnaseq", - revision="3.11.2", - profiles=["test"], - params={"outdir": f"{scratch_bucket}/2days/launch_test"}, + pipeline="nextflow-io/hello", + run_name="test_launch_workflow", ) - workflow_id = ops.launch_workflow(launch_info, "ondemand") + workflow_id = ops.launch_workflow(launch_info, "spot", ignore_previous_runs=True) + assert workflow_id + + +@pytest.mark.integration +def test_that_a_workflow_can_be_relaunched(ops): + launch_info = models.LaunchInfo( + pipeline="nextflow-io/hello", + run_name="test_relaunch_workflow", + ) + workflow_id = ops.launch_workflow(launch_info, "spot") assert workflow_id diff --git a/tests/services/nextflowtower/test_ops.py b/tests/services/nextflowtower/test_ops.py index 906559e..4b4c53f 100644 --- a/tests/services/nextflowtower/test_ops.py +++ b/tests/services/nextflowtower/test_ops.py @@ -121,6 +121,7 @@ def test_that_launch_workflow_works(mocked_ops, get_response, mocker): launch_info = models.LaunchInfo( compute_env_id="5ykJF", pipeline="some/pipeline", + run_name="foobar", revision="1.1", profiles=["test"], params={"outdir": "foo"}, @@ -129,13 +130,12 @@ def test_that_launch_workflow_works(mocked_ops, get_response, mocker): mocker.patch.object(mocked_ops, "get_latest_compute_env") - compute_env_response = get_response("get_compute_env")["computeEnv"] - compute_env = models.ComputeEnv.from_json(compute_env_response) - mocked_ops.client.get_compute_env.return_value = compute_env + response = get_response("get_compute_env") + mocker.patch.object(mocked_ops.client, "get", return_value=response) mocked_ops.launch_workflow(launch_info, "ondemand") mocked_ops.client.launch_workflow.assert_called_once() - assert launch_info.compute_env_id == compute_env.id + assert launch_info.compute_env_id == response["computeEnv"]["id"] def test_that_get_workflow_status_returns_expected_tuple_workflow_is_complete( @@ -162,3 +162,136 @@ def test_that_get_workflow_status_returns_expected_tuple_workflow_is_not_complet result = mocked_ops.get_workflow_status(workflow_id="123456789") mock.get_workflow.assert_called_once() assert result == (models.WorkflowStatus("SUBMITTED"), False) + + +def test_that_list_workflows_filters_on_launch_label(mocked_ops, mocker): + mock = mocker.patch.object(mocked_ops.client, "list_workflows") + mocked_ops.list_workflows() + mock.assert_called_once() + search_filter = mock.call_args.args[0] + assert f"label:{mocked_ops.launch_label}" in search_filter + + +def test_that_list_workflows_doesnt_filter_on_launch_label_when_absent( + mocked_ops, mocker +): + mock = mocker.patch.object(mocked_ops.client, "list_workflows") + mocked_ops.launch_label = None + mocked_ops.list_workflows() + mock.assert_called_once() + search_filter = mock.call_args.args[0] + assert f"label:{mocked_ops.launch_label}" not in search_filter + + +def test_that_list_previous_workflows_matches_the_right_entries( + mocked_ops, client, mocker, get_response +): + mock = mocker.patch.object(client, "get") + mock.return_value = get_response("list_workflows") + workflows = client.list_workflows() + + launch_info = models.LaunchInfo( + pipeline="nf-core/rnaseq", + revision="3.11.2", + profiles=["test"], + run_name="hungry_cori", + params={"outdir": "foo"}, + ) + mocker.patch.object(mocked_ops, "list_workflows", return_value=workflows) + result = mocked_ops.list_previous_workflows(launch_info) + assert len(result) == 2 + + +def test_that_get_latest_previous_workflow_returns_an_ongoing_run( + mocked_ops, client, mocker, get_response +): + mock = mocker.patch.object(client, "get") + mock.return_value = get_response("get_workflow") + workflow = client.get_workflow("foo") + workflow.status = models.WorkflowStatus("RUNNING") + + launch_info = models.LaunchInfo( + pipeline="nf-core/rnaseq", + revision="3.11.2", + profiles=["test"], + run_name="hungry_cori", + params={"outdir": "foo"}, + ) + mocker.patch.object(mocked_ops, "list_previous_workflows", return_value=[workflow]) + result = mocked_ops.get_latest_previous_workflow(launch_info) + assert result.id == workflow.id + + +def test_for_an_error_when_launching_a_workflow_without_a_run_name(mocked_ops): + launch_info = models.LaunchInfo(pipeline="nf-core/rnaseq") + with pytest.raises(ValueError): + mocked_ops.launch_workflow(launch_info) + + +def test_for_an_error_when_launching_a_workflow_without_a_pipeline(mocked_ops): + launch_info = models.LaunchInfo(run_name="foobar") + with pytest.raises(ValueError): + mocked_ops.launch_workflow(launch_info) + + +def test_that_launch_workflow_considers_previous_runs( + mocked_ops, client, mocker, get_response +): + wf_mock = mocker.patch.object(client, "get") + wf_mock.return_value = get_response("get_workflow") + workflow = client.get_workflow("foo") + workflow.status = models.WorkflowStatus("FAILED") + + latest_wf_mock = mocker.patch.object(mocked_ops, "get_latest_previous_workflow") + latest_wf_mock.return_value = workflow + + mocker.patch.object(mocked_ops, "get_latest_compute_env") + + latest_ce_mock = mocker.patch.object(client, "get") + latest_ce_mock.return_value = get_response("get_compute_env") + compute_env = client.get_compute_env("foo") + mocker.patch.object(mocked_ops.client, "get_compute_env", return_value=compute_env) + + mocker.patch.object(mocked_ops, "create_label", return_value=123) + + launch_info = models.LaunchInfo( + pipeline="nextflow-io/example-workflow", + run_name="example-run", + ) + + launch_mock = mocker.patch.object(mocked_ops.client, "launch_workflow") + mocked_ops.launch_workflow(launch_info) + + launch_mock.assert_called_once() + assert launch_info.run_name == "example-run_2" + assert launch_info.resume + assert launch_info.session_id == workflow.session_id + + +def test_that_launch_workflow_works_when_there_are_no_previous_runs( + mocked_ops, client, mocker, get_response +): + latest_wf_mock = mocker.patch.object(mocked_ops, "get_latest_previous_workflow") + latest_wf_mock.return_value = None + + mocker.patch.object(mocked_ops, "get_latest_compute_env") + + latest_ce_mock = mocker.patch.object(client, "get") + latest_ce_mock.return_value = get_response("get_compute_env") + compute_env = client.get_compute_env("foo") + mocker.patch.object(mocked_ops.client, "get_compute_env", return_value=compute_env) + + mocker.patch.object(mocked_ops, "create_label", return_value=123) + + launch_info = models.LaunchInfo( + pipeline="nextflow-io/example-workflow", + run_name="example-run", + ) + + launch_mock = mocker.patch.object(mocked_ops.client, "launch_workflow") + mocked_ops.launch_workflow(launch_info) + + launch_mock.assert_called_once() + assert launch_info.run_name == "example-run" + assert not launch_info.resume + assert launch_info.session_id is None diff --git a/tests/services/nextflowtower/test_utils.py b/tests/services/nextflowtower/test_utils.py index 8c64ef2..c535da3 100644 --- a/tests/services/nextflowtower/test_utils.py +++ b/tests/services/nextflowtower/test_utils.py @@ -1,5 +1,7 @@ from datetime import datetime, timezone +import pytest + from orca.services.nextflowtower import utils @@ -12,3 +14,45 @@ def test_that_launch_info_dedup_works(): secrets = ["foo", "bar", "baz", "foo"] dedupped = utils.dedup(secrets) assert len(dedupped) == 3 + + +def test_that_increment_suffix_works_with_unsuffixed_strings_with_underscore(): + assert utils.increment_suffix("foo_bar") == "foo_bar_2" + + +def test_that_increment_suffix_works_with_unsuffixed_strings(): + assert utils.increment_suffix("foo") == "foo_2" + + +def test_that_increment_suffix_works_with_underscore_suffixed_strings(): + assert utils.increment_suffix("foo_") == "foo_2" + + +def test_that_increment_suffix_works_with_1_suffixed_strings(): + assert utils.increment_suffix("foo_1") == "foo_2" + + +def test_that_increment_suffix_works_with_2_suffixed_strings(): + assert utils.increment_suffix("foo_2") == "foo_3" + + +def test_that_increment_suffix_works_with_99_suffixed_strings(): + assert utils.increment_suffix("foo_99") == "foo_100" + + +def test_that_get_nested_works_with_single_key(): + input = {"foo": {"bar": {"baz": 123}}} + result = utils.get_nested(input, "foo") + assert result == {"bar": {"baz": 123}} + + +def test_that_get_nested_works_with_many_keys(): + input = {"foo": {"bar": {"baz": 123}}} + result = utils.get_nested(input, "foo.bar.baz") + assert result == 123 + + +def test_for_an_error_when_using_nonexistent_keys_with_get_nested_works(): + input = {"foo": {"bar": {"baz": 123}}} + with pytest.raises(ValueError): + utils.get_nested(input, "foo.bar.atchoo") From e3d3a17b8fc888d88d9c2915a43b02659d3acfef Mon Sep 17 00:00:00 2001 From: Bruno Grande Date: Wed, 3 May 2023 21:13:25 -0700 Subject: [PATCH 4/5] Fix copy-paste typo with `projectName` --- src/orca/services/nextflowtower/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/orca/services/nextflowtower/models.py b/src/orca/services/nextflowtower/models.py index 3920519..0a21094 100644 --- a/src/orca/services/nextflowtower/models.py +++ b/src/orca/services/nextflowtower/models.py @@ -299,7 +299,7 @@ class Workflow(BaseTowerModel): run_name: str session_id: str username: str - projectName: str + project_name: str work_dir: str status: WorkflowStatus params: Optional[dict[str, Any]] From ccd3a5739be80c801d4737a6ab9bd39121055060 Mon Sep 17 00:00:00 2001 From: Bruno Grande Date: Wed, 3 May 2023 22:24:53 -0700 Subject: [PATCH 5/5] Fix `projectName` typo in ops class --- src/orca/services/nextflowtower/ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/orca/services/nextflowtower/ops.py b/src/orca/services/nextflowtower/ops.py index 1a5911a..fd2ab95 100644 --- a/src/orca/services/nextflowtower/ops.py +++ b/src/orca/services/nextflowtower/ops.py @@ -200,7 +200,7 @@ def list_previous_workflows(self, launch_info: LaunchInfo) -> list[Workflow]: previous_workflows = list() for workflow in workflows: - if workflow.projectName != launch_info.pipeline: + if workflow.project_name != launch_info.pipeline: continue # TODO: Rename `run_name` to `unique_id` (or similar)