From b56198ddababee4e44f593a427ce079a25890d9e Mon Sep 17 00:00:00 2001 From: Johannes Koester Date: Mon, 11 Sep 2023 20:50:46 +0200 Subject: [PATCH 1/9] fix: fix syntax error --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index cad05c6..d3a1f5b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "snakemake-executor-plugin-tibanna" version = "0.1.0" description = "A Snakemake executor plugin for Amazon AWS workflow exection with tibanna." authors = [ - "Soohyun Lee <@SooLee>" + "Soohyun Lee <@SooLee>", "Johannes Koester " ] readme = "README.md" From fb62974e870d28150646a578f6d14a55df4fcf3b Mon Sep 17 00:00:00 2001 From: Johannes Koester Date: Mon, 11 Sep 2023 20:51:50 +0200 Subject: [PATCH 2/9] minor --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d3a1f5b..8ca92bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "snakemake-executor-plugin-tibanna" version = "0.1.0" description = "A Snakemake executor plugin for Amazon AWS workflow exection with tibanna." authors = [ - "Soohyun Lee <@SooLee>", + "Soohyun Lee <@SooLee>", # TODO please add Soo Lee's email address "Johannes Koester " ] readme = "README.md" From 08501493cbd138794ffcd8567311f4e3cb03c09d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Mon, 11 Sep 2023 20:57:18 +0200 Subject: [PATCH 3/9] chore: setup AWS for testing --- .github/workflows/ci.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fd73756..d15eeb9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,12 +73,11 @@ jobs: run: | poetry install - - name: Setup AWS + - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v1 - if: env.AWS_AVAILABLE with: - aws-access-key-id: "${{ secrets.AWS_ACCESS_KEY_ID }}" - aws-secret-access-key: "${{ secrets.AWS_SECRET_ACCESS_KEY }}" + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-region: us-east-1 - name: Run pytest From 948c4faba881f12903d5924a83baf0591e005611 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Mon, 11 Sep 2023 20:59:18 +0200 Subject: [PATCH 4/9] fix: add tibanna dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 8ca92bf..fd0f33a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ readme = "README.md" python = "^3.9" snakemake-interface-common = "^1.3.3" snakemake-interface-executor-plugins = "^3.0.0" +tibanna = "^3.3.1" [tool.poetry.group.dev.dependencies] From b663bf1d9e738fec6ee91b0d91eaa53311ec6f6f Mon Sep 17 00:00:00 2001 From: Johannes Koester Date: Wed, 20 Sep 2023 11:41:54 +0200 Subject: [PATCH 5/9] fix: adapt to changes in snakemake-interface-executor-plugins --- pyproject.toml | 4 ++-- snakemake_executor_plugin_tibanna/__init__.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index fd0f33a..a9192de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,8 +10,8 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.9" -snakemake-interface-common = "^1.3.3" -snakemake-interface-executor-plugins = "^3.0.0" +snakemake-interface-common = "^1.4.1" +snakemake-interface-executor-plugins = "^4.0.0" tibanna = "^3.3.1" diff --git a/snakemake_executor_plugin_tibanna/__init__.py b/snakemake_executor_plugin_tibanna/__init__.py index 233daf8..9e980fb 100644 --- a/snakemake_executor_plugin_tibanna/__init__.py +++ b/snakemake_executor_plugin_tibanna/__init__.py @@ -14,7 +14,7 @@ from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface from snakemake_interface_executor_plugins.jobs import ( - ExecutorJobInterface, + JobExecutorInterface, ) from snakemake_interface_common.exceptions import WorkflowError # noqa @@ -146,7 +146,7 @@ def __init__( self.container_image = self.workflow.remote_execution_settings.container_image - def run_job(self, job: ExecutorJobInterface): + def run_job(self, job: JobExecutorInterface): # Implement here how to run a job. # You can access the job's resources, etc. # via the job object. @@ -258,7 +258,7 @@ def remove_prefix(self, s): def get_snakefile(self): return os.path.basename(self.snakefile) - def add_command(self, job: ExecutorJobInterface, tibanna_args, tibanna_config): + def add_command(self, job: JobExecutorInterface, tibanna_args, tibanna_config): # format command command = self.format_job_exec(job) @@ -267,7 +267,7 @@ def add_command(self, job: ExecutorJobInterface, tibanna_args, tibanna_config): self.logger.debug("command = " + str(command)) tibanna_args.command = command - def add_workflow_files(self, job: ExecutorJobInterface, tibanna_args): + def add_workflow_files(self, job: JobExecutorInterface, tibanna_args): snakefile_fname, snakemake_dir = self.split_filename(self.snakefile) snakemake_child_fnames = [] for src in self.workflow_sources: @@ -295,7 +295,7 @@ def adjust_filepath(self, f): rel = f return rel - def make_tibanna_input(self, job: ExecutorJobInterface): + def make_tibanna_input(self, job: JobExecutorInterface): # input & output # Local snakemake command here must be run with --default-remote-prefix # and --default-remote-provider (forced) but on VM these options will be From 4461737fa4140a2df5286b0e492ab439c0357536 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Wed, 20 Sep 2023 21:43:44 +0200 Subject: [PATCH 6/9] update to latest tibanna --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a9192de..3e96a18 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ readme = "README.md" python = "^3.9" snakemake-interface-common = "^1.4.1" snakemake-interface-executor-plugins = "^4.0.0" -tibanna = "^3.3.1" +tibanna = "^4.0.0" [tool.poetry.group.dev.dependencies] From 6bb58251a6b8a55493cde1675c7fc6e8a4a826fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Wed, 20 Sep 2023 21:44:58 +0200 Subject: [PATCH 7/9] Update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 3e96a18..4cfed4a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ black = "^23.9.1" flake8 = "^6.1.0" coverage = "^7.3.1" pytest = "^7.4.2" -snakemake = {git = "https://github.com/snakemake/snakemake.git", branch = "feat/api-rewrite"} +snakemake = {git = "https://github.com/snakemake/snakemake.git"} [build-system] requires = ["poetry-core"] From 5ee85e31e05bd19d2340838617bf9f0a0a4b97ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20K=C3=B6ster?= Date: Wed, 20 Sep 2023 21:49:46 +0200 Subject: [PATCH 8/9] Update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 4cfed4a..13c9e2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,7 +9,7 @@ authors = [ readme = "README.md" [tool.poetry.dependencies] -python = "^3.9" +python = ">=3.9,<3.11" snakemake-interface-common = "^1.4.1" snakemake-interface-executor-plugins = "^4.0.0" tibanna = "^4.0.0" From 4d94daaf3e3a363db9eb0a05696a8a4bb291081a Mon Sep 17 00:00:00 2001 From: Johannes Koester Date: Mon, 13 Nov 2023 10:46:11 +0100 Subject: [PATCH 9/9] adapt to changes in Snakemake 8 main branch, remove old code previously needed for handling input and output as well as workflow sources: all of this now happens automatically within Snakemake. --- .github/workflows/ci.yml | 24 ++- .github/workflows/release-please.yml | 7 +- pyproject.toml | 10 +- snakemake_executor_plugin_tibanna/__init__.py | 141 ++---------------- tests/tests.py | 23 +-- 5 files changed, 52 insertions(+), 153 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d15eeb9..7397d38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,15 +13,19 @@ jobs: - name: Check out the code uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install Dependencies using Poetry @@ -36,15 +40,19 @@ jobs: - name: Check out the code uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install Dependencies using Poetry @@ -58,15 +66,19 @@ jobs: steps: - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install dependencies diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 47853e9..4ac1504 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -26,22 +26,21 @@ jobs: - uses: actions/setup-python@v2 with: - python-version: "3.9" + python-version: "3.11" - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install Dependencies using Poetry run: | - pip install connection-pool # because it is incompatible with poetry poetry install - name: Publish to PyPi diff --git a/pyproject.toml b/pyproject.toml index 13c9e2b..f6e655c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,9 +9,9 @@ authors = [ readme = "README.md" [tool.poetry.dependencies] -python = ">=3.9,<3.11" -snakemake-interface-common = "^1.4.1" -snakemake-interface-executor-plugins = "^4.0.0" +python = "^3.11" +snakemake-interface-common = "^1.14.1" +snakemake-interface-executor-plugins = "^7.0.3" tibanna = "^4.0.0" @@ -21,6 +21,10 @@ flake8 = "^6.1.0" coverage = "^7.3.1" pytest = "^7.4.2" snakemake = {git = "https://github.com/snakemake/snakemake.git"} +snakemake-storage-plugin-s3 = "^0.2.4" + +[tool.coverage.run] +omit = [".*", "*/site-packages/*", "Snakefile"] [build-system] requires = ["poetry-core"] diff --git a/snakemake_executor_plugin_tibanna/__init__.py b/snakemake_executor_plugin_tibanna/__init__.py index 9e980fb..5f188bd 100644 --- a/snakemake_executor_plugin_tibanna/__init__.py +++ b/snakemake_executor_plugin_tibanna/__init__.py @@ -7,7 +7,7 @@ import math import os import re -from typing import List, Generator, Optional +from typing import AsyncGenerator, List, Generator, Optional from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor from snakemake_interface_executor_plugins import ExecutorSettingsBase, CommonSettings @@ -42,16 +42,6 @@ class ExecutorSettings(ExecutorSettingsBase): "required": True, }, ) - precommand: Optional[str] = field( - default=None, - metadata={ - "precommand": "Any command to execute before snakemake command on AWS " - "cloud such as wget, git clone, unzip, etc. This is used with --tibanna. " - "Do not include input/output download/upload commands - file transfer " - "between S3 bucket and the run environment (container) is automatically " - "handled by Tibanna." - }, - ) spot_instance: bool = field( default=False, metadata={ @@ -88,61 +78,26 @@ class ExecutorSettings(ExecutorSettingsBase): # filesystem (True) or not (False). # This is e.g. the case for cloud execution. implies_no_shared_fs=True, + pass_default_storage_provider_args=True, + pass_default_resources_args=True, + pass_envvar_declarations_to_cmd=False, + auto_deploy_default_storage_provider=True, ) # Required: # Implementation of your executor class Executor(RemoteExecutor): - def __init__( - self, - workflow: WorkflowExecutorInterface, - logger: LoggerExecutorInterface, - ): - super().__init__( - workflow, - logger, - # configure behavior of RemoteExecutor below - # whether arguments for setting the remote provider shall be passed to jobs - pass_default_remote_provider_args=False, - # whether arguments for setting default resources shall be passed to jobs - pass_default_resources_args=False, - # whether environment variables shall be passed to jobs - pass_envvar_declarations_to_cmd=False, - # specify initial amount of seconds to sleep before checking for job status - init_sleep_seconds=0, - ) - - self.workflow_sources = [] - for wfs in self.workflow.dag.get_sources(): - if os.path.isdir(wfs): - for dirpath, dirnames, filenames in os.walk(wfs): - self.workflow_sources.extend( - [os.path.join(dirpath, f) for f in filenames] - ) - else: - self.workflow_sources.append(os.path.abspath(wfs)) - - log = "sources=" - for f in self.workflow_sources: - log += f - self.snakefile = workflow.main_snakefile - self.envvars = {e: os.environ[e] for e in workflow.envvars} + def __post_init__(self): + self.snakefile = self.workflow.main_snakefile + self.envvars = {e: os.environ[e] for e in self.workflow.envvars} if self.envvars: - logger.debug("envvars = %s" % str(self.envvars)) + self.logger.debug("envvars = %s" % str(self.envvars)) self.tibanna_sfn = self.workflow.executor_settings.sfn self.precommand = self.workflow.executor_settings.precommand or "" - # TODO this does not work if the default remote is something else than S3 - self.s3_bucket = workflow.storage_settings.default_remote_prefix.split("/")[0] - self.s3_subdir = re.sub( - f"^{self.s3_bucket}/", "", workflow.storage_settings.default_remote_prefix - ) - logger.debug("precommand= " + self.precommand) - logger.debug("bucket=" + self.s3_bucket) - logger.debug("subdir=" + self.s3_subdir) - self.quiet = workflow.output_settings.quiet + self.quiet = self.workflow.output_settings.quiet self.container_image = self.workflow.remote_execution_settings.container_image @@ -181,7 +136,7 @@ def run_job(self, job: JobExecutorInterface): async def check_active_jobs( self, active_jobs: List[SubmittedJobInfo] - ) -> Generator[SubmittedJobInfo, None, None]: + ) -> AsyncGenerator[SubmittedJobInfo, None, None]: # Check the status of active jobs. # You have to iterate over the given list active_jobs. @@ -267,23 +222,6 @@ def add_command(self, job: JobExecutorInterface, tibanna_args, tibanna_config): self.logger.debug("command = " + str(command)) tibanna_args.command = command - def add_workflow_files(self, job: JobExecutorInterface, tibanna_args): - snakefile_fname, snakemake_dir = self.split_filename(self.snakefile) - snakemake_child_fnames = [] - for src in self.workflow_sources: - src_fname, _ = self.split_filename(src, snakemake_dir) - if src_fname != snakefile_fname: # redundant - snakemake_child_fnames.append(src_fname) - # change path for config files - # TODO - this is a hacky way to do this - self.workflow.overwrite_configfiles = [ - self.split_filename(cf, snakemake_dir)[0] - for cf in self.workflow.overwrite_configfiles - ] - tibanna_args.snakemake_directory_local = snakemake_dir - tibanna_args.snakemake_main_filename = snakefile_fname - tibanna_args.snakemake_child_filenames = list(set(snakemake_child_fnames)) - def adjust_filepath(self, f): if not hasattr(f, "remote_object"): rel = self.remove_prefix(f) # log/benchmark @@ -296,59 +234,6 @@ def adjust_filepath(self, f): return rel def make_tibanna_input(self, job: JobExecutorInterface): - # input & output - # Local snakemake command here must be run with --default-remote-prefix - # and --default-remote-provider (forced) but on VM these options will be - # removed. - # The snakemake on the VM will consider these input and output as not remote. - # They files are transferred to the container by Tibanna before running - # snakemake. - # In short, the paths on VM must be consistent with what's in Snakefile. - # but the actual location of the files is on the S3 bucket/prefix. - # This mapping info must be passed to Tibanna. - for i in job.input: - self.logger.debug("job input " + str(i)) - self.logger.debug( - "job input is remote= " + ("true" if i.is_remote else "false") - ) - if hasattr(i.remote_object, "provider"): - self.logger.debug( - " is remote default= " - + ("true" if i.remote_object.provider.is_default else "false") - ) - for o in job.expanded_output: - self.logger.debug("job output " + str(o)) - self.logger.debug( - "job output is remote= " + ("true" if o.is_remote else "false") - ) - if hasattr(o.remote_object, "provider"): - self.logger.debug( - " is remote default= " - + ("true" if o.remote_object.provider.is_default else "false") - ) - file_prefix = ( - "file:///data1/snakemake" # working dir inside snakemake container on VM - ) - input_source = dict() - for ip in job.input: - ip_rel = self.adjust_filepath(ip) - input_source[os.path.join(file_prefix, ip_rel)] = "s3://" + ip - output_target = dict() - output_all = [eo for eo in job.expanded_output] - if job.log: - if isinstance(job.log, list): - output_all.extend([str(_) for _ in job.log]) - else: - output_all.append(str(job.log)) - if hasattr(job, "benchmark") and job.benchmark: - if isinstance(job.benchmark, list): - output_all.extend([str(_) for _ in job.benchmark]) - else: - output_all.append(str(job.benchmark)) - for op in output_all: - op_rel = self.adjust_filepath(op) - output_target[os.path.join(file_prefix, op_rel)] = "s3://" + op - # mem & cpu mem = job.resources["mem_mb"] / 1024 if "mem_mb" in job.resources.keys() else 1 cpu = job.threads @@ -374,14 +259,10 @@ def make_tibanna_input(self, job: JobExecutorInterface): ) tibanna_args = ec2_utils.Args( - output_S3_bucket=self.s3_bucket, language="snakemake", container_image=self.container_image, - input_files=input_source, - output_target=output_target, input_env=self.envvars, ) - self.add_workflow_files(job, tibanna_args) self.add_command(job, tibanna_args, tibanna_config) tibanna_input = { "jobid": jobid, diff --git a/tests/tests.py b/tests/tests.py index 4856003..0e27163 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -9,7 +9,7 @@ BUCKET_NAME = "snakemake-testing-%s-bucket" % next(tempfile._get_candidate_names()) -class TestWorkflowsBase(snakemake.common.tests.TestWorkflowsBase): +class TestWorkflows(snakemake.common.tests.TestWorkflowsMinioPlayStorageBase): __test__ = True def get_executor(self) -> str: @@ -21,12 +21,15 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: sfn=..., # TODO add test sfn ) - def get_default_remote_provider(self) -> Optional[str]: - # Return name of default remote provider if required for testing, - # otherwise None. - return "S3" - - def get_default_remote_prefix(self) -> Optional[str]: - # Return default remote prefix if required for testing, - # otherwise None. - return BUCKET_NAME + def get_assume_shared_fs(self) -> bool: + return False + + def get_remote_execution_settings( + self, + ) -> snakemake.settings.RemoteExecutionSettings: + return snakemake.settings.RemoteExecutionSettings( + seconds_between_status_checks=10, + envvars=self.get_envvars(), + # TODO remove once we have switched to stable snakemake for dev + container_image="snakemake/snakemake:latest", + )