Skip to content

Commit

Permalink
Merge pull request #19 from Sage-Bionetworks-Workflows/bgrande/ORCA-2…
Browse files Browse the repository at this point in the history
…17/tower-demo

[ORCA-217] Improve usability of Nextflow Tower service
  • Loading branch information
Bruno Grande authored May 4, 2023
2 parents 57de998 + 89455a2 commit 81009c2
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 13 deletions.
14 changes: 11 additions & 3 deletions src/orca/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@

import logging


# Set default logging handler to avoid "No handler found" warnings
logging.getLogger(__name__).addHandler(logging.NullHandler())
# Capture warnings made with the warnings standard module
logging.captureWarnings(True)

# Configure a stream handler
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)

# Configure a module logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
2 changes: 2 additions & 0 deletions src/orca/services/base/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class BaseOps(Generic[ConfigClass, ClientClass]):
3) Provide values to all class variables (defined below).
4) Provide implementations for all abstract methods.
5) Update the type hints for attributes and class variables.
6) Update the config attribute to have a default factory set to
the config class using the `dataclasses.field()` function.
Attributes:
config: A configuration object for this service.
Expand Down
4 changes: 2 additions & 2 deletions src/orca/services/nextflowtower/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from orca.services.nextflowtower.utils import dedup, get_nested


class WorkflowStatus(Enum):
class WorkflowStatus(str, Enum):
"""Valid values for the status of a Tower workflow.
Attributes:
Expand Down Expand Up @@ -317,4 +317,4 @@ class Workflow(BaseTowerModel):
@property
def is_done(self) -> bool:
"""Whether the workflow is done running."""
return self.status.value in WorkflowStatus.terminal_states.value
return self.status in WorkflowStatus.terminal_states
38 changes: 31 additions & 7 deletions src/orca/services/nextflowtower/ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging
from dataclasses import field
from functools import cached_property
from typing import ClassVar, Optional

Expand All @@ -11,6 +13,8 @@
from orca.services.nextflowtower.models import LaunchInfo, Workflow, WorkflowStatus
from orca.services.nextflowtower.utils import increment_suffix

logger = logging.getLogger(__name__)


@dataclass(kw_only=False)
class NextflowTowerOps(BaseOps):
Expand All @@ -23,7 +27,7 @@ class NextflowTowerOps(BaseOps):
client_factory_class: The class for constructing clients.
"""

config: NextflowTowerConfig
config: NextflowTowerConfig = field(default_factory=NextflowTowerConfig)

client_factory_class = NextflowTowerClientFactory

Expand Down Expand Up @@ -130,13 +134,19 @@ def launch_workflow(
if not ignore_previous_runs:
latest_run = self.get_latest_previous_workflow(launch_info)
if latest_run:
status = latest_run.status.value
run_repr = f"{latest_run.run_name} (id='{latest_run.id}', {status=})"
# Return ID for latest run if ongoing, succeeded, or cancelled
skip_statuses = {"SUCCEEDED", "CANCELLED"}
if not latest_run.is_done or latest_run.status.value in skip_statuses:
if not latest_run.is_done: # pragma: no cover
logger.info(f"Found an ongoing previous run: {run_repr}")
return latest_run.id
if status in {"SUCCEEDED", "UNKNOWN"}:
logger.info(f"Found a previous run: {run_repr}")
return latest_run.id
launch_info.fill_in("resume", True)
launch_info.fill_in("session_id", latest_run.session_id)
launch_info.run_name = increment_suffix(launch_info.run_name)
launch_info.run_name = increment_suffix(latest_run.run_name)
logger.info(f"Relaunching from a previous run: {run_repr}")

# Get relevant compute environment and its resource tags
compute_env_id = self.get_latest_compute_env(compute_env_filter)
Expand All @@ -154,7 +164,21 @@ def launch_workflow(
launch_info.fill_in("pre_run_script", compute_env.pre_run_script)
launch_info.add_in("label_ids", label_ids)

return self.client.launch_workflow(launch_info, self.workspace_id)
workflow_id = self.client.launch_workflow(launch_info, self.workspace_id)
workflow_repr = f"{launch_info.run_name} ({workflow_id})"
logger.info(f"Launched a new workflow run: {workflow_repr}")
return workflow_id

def get_workflow(self, workflow_id: str) -> Workflow:
"""Retrieve details about a workflow run.
Args:
workflow_id: Workflow run ID.
Returns:
Workflow instance.
"""
return self.client.get_workflow(workflow_id, self.workspace_id)

# TODO: Consider switching return value to a namedtuple
def get_workflow_status(self, workflow_id: str) -> tuple[WorkflowStatus, bool]:
Expand All @@ -166,8 +190,8 @@ def get_workflow_status(self, workflow_id: str) -> tuple[WorkflowStatus, bool]:
Returns:
Workflow status and whether the workflow is done.
"""
workflow = self.client.get_workflow(workflow_id, self.workspace_id)
is_done = workflow.status.value in WorkflowStatus.terminal_states.value
workflow = self.get_workflow(workflow_id)
is_done = workflow.status in WorkflowStatus.terminal_states
return workflow.status, is_done

def list_workflows(self, search_filter: str = "") -> list[Workflow]:
Expand Down
3 changes: 2 additions & 1 deletion src/orca/services/sevenbridges/ops.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from dataclasses import field
from functools import cached_property
from typing import Any, Optional, cast

Expand All @@ -23,7 +24,7 @@ class SevenBridgesOps(BaseOps):
client_factory_class: The class for constructing clients.
"""

config: SevenBridgesConfig
config: SevenBridgesConfig = field(default_factory=SevenBridgesConfig)

client_factory_class = SevenBridgesClientFactory

Expand Down
14 changes: 14 additions & 0 deletions tests/services/base/test_ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from dataclasses import fields
from typing import get_type_hints

from orca.services.base import BaseClientFactory, BaseConfig


Expand All @@ -6,6 +9,17 @@ def test_that_config_is_set(ops):
assert isinstance(ops.config, BaseConfig)


def test_that_config_has_a_default_factory(ops):
config_field = [f for f in fields(ops) if f.name == "config"][0]
assert getattr(config_field, "default_factory", None) is not None


def test_that_config_has_a_matching_default_factory(ops):
config_field = [f for f in fields(ops) if f.name == "config"][0]
config_type = get_type_hints(ops.__class__)["config"]
assert config_field.default_factory == config_type


def test_that_client_factory_class_is_set(service):
ops_class = service["ops"]
assert hasattr(ops_class, "client_factory_class")
Expand Down

0 comments on commit 81009c2

Please sign in to comment.