From 4d94daaf3e3a363db9eb0a05696a8a4bb291081a Mon Sep 17 00:00:00 2001 From: Johannes Koester Date: Mon, 13 Nov 2023 10:46:11 +0100 Subject: [PATCH] adapt to changes in Snakemake 8 main branch, remove old code previously needed for handling input and output as well as workflow sources: all of this now happens automatically within Snakemake. --- .github/workflows/ci.yml | 24 ++- .github/workflows/release-please.yml | 7 +- pyproject.toml | 10 +- snakemake_executor_plugin_tibanna/__init__.py | 141 ++---------------- tests/tests.py | 23 +-- 5 files changed, 52 insertions(+), 153 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d15eeb9..7397d38 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,15 +13,19 @@ jobs: - name: Check out the code uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install Dependencies using Poetry @@ -36,15 +40,19 @@ jobs: - name: Check out the code uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install Dependencies using Poetry @@ -58,15 +66,19 @@ jobs: steps: - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: "3.11" + - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install dependencies diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 47853e9..4ac1504 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -26,22 +26,21 @@ jobs: - uses: actions/setup-python@v2 with: - python-version: "3.9" + python-version: "3.11" - name: Install poetry - run: pipx install poetry + run: pip install poetry - name: Determine dependencies run: poetry lock - uses: actions/setup-python@v4 with: - python-version: "3.9" + python-version: "3.11" cache: poetry - name: Install Dependencies using Poetry run: | - pip install connection-pool # because it is incompatible with poetry poetry install - name: Publish to PyPi diff --git a/pyproject.toml b/pyproject.toml index 13c9e2b..f6e655c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,9 +9,9 @@ authors = [ readme = "README.md" [tool.poetry.dependencies] -python = ">=3.9,<3.11" -snakemake-interface-common = "^1.4.1" -snakemake-interface-executor-plugins = "^4.0.0" +python = "^3.11" +snakemake-interface-common = "^1.14.1" +snakemake-interface-executor-plugins = "^7.0.3" tibanna = "^4.0.0" @@ -21,6 +21,10 @@ flake8 = "^6.1.0" coverage = "^7.3.1" pytest = "^7.4.2" snakemake = {git = "https://github.com/snakemake/snakemake.git"} +snakemake-storage-plugin-s3 = "^0.2.4" + +[tool.coverage.run] +omit = [".*", "*/site-packages/*", "Snakefile"] [build-system] requires = ["poetry-core"] diff --git a/snakemake_executor_plugin_tibanna/__init__.py b/snakemake_executor_plugin_tibanna/__init__.py index 9e980fb..5f188bd 100644 --- a/snakemake_executor_plugin_tibanna/__init__.py +++ b/snakemake_executor_plugin_tibanna/__init__.py @@ -7,7 +7,7 @@ import math import os import re -from typing import List, Generator, Optional +from typing import AsyncGenerator, List, Generator, Optional from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor from snakemake_interface_executor_plugins import ExecutorSettingsBase, CommonSettings @@ -42,16 +42,6 @@ class ExecutorSettings(ExecutorSettingsBase): "required": True, }, ) - precommand: Optional[str] = field( - default=None, - metadata={ - "precommand": "Any command to execute before snakemake command on AWS " - "cloud such as wget, git clone, unzip, etc. This is used with --tibanna. " - "Do not include input/output download/upload commands - file transfer " - "between S3 bucket and the run environment (container) is automatically " - "handled by Tibanna." - }, - ) spot_instance: bool = field( default=False, metadata={ @@ -88,61 +78,26 @@ class ExecutorSettings(ExecutorSettingsBase): # filesystem (True) or not (False). # This is e.g. the case for cloud execution. implies_no_shared_fs=True, + pass_default_storage_provider_args=True, + pass_default_resources_args=True, + pass_envvar_declarations_to_cmd=False, + auto_deploy_default_storage_provider=True, ) # Required: # Implementation of your executor class Executor(RemoteExecutor): - def __init__( - self, - workflow: WorkflowExecutorInterface, - logger: LoggerExecutorInterface, - ): - super().__init__( - workflow, - logger, - # configure behavior of RemoteExecutor below - # whether arguments for setting the remote provider shall be passed to jobs - pass_default_remote_provider_args=False, - # whether arguments for setting default resources shall be passed to jobs - pass_default_resources_args=False, - # whether environment variables shall be passed to jobs - pass_envvar_declarations_to_cmd=False, - # specify initial amount of seconds to sleep before checking for job status - init_sleep_seconds=0, - ) - - self.workflow_sources = [] - for wfs in self.workflow.dag.get_sources(): - if os.path.isdir(wfs): - for dirpath, dirnames, filenames in os.walk(wfs): - self.workflow_sources.extend( - [os.path.join(dirpath, f) for f in filenames] - ) - else: - self.workflow_sources.append(os.path.abspath(wfs)) - - log = "sources=" - for f in self.workflow_sources: - log += f - self.snakefile = workflow.main_snakefile - self.envvars = {e: os.environ[e] for e in workflow.envvars} + def __post_init__(self): + self.snakefile = self.workflow.main_snakefile + self.envvars = {e: os.environ[e] for e in self.workflow.envvars} if self.envvars: - logger.debug("envvars = %s" % str(self.envvars)) + self.logger.debug("envvars = %s" % str(self.envvars)) self.tibanna_sfn = self.workflow.executor_settings.sfn self.precommand = self.workflow.executor_settings.precommand or "" - # TODO this does not work if the default remote is something else than S3 - self.s3_bucket = workflow.storage_settings.default_remote_prefix.split("/")[0] - self.s3_subdir = re.sub( - f"^{self.s3_bucket}/", "", workflow.storage_settings.default_remote_prefix - ) - logger.debug("precommand= " + self.precommand) - logger.debug("bucket=" + self.s3_bucket) - logger.debug("subdir=" + self.s3_subdir) - self.quiet = workflow.output_settings.quiet + self.quiet = self.workflow.output_settings.quiet self.container_image = self.workflow.remote_execution_settings.container_image @@ -181,7 +136,7 @@ def run_job(self, job: JobExecutorInterface): async def check_active_jobs( self, active_jobs: List[SubmittedJobInfo] - ) -> Generator[SubmittedJobInfo, None, None]: + ) -> AsyncGenerator[SubmittedJobInfo, None, None]: # Check the status of active jobs. # You have to iterate over the given list active_jobs. @@ -267,23 +222,6 @@ def add_command(self, job: JobExecutorInterface, tibanna_args, tibanna_config): self.logger.debug("command = " + str(command)) tibanna_args.command = command - def add_workflow_files(self, job: JobExecutorInterface, tibanna_args): - snakefile_fname, snakemake_dir = self.split_filename(self.snakefile) - snakemake_child_fnames = [] - for src in self.workflow_sources: - src_fname, _ = self.split_filename(src, snakemake_dir) - if src_fname != snakefile_fname: # redundant - snakemake_child_fnames.append(src_fname) - # change path for config files - # TODO - this is a hacky way to do this - self.workflow.overwrite_configfiles = [ - self.split_filename(cf, snakemake_dir)[0] - for cf in self.workflow.overwrite_configfiles - ] - tibanna_args.snakemake_directory_local = snakemake_dir - tibanna_args.snakemake_main_filename = snakefile_fname - tibanna_args.snakemake_child_filenames = list(set(snakemake_child_fnames)) - def adjust_filepath(self, f): if not hasattr(f, "remote_object"): rel = self.remove_prefix(f) # log/benchmark @@ -296,59 +234,6 @@ def adjust_filepath(self, f): return rel def make_tibanna_input(self, job: JobExecutorInterface): - # input & output - # Local snakemake command here must be run with --default-remote-prefix - # and --default-remote-provider (forced) but on VM these options will be - # removed. - # The snakemake on the VM will consider these input and output as not remote. - # They files are transferred to the container by Tibanna before running - # snakemake. - # In short, the paths on VM must be consistent with what's in Snakefile. - # but the actual location of the files is on the S3 bucket/prefix. - # This mapping info must be passed to Tibanna. - for i in job.input: - self.logger.debug("job input " + str(i)) - self.logger.debug( - "job input is remote= " + ("true" if i.is_remote else "false") - ) - if hasattr(i.remote_object, "provider"): - self.logger.debug( - " is remote default= " - + ("true" if i.remote_object.provider.is_default else "false") - ) - for o in job.expanded_output: - self.logger.debug("job output " + str(o)) - self.logger.debug( - "job output is remote= " + ("true" if o.is_remote else "false") - ) - if hasattr(o.remote_object, "provider"): - self.logger.debug( - " is remote default= " - + ("true" if o.remote_object.provider.is_default else "false") - ) - file_prefix = ( - "file:///data1/snakemake" # working dir inside snakemake container on VM - ) - input_source = dict() - for ip in job.input: - ip_rel = self.adjust_filepath(ip) - input_source[os.path.join(file_prefix, ip_rel)] = "s3://" + ip - output_target = dict() - output_all = [eo for eo in job.expanded_output] - if job.log: - if isinstance(job.log, list): - output_all.extend([str(_) for _ in job.log]) - else: - output_all.append(str(job.log)) - if hasattr(job, "benchmark") and job.benchmark: - if isinstance(job.benchmark, list): - output_all.extend([str(_) for _ in job.benchmark]) - else: - output_all.append(str(job.benchmark)) - for op in output_all: - op_rel = self.adjust_filepath(op) - output_target[os.path.join(file_prefix, op_rel)] = "s3://" + op - # mem & cpu mem = job.resources["mem_mb"] / 1024 if "mem_mb" in job.resources.keys() else 1 cpu = job.threads @@ -374,14 +259,10 @@ def make_tibanna_input(self, job: JobExecutorInterface): ) tibanna_args = ec2_utils.Args( - output_S3_bucket=self.s3_bucket, language="snakemake", container_image=self.container_image, - input_files=input_source, - output_target=output_target, input_env=self.envvars, ) - self.add_workflow_files(job, tibanna_args) self.add_command(job, tibanna_args, tibanna_config) tibanna_input = { "jobid": jobid, diff --git a/tests/tests.py b/tests/tests.py index 4856003..0e27163 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -9,7 +9,7 @@ BUCKET_NAME = "snakemake-testing-%s-bucket" % next(tempfile._get_candidate_names()) -class TestWorkflowsBase(snakemake.common.tests.TestWorkflowsBase): +class TestWorkflows(snakemake.common.tests.TestWorkflowsMinioPlayStorageBase): __test__ = True def get_executor(self) -> str: @@ -21,12 +21,15 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]: sfn=..., # TODO add test sfn ) - def get_default_remote_provider(self) -> Optional[str]: - # Return name of default remote provider if required for testing, - # otherwise None. - return "S3" - - def get_default_remote_prefix(self) -> Optional[str]: - # Return default remote prefix if required for testing, - # otherwise None. - return BUCKET_NAME + def get_assume_shared_fs(self) -> bool: + return False + + def get_remote_execution_settings( + self, + ) -> snakemake.settings.RemoteExecutionSettings: + return snakemake.settings.RemoteExecutionSettings( + seconds_between_status_checks=10, + envvars=self.get_envvars(), + # TODO remove once we have switched to stable snakemake for dev + container_image="snakemake/snakemake:latest", + )