Skip to content

Commit

Permalink
Merge pull request #15 from Sage-Bionetworks-Workflows/bgrande/ORCA-1…
Browse files Browse the repository at this point in the history
…63/tower-launch-workflow

[ORCA-163] Implement `launch_workflow()` and `LaunchInfo` dataclass
  • Loading branch information
Bruno Grande authored May 1, 2023
2 parents db0785f + 1069c75 commit 74f9c5c
Show file tree
Hide file tree
Showing 16 changed files with 1,226 additions and 382 deletions.
532 changes: 274 additions & 258 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ dev =
jupyterlab~=3.6
vulture~=2.7
autopep8~=2.0
typing-extensions~=4.5

[options.entry_points]
# Add here console scripts like:
Expand All @@ -118,7 +119,7 @@ apache_airflow_provider =
# Comment those flags to avoid this pytest issue.
addopts =
--cov "orca" --cov-report "term-missing" --cov-report "xml"
-m "not slow and not integration and not acceptance"
-m "not slow and not integration and not acceptance and not cost"
--verbose
norecursedirs =
dist
Expand All @@ -135,6 +136,7 @@ markers =
slow: mark tests as slow (deselect with '-m "not slow"')
integration: mark tests that interact with external services
acceptance: mark end-to-end acceptance tests
cost: mark tests that have costs associated with them

[devpi:upload]
# Options for the devpi: PyPI server and packaging tool
Expand Down
277 changes: 207 additions & 70 deletions src/orca/services/nextflowtower/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from typing import Any
from typing import Any, Optional

import requests
from pydantic.dataclasses import dataclass
from requests.exceptions import HTTPError

from orca.services.nextflowtower import models


# TODO: Consider creating a `client` submodule folder to organize methods
@dataclass(kw_only=False)
class NextflowTowerClient:
"""Simple Python client for making requests to Nextflow Tower.
Expand Down Expand Up @@ -85,105 +86,241 @@ def request_json(self, method: str, path: str, **kwargs) -> dict[str, Any]:
response.raise_for_status()
return response.json()

# def request_paged(self, method: str, path: str, **kwargs) -> list[dict[str, Any]]:
# """Iterate through pages of results for a given request.

# See ``TowerClient.request`` for argument definitions.
def request_paged(self, method: str, path: str, **kwargs) -> dict[str, Any]:
"""Iterate through pages of results for a given request.
# Raises:
# HTTPError: If the response doesn't match the expectation
# for a paged endpoint.

# Returns:
# The cumulative list of items from all pages.
# """
# self.update_kwarg(kwargs, "params", "max", 50)
# self.update_kwarg(kwargs, "params", "offset", 0)
See ``TowerClient.request`` for argument definitions.
# num_items = 0
# total_size = 1 # Artificial value for initiating the while-loop
Raises:
HTTPError: If the response doesn't match the expectation
for a paged endpoint.
# all_items = list()
# while num_items < total_size:
# kwargs["params"]["offset"] = num_items
# json = self.request_json(method, path, **kwargs)
Returns:
The cumulative list of items from all pages.
"""
# Ensure defaults for pagination query parameters
self.update_kwarg(kwargs, "params", "max", 50)
self.update_kwarg(kwargs, "params", "offset", 0)

num_items = 0
all_items = list()
key_name = "items" # Setting a default value
total_size = float("inf") # Artificial value for initiating the while-loop
while num_items < total_size:
kwargs["params"]["offset"] = num_items
json = self.request_json(method, path, **kwargs)
total_size = json.pop("totalSize")
key_name, items = json.popitem()
num_items += len(items)
all_items.extend(items)

if len(all_items) != total_size:
message = f"Expected {total_size} items, but got: {all_items}"
raise HTTPError(message)

# if "totalSize" not in json:
# message = f"'totalSize' not in response JSON ({json}) as expected."
# raise HTTPError(message)
# total_size = json.pop("totalSize")
json = {"totalSize": total_size, key_name: all_items}
return json

# if len(json) != 1:
# message = f"Expected one other key aside from 'totalSize' ({json})."
# raise HTTPError(message)
# _, items = json.popitem()
def get(self, path: str, **kwargs) -> dict[str, Any]:
"""Send an auth'ed GET request and parse the JSON response.
# num_items += len(items)
# all_items.extend(items)
See ``TowerClient.request`` for argument definitions.
# return all_items
Returns:
A dictionary from deserializing the JSON response.
"""
json = self.request_json("GET", path, **kwargs)
if "totalSize" in json:
json = self.request_paged("GET", path, **kwargs)
return json

def get(self, path: str, **kwargs) -> dict[str, Any]:
"""Send an auth'ed GET request and parse the JSON response.
def post(self, path: str, **kwargs) -> dict[str, Any]:
"""Send an auth'ed POST request and parse the JSON response.
See ``TowerClient.request`` for argument definitions.
Returns:
A dictionary from deserializing the JSON response.
"""
return self.request_json("GET", path, **kwargs)
return self.request_json("POST", path, **kwargs)

def get_user_info(self) -> dict[str, Any]:
"""Describe current user.
def unwrap(self, json: dict[str, Any], key: str) -> Any:
"""Unwrap nested key in JSON response.
Raises:
HTTPError: If the response lacks the expected keys.
Args:
json: Raw JSON response.
key: Top-level key.
Returns:
Current user.
Unnested JSON response.
"""
path = "/user-info"
key = "user"
response = self.get(path)
if key not in response:
message = f"Expecting '{key}' key in response ({response})."
if key not in json:
message = f"Expecting '{key}' key in JSON response ({json})."
raise HTTPError(message)
return response[key]
return json[key]

def get_user_workspaces_and_orgs(self, user_id: int) -> list[dict[str, Any]]:
"""List the workspaces and organizations of a given user.
def get_user_info(self) -> models.User:
"""Describe current user.
Raises:
HTTPError: If the response lacks the expected keys.
Returns:
Current user.
"""
path = "/user-info"
json = self.get(path)
unwrapped = self.unwrap(json, "user")
return models.User.from_json(unwrapped)

def list_user_workspaces_and_orgs(
self,
user_id: int,
) -> list[models.Workspace | models.Organization]:
"""List the workspaces and organizations of a given user.
Returns:
Workspaces and organizations.
List of workspaces and organizations.
"""
path = f"/user/{user_id}/workspaces"
key = "orgsAndWorkspaces"
response = self.get(path)
if key not in response:
message = f"Expecting '{key}' key in response ({response})."
raise HTTPError(message)
return response[key]

# TODO: Should this higher-level method exist here or in Ops?
def list_user_workspaces(self) -> list[dict[str, Any]]:
json = self.get(path)
items = self.unwrap(json, "orgsAndWorkspaces")
objects: list[models.Organization | models.Workspace] = list()
for item in items:
if item["workspaceId"]:
workspace = models.Workspace.from_json(item)
objects.append(workspace)
else:
org = models.Organization.from_json(item)
objects.append(org)
return objects

def list_user_workspaces(self) -> list[models.Workspace]:
"""List the workspaces that are available to the current user.
Returns:
List of user workspaces.
"""
user = self.get_user_info()
orgs_and_workspaces = self.get_user_workspaces_and_orgs(user["id"])
items = self.list_user_workspaces_and_orgs(user.id)
return [item for item in items if isinstance(item, models.Workspace)]

workspaces = list()
for workspace in orgs_and_workspaces:
# Response includes organizations, which don't have workspace IDs
if workspace["workspaceId"] is None:
continue
workspaces.append(workspace)
return workspaces
def generate_params(
self,
workspace_id: Optional[int],
**kwargs,
) -> dict[str, Any]:
"""Generate URL query parameters.
Args:
workspace_id: Tower workspace ID.
**kwargs: Additional query parameters that are included
if they are not set to None.
Returns:
URL query parameters based on input.
"""
params = {}
if workspace_id:
params["workspaceId"] = int(workspace_id)
for name, value in kwargs.items():
if value is not None:
params[name] = value
return params

def get_compute_env(
self,
compute_env_id: str,
workspace_id: Optional[int] = None,
) -> models.ComputeEnv:
"""Retrieve information about a given compute environment.
Args:
compute_env_id: Compute environment ID.
workspace_id: Tower workspace ID.
Returns:
Compute environment instance.
"""
path = f"/compute-envs/{compute_env_id}"
params = self.generate_params(workspace_id, attributes="labels")
json = self.get(path, params=params)
unwrapped = self.unwrap(json, "computeEnv")
return models.ComputeEnv.from_json(unwrapped)

def list_compute_envs(
self,
workspace_id: Optional[int] = None,
status: Optional[str] = None,
) -> list[models.ComputeEnvSummary]:
"""List all compute environments.
Args:
workspace_id: Tower workspace ID. Defaults to None.
status: Compute environment status to filter on.
Defaults to None.
Returns:
List of compute environments.
"""
path = "/compute-envs"
params = self.generate_params(workspace_id, status=status)
json = self.get(path, params=params)
items = self.unwrap(json, "computeEnvs")
return [models.ComputeEnvSummary.from_json(item) for item in items]

def create_label(
self,
name: str,
workspace_id: Optional[int] = None,
) -> models.Label:
"""Create a workflow label.
Args:
name: Label name.
workspace_id: Tower workspace ID. Defaults to None.
Returns:
Label instance.
"""
path = "/labels"
params = self.generate_params(workspace_id)
payload = {"name": name, "resource": False}
json = self.post(path, params=params, json=payload)
return models.Label.from_json(json)

def list_labels(self, workspace_id: Optional[int] = None) -> list[models.Label]:
"""List all available labels.
Args:
workspace_id: Tower workspace ID. Defaults to None.
Returns:
List of available labels.
"""
path = "/labels"
params = self.generate_params(workspace_id)
json = self.get(path, params=params)
items = self.unwrap(json, "labels")
return [models.Label.from_json(item) for item in items]

def launch_workflow(
self,
launch_info: models.LaunchInfo,
workspace_id: Optional[int] = None,
) -> str:
"""Launch a workflow in the target workspace.
Args:
launch_info: Description of which workflow to
launch and how, including input parameters.
workspace_id: Tower workspace ID.
Returns:
Workflow run ID.
"""
path = "/workflow/launch"
params = self.generate_params(workspace_id)
payload = launch_info.to_dict()
json = self.post(path, params=params, json=payload)
return self.unwrap(json, "workflowId")

def get_workflow(self, workspace_id: int, workflow_id: str) -> dict:
"""Gets available information about a workflow run
Expand All @@ -198,5 +335,5 @@ def get_workflow(self, workspace_id: int, workflow_id: str) -> dict:
response (dict): Dictionary containing information about the workflow run
"""
path = f"/workflow/{workflow_id}"
response = self.get(path=path, params={"workspaceId": workspace_id})
return response
json = self.get(path=path, params={"workspaceId": workspace_id})
return json
Loading

0 comments on commit 74f9c5c

Please sign in to comment.