From 5c018d220f001c786542bed2861a8c8097c966f2 Mon Sep 17 00:00:00 2001 From: Jenny Medina Date: Fri, 12 Jan 2024 10:18:38 -0500 Subject: [PATCH 1/3] Pointing to new nf-synapse/synstage workflow --- demo.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/demo.py b/demo.py index 0d50711..ea34bfb 100644 --- a/demo.py +++ b/demo.py @@ -44,9 +44,10 @@ def synstage_info(self, samplesheet_uri: str) -> LaunchInfo: run_name = self.get_run_name("synstage") return LaunchInfo( run_name=run_name, - pipeline="Sage-Bionetworks-Workflows/nf-synstage", + pipeline="Sage-Bionetworks-Workflows/nf-synapse", revision="main", profiles=["sage"], + entry_name="NF_SYNSTAGE", params={ "input": samplesheet_uri, }, @@ -130,7 +131,7 @@ def get_staged_samplesheet(self, samplesheet: str) -> str: if scheme != "s3": raise ValueError("Expected an S3 URI.") path = PurePosixPath(samplesheet_resource) - return f"{scheme}://{path.parent}/synstage/{path.name}" + return f"{scheme}://{path.parent}/{path.name}" def monitor_workflow(self, workflow_id): """Monitor any workflow run (wait until done).""" From 6c8dbe5796669a2b7eb7533936b46e2395a60e54 Mon Sep 17 00:00:00 2001 From: Jenny Medina Date: Wed, 17 Jan 2024 18:18:31 -0500 Subject: [PATCH 2/3] Fixing path to staged samplesheet Co-authored-by: Brad Macdonald --- demo.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/demo.py b/demo.py index ea34bfb..2b8dbd5 100644 --- a/demo.py +++ b/demo.py @@ -46,7 +46,7 @@ def synstage_info(self, samplesheet_uri: str) -> LaunchInfo: run_name=run_name, pipeline="Sage-Bionetworks-Workflows/nf-synapse", revision="main", - profiles=["sage"], + profiles=["docker"], entry_name="NF_SYNSTAGE", params={ "input": samplesheet_uri, @@ -125,13 +125,13 @@ class TowerRnaseqFlow(FlowSpec): help="S3 prefix for storing output files from different runs", ) - def get_staged_samplesheet(self, samplesheet: str) -> str: + def get_staged_samplesheet(self, samplesheet: str, run_name: str) -> str: """Generate staged samplesheet based on synstage behavior.""" scheme, _, samplesheet_resource = samplesheet.partition("://") if scheme != "s3": raise ValueError("Expected an S3 URI.") path = PurePosixPath(samplesheet_resource) - return f"{scheme}://{path.parent}/{path.name}" + return f"{scheme}://{path.parent}/synstage/{run_name}/{path.name}" def monitor_workflow(self, workflow_id): """Monitor any workflow run (wait until done).""" @@ -186,7 +186,7 @@ def monitor_synstage(self): @step def launch_rnaseq(self): """Launch nf-core/rnaseq workflow to process RNA-seq data.""" - staged_uri = self.get_staged_samplesheet(self.samplesheet_uri) + staged_uri = self.get_staged_samplesheet(self.samplesheet_uri, self.dataset.get_run_name("synstage")) launch_info = self.dataset.rnaseq_info(staged_uri, self.rnaseq_outdir) self.rnaseq_id = self.tower.launch_workflow(launch_info, "spot") self.next(self.monitor_rnaseq) From 051711827b36e187f694e2b2e68a383d910c3b7b Mon Sep 17 00:00:00 2001 From: Jenny Medina Date: Wed, 17 Jan 2024 19:13:15 -0500 Subject: [PATCH 3/3] Documentation update for demo.py; updating example. --- README.md | 13 ++++++++++--- demo.py | 10 ++++++---- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 37c28e6..d82a77d 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ This Python package provides the components to connect various third-party servi ## Demonstration Script -This repository includes a demonstration script called [`demo.py`](demo.py), which showcases how you can use `py-orca` to launch and monitor your workflows on Nextflow Tower. Specifically, it illustrates how to process an RNA-seq dataset using a series of workflow runs, namely `nf-synstage`, `nf-core/rnaseq`, and `nf-synindex`. `py-orca` can be used with any Python-compatible workflow management system to orchestrate each step (_e.g._ Airflow, Prefect, Dagster). The demonstration script uses [Metaflow](https://metaflow.org/) because it's easy to run locally and has an intuitive syntax. +This repository includes a demonstration script called [`demo.py`](demo.py), which showcases how you can use `py-orca` to launch and monitor your workflows on Nextflow Tower. Specifically, it illustrates how to process an RNA-seq dataset using a series of workflow runs, namely `nf-synapse/synstage`, `nf-core/rnaseq`, and `nf-synindex`. `py-orca` can be used with any Python-compatible workflow management system to orchestrate each step (_e.g._ Airflow, Prefect, Dagster). The demonstration script uses [Metaflow](https://metaflow.org/) because it's easy to run locally and has an intuitive syntax. The script assumes that the following environment variables are set. Before setting them up, ensure that you have an AWS profile configured for a role that has access to the dev/ops tower workspace you plan to launch your workflows from. You can set these environment variables using whatever method you prefer (_e.g._ using an `.env` file, sourcing a shell script, etc). Refer to [`.env.example`](.env.example) for the format of their values as well as examples. @@ -22,7 +22,7 @@ Refer to [`.env.example`](.env.example) for the format of their values as well a - `SYNAPSE_CONNECTION_URI` - `AWS_PROFILE` (or another source of AWS credentials) -Once your environment is set, you can create a virtual environment, install the Python dependencies, and run the demonstration script (after downloading it) as follows. Note that you will need to update the `s3_prefix` parameter so that it points to an S3 bucket that is accessible to your Tower workspace. +Once your environment variables are set, you can create a virtual environment, install the Python dependencies, and run the demonstration script (after downloading it) as follows. Note that you will need to update the `s3_prefix` parameter so that it points to an S3 bucket that is accessible to your Tower workspace. ### Creating and setting up your py-`orca` virtual environment and executing `demo.py` @@ -34,11 +34,18 @@ source venv/bin/activate # Install Python dependencies python3 -m pip install 'py-orca[all]' 'metaflow' 'pyyaml' 's3fs' +``` +Before running the example below, ensure that the `s3_prefix` points to an S3 bucket your Nextflow `dev` +or `prod` tower workspace has access to. In the example below, we will point to the `example-dev-project-tower-scratch` S3 bucket because we will be launching our workflows within the +`example-dev-project` workspace in `tower-dev`. +```bash # Run the script using an example dataset -python3 demo.py run --dataset_id 'syn51514585' --s3_prefix 's3://orca-service-test-project-tower-bucket/outputs' +python3 demo.py run --dataset_id 'syn51514585' --s3_prefix 's3://example-dev-project-tower-scratch/work' ``` +Once your run takes off, you can follow the output logs in your terminal, or stay updated with your workflow progress on the web client. Be sure that your `synstage` workflow run has a unique name, and is not an iteration of a previous run (i.e. `my_test_dataset_synstage_2`, `my_test_dataset_synstage_3`, and so on). This is because the `demo.py` script does not currently support being able to locate the staged samplesheet file if it has been staged under a run name that is non-unique. + The above dataset ID ([`syn51514585`](https://www.synapse.org/#!Synapse:syn51514585)) refers to the following YAML file, which should be accessible to Sage employees. Similarly, the samplesheet ID below ([`syn51514475`](https://www.synapse.org/#!Synapse:syn51514475)) should also be accessible to Sage employees. However, there is no secure way to make the output folder accessible to Sage employees, so the `synindex` step will fail if you attempt to run this script using the example dataset ID. This should be sufficient to get a feel for using `py-orca`, but feel free to create your own dataset YAML file on Synapse with an output folder that you own. ```yaml diff --git a/demo.py b/demo.py index 2b8dbd5..cb9b7b5 100644 --- a/demo.py +++ b/demo.py @@ -40,7 +40,7 @@ def get_run_name(self, suffix: str) -> str: return f"{self.id}_{suffix}" def synstage_info(self, samplesheet_uri: str) -> LaunchInfo: - """Generate LaunchInfo for nf-synstage.""" + """Generate LaunchInfo for nf-synapse/synstage.""" run_name = self.get_run_name("synstage") return LaunchInfo( run_name=run_name, @@ -172,21 +172,23 @@ def transfer_samplesheet_to_s3(self): @step def launch_synstage(self): - """Launch nf-synstage to stage Synapse files in samplesheet.""" + """Launch nf-synapse/synstage to stage Synapse files in samplesheet.""" launch_info = self.dataset.synstage_info(self.samplesheet_uri) self.synstage_id = self.tower.launch_workflow(launch_info, "spot") self.next(self.monitor_synstage) @step def monitor_synstage(self): - """Monitor nf-synstage workflow run (wait until done).""" + """Monitor nf-synapse/synstage workflow run (wait until done).""" self.monitor_workflow(self.synstage_id) self.next(self.launch_rnaseq) @step def launch_rnaseq(self): """Launch nf-core/rnaseq workflow to process RNA-seq data.""" - staged_uri = self.get_staged_samplesheet(self.samplesheet_uri, self.dataset.get_run_name("synstage")) + staged_uri = self.get_staged_samplesheet( + self.samplesheet_uri, self.dataset.get_run_name("synstage") + ) launch_info = self.dataset.rnaseq_info(staged_uri, self.rnaseq_outdir) self.rnaseq_id = self.tower.launch_workflow(launch_info, "spot") self.next(self.monitor_rnaseq)