diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index fd73756..7397d38 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -13,15 +13,19 @@ jobs:
       - name: Check out the code
         uses: actions/checkout@v3
 
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry
 
       - name: Determine dependencies
         run: poetry lock
 
       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry
 
       - name: Install Dependencies using Poetry
@@ -36,15 +40,19 @@ jobs:
       - name: Check out the code
         uses: actions/checkout@v3
 
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry
 
       - name: Determine dependencies
         run: poetry lock
 
       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry
 
       - name: Install Dependencies using Poetry
@@ -58,27 +66,30 @@ jobs:
     steps:
       - uses: actions/checkout@v3
 
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry
 
       - name: Determine dependencies
         run: poetry lock
 
       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry
 
       - name: Install dependencies
         run: |
           poetry install
 
-      - name: Setup AWS
+      - name: Configure AWS Credentials
         uses: aws-actions/configure-aws-credentials@v1
-        if: env.AWS_AVAILABLE
         with:
-          aws-access-key-id: "${{ secrets.AWS_ACCESS_KEY_ID }}"
-          aws-secret-access-key: "${{ secrets.AWS_SECRET_ACCESS_KEY }}"
+          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
           aws-region: us-east-1
 
       - name: Run pytest
diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml
index 47853e9..4ac1504 100644
--- a/.github/workflows/release-please.yml
+++ b/.github/workflows/release-please.yml
@@ -26,22 +26,21 @@ jobs:
 
       - uses: actions/setup-python@v2
         with:
-          python-version: "3.9"
+          python-version: "3.11"
 
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry
 
       - name: Determine dependencies
         run: poetry lock
 
       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry
 
       - name: Install Dependencies using Poetry
         run: |
-          pip install connection-pool  # because it is incompatible with poetry
           poetry install
 
       - name: Publish to PyPi
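Note on the two workflow changes above: actions/setup-python only restores a Poetry cache (cache: poetry) when Poetry is already installed at the time that step runs, so each job now selects Python 3.11 first, installs Poetry with plain pip into that interpreter rather than into pipx's isolated environment, and generates the lock file with poetry lock before the cached setup-python step executes.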
diff --git a/pyproject.toml b/pyproject.toml
index cad05c6..f6e655c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,15 +3,16 @@ name = "snakemake-executor-plugin-tibanna"
 version = "0.1.0"
 description = "A Snakemake executor plugin for Amazon AWS workflow exection with tibanna."
 authors = [
-    "Soohyun Lee <@SooLee>"
+    "Soohyun Lee <@SooLee>", # TODO please add Soo Lee's email address
     "Johannes Koester "
 ]
 readme = "README.md"
 
 [tool.poetry.dependencies]
-python = "^3.9"
-snakemake-interface-common = "^1.3.3"
-snakemake-interface-executor-plugins = "^3.0.0"
+python = "^3.11"
+snakemake-interface-common = "^1.14.1"
+snakemake-interface-executor-plugins = "^7.0.3"
+tibanna = "^4.0.0"
 
 [tool.poetry.group.dev.dependencies]
@@ -19,7 +20,11 @@ black = "^23.9.1"
 flake8 = "^6.1.0"
 coverage = "^7.3.1"
 pytest = "^7.4.2"
-snakemake = {git = "https://github.com/snakemake/snakemake.git", branch = "feat/api-rewrite"}
+snakemake = {git = "https://github.com/snakemake/snakemake.git"}
+snakemake-storage-plugin-s3 = "^0.2.4"
+
+[tool.coverage.run]
+omit = [".*", "*/site-packages/*", "Snakefile"]
 
 [build-system]
 requires = ["poetry-core"]
diff --git a/snakemake_executor_plugin_tibanna/__init__.py b/snakemake_executor_plugin_tibanna/__init__.py
index 233daf8..5f188bd 100644
--- a/snakemake_executor_plugin_tibanna/__init__.py
+++ b/snakemake_executor_plugin_tibanna/__init__.py
@@ -7,14 +7,14 @@
 import math
 import os
 import re
-from typing import List, Generator, Optional
+from typing import AsyncGenerator, List, Generator, Optional
 
 from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
 from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
 from snakemake_interface_executor_plugins import ExecutorSettingsBase, CommonSettings
 from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
 from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
 from snakemake_interface_executor_plugins.jobs import (
-    ExecutorJobInterface,
+    JobExecutorInterface,
 )
 from snakemake_interface_common.exceptions import WorkflowError  # noqa
 
@@ -42,16 +42,6 @@ class ExecutorSettings(ExecutorSettingsBase):
             "required": True,
         },
     )
-    precommand: Optional[str] = field(
-        default=None,
-        metadata={
-            "precommand": "Any command to execute before snakemake command on AWS "
-            "cloud such as wget, git clone, unzip, etc. This is used with --tibanna. "
-            "Do not include input/output download/upload commands - file transfer "
-            "between S3 bucket and the run environment (container) is automatically "
-            "handled by Tibanna."
-        },
-    )
     spot_instance: bool = field(
        default=False,
        metadata={
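For orientation after the removal of the precommand option above: executor settings are declared as dataclass fields whose metadata snakemake exposes as command line options, and sfn is now the only required one. A minimal sketch of that pattern (illustrative, not code from this PR; the help text is made up and the derived CLI flag, presumably --tibanna-sfn, is an assumption):

from dataclasses import dataclass, field
from typing import Optional

from snakemake_interface_executor_plugins import ExecutorSettingsBase


@dataclass
class ExecutorSettings(ExecutorSettingsBase):
    # surfaced by snakemake as a CLI option, presumably --tibanna-sfn
    sfn: Optional[str] = field(
        default=None,
        metadata={
            "help": "Name of the Tibanna step function to submit jobs to.",
            "required": True,
        },
    )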
@@ -88,65 +78,30 @@ class ExecutorSettings(ExecutorSettingsBase):
     # filesystem (True) or not (False).
     # This is e.g. the case for cloud execution.
     implies_no_shared_fs=True,
+    pass_default_storage_provider_args=True,
+    pass_default_resources_args=True,
+    pass_envvar_declarations_to_cmd=False,
+    auto_deploy_default_storage_provider=True,
 )
 
 
 # Required:
 # Implementation of your executor
 class Executor(RemoteExecutor):
-    def __init__(
-        self,
-        workflow: WorkflowExecutorInterface,
-        logger: LoggerExecutorInterface,
-    ):
-        super().__init__(
-            workflow,
-            logger,
-            # configure behavior of RemoteExecutor below
-            # whether arguments for setting the remote provider shall be passed to jobs
-            pass_default_remote_provider_args=False,
-            # whether arguments for setting default resources shall be passed to jobs
-            pass_default_resources_args=False,
-            # whether environment variables shall be passed to jobs
-            pass_envvar_declarations_to_cmd=False,
-            # specify initial amount of seconds to sleep before checking for job status
-            init_sleep_seconds=0,
-        )
-
-        self.workflow_sources = []
-        for wfs in self.workflow.dag.get_sources():
-            if os.path.isdir(wfs):
-                for dirpath, dirnames, filenames in os.walk(wfs):
-                    self.workflow_sources.extend(
-                        [os.path.join(dirpath, f) for f in filenames]
-                    )
-            else:
-                self.workflow_sources.append(os.path.abspath(wfs))
-
-        log = "sources="
-        for f in self.workflow_sources:
-            log += f
-        self.snakefile = workflow.main_snakefile
-        self.envvars = {e: os.environ[e] for e in workflow.envvars}
+    def __post_init__(self):
+        self.snakefile = self.workflow.main_snakefile
+        self.envvars = {e: os.environ[e] for e in self.workflow.envvars}
         if self.envvars:
-            logger.debug("envvars = %s" % str(self.envvars))
+            self.logger.debug("envvars = %s" % str(self.envvars))
         self.tibanna_sfn = self.workflow.executor_settings.sfn
         self.precommand = self.workflow.executor_settings.precommand or ""
-        # TODO this does not work if the default remote is something else than S3
-        self.s3_bucket = workflow.storage_settings.default_remote_prefix.split("/")[0]
-        self.s3_subdir = re.sub(
-            f"^{self.s3_bucket}/", "", workflow.storage_settings.default_remote_prefix
-        )
-        logger.debug("precommand= " + self.precommand)
-        logger.debug("bucket=" + self.s3_bucket)
-        logger.debug("subdir=" + self.s3_subdir)
-        self.quiet = workflow.output_settings.quiet
+        self.quiet = self.workflow.output_settings.quiet
 
         self.container_image = self.workflow.remote_execution_settings.container_image
 
-    def run_job(self, job: ExecutorJobInterface):
+    def run_job(self, job: JobExecutorInterface):
         # Implement here how to run a job.
         # You can access the job's resources, etc.
         # via the job object.
@@ -181,7 +136,7 @@ def run_job(self, job: ExecutorJobInterface):
     async def check_active_jobs(
         self, active_jobs: List[SubmittedJobInfo]
-    ) -> Generator[SubmittedJobInfo, None, None]:
+    ) -> AsyncGenerator[SubmittedJobInfo, None]:
         # Check the status of active jobs.
 
         # You have to iterate over the given list active_jobs.
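For context on the new return annotation: under the asynchronous status-checking protocol, check_active_jobs is an async generator that re-yields jobs that are still running and reports finished ones through the base-class helpers. A minimal sketch of that contract inside the Executor class (not code from this PR; the status values, the hypothetical _query_tibanna_status helper, and the report_job_success/report_job_error names are assumptions based on the RemoteExecutor base class):

    async def check_active_jobs(
        self, active_jobs: List[SubmittedJobInfo]
    ) -> AsyncGenerator[SubmittedJobInfo, None]:
        for job_info in active_jobs:
            # query Tibanna/AWS for the current state of this job
            status = self._query_tibanna_status(job_info)  # hypothetical helper
            if status == "success":
                self.report_job_success(job_info)
            elif status == "failed":
                self.report_job_error(job_info)
            else:
                # still running: hand the job back for the next polling round
                yield job_info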
@@ -258,7 +213,7 @@ def remove_prefix(self, s):
     def get_snakefile(self):
         return os.path.basename(self.snakefile)
 
-    def add_command(self, job: ExecutorJobInterface, tibanna_args, tibanna_config):
+    def add_command(self, job: JobExecutorInterface, tibanna_args, tibanna_config):
         # format command
         command = self.format_job_exec(job)
 
@@ -267,23 +222,6 @@ def add_command(self, job: ExecutorJobInterface, tibanna_args, tibanna_config):
         self.logger.debug("command = " + str(command))
         tibanna_args.command = command
 
-    def add_workflow_files(self, job: ExecutorJobInterface, tibanna_args):
-        snakefile_fname, snakemake_dir = self.split_filename(self.snakefile)
-        snakemake_child_fnames = []
-        for src in self.workflow_sources:
-            src_fname, _ = self.split_filename(src, snakemake_dir)
-            if src_fname != snakefile_fname:  # redundant
-                snakemake_child_fnames.append(src_fname)
-        # change path for config files
-        # TODO - this is a hacky way to do this
-        self.workflow.overwrite_configfiles = [
-            self.split_filename(cf, snakemake_dir)[0]
-            for cf in self.workflow.overwrite_configfiles
-        ]
-        tibanna_args.snakemake_directory_local = snakemake_dir
-        tibanna_args.snakemake_main_filename = snakefile_fname
-        tibanna_args.snakemake_child_filenames = list(set(snakemake_child_fnames))
-
     def adjust_filepath(self, f):
         if not hasattr(f, "remote_object"):
             rel = self.remove_prefix(f)  # log/benchmark
@@ -295,60 +233,7 @@ def adjust_filepath(self, f):
             rel = f
         return rel
 
-    def make_tibanna_input(self, job: ExecutorJobInterface):
-        # input & output
-        # Local snakemake command here must be run with --default-remote-prefix
-        # and --default-remote-provider (forced) but on VM these options will be
-        # removed.
-        # The snakemake on the VM will consider these input and output as not remote.
-        # They files are transferred to the container by Tibanna before running
-        # snakemake.
-        # In short, the paths on VM must be consistent with what's in Snakefile.
-        # but the actual location of the files is on the S3 bucket/prefix.
-        # This mapping info must be passed to Tibanna.
-        for i in job.input:
-            self.logger.debug("job input " + str(i))
-            self.logger.debug(
-                "job input is remote= " + ("true" if i.is_remote else "false")
-            )
-            if hasattr(i.remote_object, "provider"):
-                self.logger.debug(
-                    " is remote default= "
-                    + ("true" if i.remote_object.provider.is_default else "false")
-                )
-        for o in job.expanded_output:
-            self.logger.debug("job output " + str(o))
-            self.logger.debug(
-                "job output is remote= " + ("true" if o.is_remote else "false")
-            )
-            if hasattr(o.remote_object, "provider"):
-                self.logger.debug(
-                    " is remote default= "
-                    + ("true" if o.remote_object.provider.is_default else "false")
-                )
-        file_prefix = (
-            "file:///data1/snakemake"  # working dir inside snakemake container on VM
-        )
-        input_source = dict()
-        for ip in job.input:
-            ip_rel = self.adjust_filepath(ip)
-            input_source[os.path.join(file_prefix, ip_rel)] = "s3://" + ip
-        output_target = dict()
-        output_all = [eo for eo in job.expanded_output]
-        if job.log:
-            if isinstance(job.log, list):
-                output_all.extend([str(_) for _ in job.log])
-            else:
-                output_all.append(str(job.log))
-        if hasattr(job, "benchmark") and job.benchmark:
-            if isinstance(job.benchmark, list):
-                output_all.extend([str(_) for _ in job.benchmark])
-            else:
-                output_all.append(str(job.benchmark))
-        for op in output_all:
-            op_rel = self.adjust_filepath(op)
-            output_target[os.path.join(file_prefix, op_rel)] = "s3://" + op
-
+    def make_tibanna_input(self, job: JobExecutorInterface):
         # mem & cpu
         mem = job.resources["mem_mb"] / 1024 if "mem_mb" in job.resources.keys() else 1
         cpu = job.threads
@@ -374,14 +259,10 @@ def make_tibanna_input(self, job: ExecutorJobInterface):
         )
 
         tibanna_args = ec2_utils.Args(
-            output_S3_bucket=self.s3_bucket,
             language="snakemake",
             container_image=self.container_image,
-            input_files=input_source,
-            output_target=output_target,
             input_env=self.envvars,
         )
-        self.add_workflow_files(job, tibanna_args)
         self.add_command(job, tibanna_args, tibanna_config)
         tibanna_input = {
             "jobid": jobid,
diff --git a/tests/tests.py b/tests/tests.py
index 4856003..0e27163 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -9,7 +9,7 @@
 BUCKET_NAME = "snakemake-testing-%s-bucket" % next(tempfile._get_candidate_names())
 
 
-class TestWorkflowsBase(snakemake.common.tests.TestWorkflowsBase):
+class TestWorkflows(snakemake.common.tests.TestWorkflowsMinioPlayStorageBase):
     __test__ = True
 
     def get_executor(self) -> str:
@@ -21,12 +21,15 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
             sfn=...,  # TODO add test sfn
         )
 
-    def get_default_remote_provider(self) -> Optional[str]:
-        # Return name of default remote provider if required for testing,
-        # otherwise None.
-        return "S3"
-
-    def get_default_remote_prefix(self) -> Optional[str]:
-        # Return default remote prefix if required for testing,
-        # otherwise None.
-        return BUCKET_NAME
+    def get_assume_shared_fs(self) -> bool:
+        return False
+
+    def get_remote_execution_settings(
+        self,
+    ) -> snakemake.settings.RemoteExecutionSettings:
+        return snakemake.settings.RemoteExecutionSettings(
+            seconds_between_status_checks=10,
+            envvars=self.get_envvars(),
+            # TODO remove once we have switched to stable snakemake for dev
+            container_image="snakemake/snakemake:latest",
+        )
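The reworked test suite builds on snakemake's bundled MinIO-play storage test base instead of the removed default-remote-provider hooks, so storage is presumably exercised through the snakemake-storage-plugin-s3 dev dependency while the executor settings stay minimal. The step function is still a TODO; once a test deployment exists, get_executor_settings would be completed along these lines (a sketch only; the step function name below is a placeholder, not a real deployment):

    def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
        return ExecutorSettings(
            sfn="tibanna_unicorn_snakemake_testing",  # placeholder step function name
        )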