Skip to content

Commit

Permalink
Merge pull request #18 from Sage-Bionetworks-Workflows/bgrande/orca-2…
Browse files Browse the repository at this point in the history
…10/resume-workflows

[ORCA-210] Add support for resuming workflows in NextflowTowerOps
  • Loading branch information
Bruno Grande authored May 4, 2023
2 parents f1a4b33 + ccd3a57 commit 57de998
Show file tree
Hide file tree
Showing 12 changed files with 875 additions and 176 deletions.
54 changes: 43 additions & 11 deletions src/orca/services/nextflowtower/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ def request_json(self, method: str, path: str, **kwargs) -> dict[str, Any]:
A dictionary from deserializing the JSON response.
"""
response = self.request(method, path, **kwargs)
response.raise_for_status()
try:
response.raise_for_status()
except HTTPError as e:
# Add extra context if possible
raise HTTPError(response.text) from e
return response.json()

def request_paged(self, method: str, path: str, **kwargs) -> dict[str, Any]:
Expand Down Expand Up @@ -318,22 +322,50 @@ def launch_workflow(
"""
path = "/workflow/launch"
params = self.generate_params(workspace_id)
payload = launch_info.to_dict()
payload = launch_info.to_json()
json = self.post(path, params=params, json=payload)
return self.unwrap(json, "workflowId")

def get_workflow(self, workspace_id: int, workflow_id: str) -> dict:
"""Gets available information about a workflow run
def get_workflow(
self,
workflow_id: str,
workspace_id: Optional[int] = None,
) -> models.Workflow:
"""Get information about a workflow run.
Attributes:
workspace_id (int): The ID number of the workspace the workflow
exists within.
workflow_id (str): The ID number for a workflow run to get
information about.
workflow_id: The ID number for a workflow run to get
information about.
workspace_id: The ID number of the workspace the workflow
exists within. Defaults to None.
Returns:
response (dict): Dictionary containing information about the workflow run
Workflow instance.
"""
path = f"/workflow/{workflow_id}"
json = self.get(path=path, params={"workspaceId": workspace_id})
return json
params = self.generate_params(workspace_id)
json = self.get(path=path, params=params)
unwrapped = self.unwrap(json, "workflow")
return models.Workflow.from_json(unwrapped)

def list_workflows(
self,
search_filter: Optional[str] = None,
workspace_id: Optional[int] = None,
) -> list[models.Workflow]:
"""List available workflows that match search filter.
Attributes:
search_filter: A Nextflow Tower search query, as you would
compose it in the runs search bar. Defaults to None.
workspace_id: The ID number of the workspace the workflow
exists within. Defaults to None.
Returns:
List of workflow instances.
"""
path = "/workflow"
params = self.generate_params(workspace_id, search=search_filter)
json = self.get(path=path, params=params)
items = self.unwrap(json, "workflows")
return [models.Workflow.from_json(item["workflow"]) for item in items]
Loading

0 comments on commit 57de998

Please sign in to comment.