adapt to changes in Snakemake 8 main branch, remove old code previously needed for handling input and output as well as workflow sources: all of this now happens automatically within Snakemake.
johanneskoester committed Nov 13, 2023
1 parent 3a79a84 commit 4d94daa
Showing 5 changed files with 52 additions and 153 deletions.
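
A note on what "automatically" means here: under the Snakemake 8 executor-plugin interface, a plugin no longer stages workflow sources or maps input and output files to S3 itself. Instead it declares capabilities via CommonSettings, and Snakemake takes care of source deployment and storage access. A minimal sketch of the declaration this commit adopts (flag names exactly as in the diff below; the import path follows this plugin's code; the comments are paraphrased interpretations, not documentation):

from snakemake_interface_executor_plugins import CommonSettings

common_settings = CommonSettings(
    # executor and jobs do not share a filesystem (cloud execution)
    implies_no_shared_fs=True,
    # forward the default storage provider arguments to spawned jobs
    pass_default_storage_provider_args=True,
    # forward default resource settings to spawned jobs
    pass_default_resources_args=True,
    # do not inline environment variable declarations into the job command
    pass_envvar_declarations_to_cmd=False,
    # let remote jobs deploy the default storage plugin (here: S3) themselves
    auto_deploy_default_storage_provider=True,
)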
24 changes: 18 additions & 6 deletions .github/workflows/ci.yml
@@ -13,15 +13,19 @@ jobs:
       - name: Check out the code
         uses: actions/checkout@v3

+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install Dependencies using Poetry
@@ -36,15 +40,19 @@ jobs:
       - name: Check out the code
         uses: actions/checkout@v3

+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install Dependencies using Poetry
@@ -58,15 +66,19 @@ jobs:
     steps:
       - uses: actions/checkout@v3

+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install dependencies
7 changes: 3 additions & 4 deletions .github/workflows/release-please.yml
@@ -26,22 +26,21 @@ jobs:

       - uses: actions/setup-python@v2
         with:
-          python-version: "3.9"
+          python-version: "3.11"

       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install Dependencies using Poetry
         run: |
-          pip install connection-pool  # because it is incompatible with poetry
           poetry install
       - name: Publish to PyPi
10 changes: 7 additions & 3 deletions pyproject.toml
@@ -9,9 +9,9 @@ authors = [
 readme = "README.md"

 [tool.poetry.dependencies]
-python = ">=3.9,<3.11"
-snakemake-interface-common = "^1.4.1"
-snakemake-interface-executor-plugins = "^4.0.0"
+python = "^3.11"
+snakemake-interface-common = "^1.14.1"
+snakemake-interface-executor-plugins = "^7.0.3"
 tibanna = "^4.0.0"


@@ -21,6 +21,10 @@ flake8 = "^6.1.0"
 coverage = "^7.3.1"
 pytest = "^7.4.2"
 snakemake = {git = "https://github.com/snakemake/snakemake.git"}
+snakemake-storage-plugin-s3 = "^0.2.4"
+
+[tool.coverage.run]
+omit = [".*", "*/site-packages/*", "Snakefile"]

 [build-system]
 requires = ["poetry-core"]
141 changes: 11 additions & 130 deletions snakemake_executor_plugin_tibanna/__init__.py
@@ -7,7 +7,7 @@
 import math
 import os
 import re
-from typing import List, Generator, Optional
+from typing import AsyncGenerator, List, Generator, Optional
 from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
 from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
 from snakemake_interface_executor_plugins import ExecutorSettingsBase, CommonSettings
@@ -42,16 +42,6 @@ class ExecutorSettings(ExecutorSettingsBase):
             "required": True,
         },
     )
-    precommand: Optional[str] = field(
-        default=None,
-        metadata={
-            "precommand": "Any command to execute before snakemake command on AWS "
-            "cloud such as wget, git clone, unzip, etc. This is used with --tibanna. "
-            "Do not include input/output download/upload commands - file transfer "
-            "between S3 bucket and the run environment (container) is automatically "
-            "handled by Tibanna."
-        },
-    )
     spot_instance: bool = field(
         default=False,
         metadata={
@@ -88,61 +78,26 @@ class ExecutorSettings(ExecutorSettingsBase):
     # filesystem (True) or not (False).
     # This is e.g. the case for cloud execution.
     implies_no_shared_fs=True,
+    pass_default_storage_provider_args=True,
+    pass_default_resources_args=True,
+    pass_envvar_declarations_to_cmd=False,
+    auto_deploy_default_storage_provider=True,
 )


 # Required:
 # Implementation of your executor
 class Executor(RemoteExecutor):
-    def __init__(
-        self,
-        workflow: WorkflowExecutorInterface,
-        logger: LoggerExecutorInterface,
-    ):
-        super().__init__(
-            workflow,
-            logger,
-            # configure behavior of RemoteExecutor below
-            # whether arguments for setting the remote provider shall be passed to jobs
-            pass_default_remote_provider_args=False,
-            # whether arguments for setting default resources shall be passed to jobs
-            pass_default_resources_args=False,
-            # whether environment variables shall be passed to jobs
-            pass_envvar_declarations_to_cmd=False,
-            # specify initial amount of seconds to sleep before checking for job status
-            init_sleep_seconds=0,
-        )
-
-        self.workflow_sources = []
-        for wfs in self.workflow.dag.get_sources():
-            if os.path.isdir(wfs):
-                for dirpath, dirnames, filenames in os.walk(wfs):
-                    self.workflow_sources.extend(
-                        [os.path.join(dirpath, f) for f in filenames]
-                    )
-            else:
-                self.workflow_sources.append(os.path.abspath(wfs))
-
-        log = "sources="
-        for f in self.workflow_sources:
-            log += f
-        self.snakefile = workflow.main_snakefile
-        self.envvars = {e: os.environ[e] for e in workflow.envvars}
+    def __post_init__(self):
+        self.snakefile = self.workflow.main_snakefile
+        self.envvars = {e: os.environ[e] for e in self.workflow.envvars}
         if self.envvars:
-            logger.debug("envvars = %s" % str(self.envvars))
+            self.logger.debug("envvars = %s" % str(self.envvars))
         self.tibanna_sfn = self.workflow.executor_settings.sfn

-        self.precommand = self.workflow.executor_settings.precommand or ""

         # TODO this does not work if the default remote is something else than S3
-        self.s3_bucket = workflow.storage_settings.default_remote_prefix.split("/")[0]
-        self.s3_subdir = re.sub(
-            f"^{self.s3_bucket}/", "", workflow.storage_settings.default_remote_prefix
-        )
-        logger.debug("precommand= " + self.precommand)
-        logger.debug("bucket=" + self.s3_bucket)
-        logger.debug("subdir=" + self.s3_subdir)
-        self.quiet = workflow.output_settings.quiet
+        self.quiet = self.workflow.output_settings.quiet

         self.container_image = self.workflow.remote_execution_settings.container_image

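The switch from __init__ to __post_init__ above follows the new plugin interface: the RemoteExecutor base class now receives the workflow and logger itself and exposes them as self.workflow and self.logger, so subclasses only override __post_init__ for their own setup. A minimal sketch of the pattern, using names from this diff:

from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor

class Executor(RemoteExecutor):
    def __post_init__(self):
        # self.workflow and self.logger are already populated by the base class
        self.tibanna_sfn = self.workflow.executor_settings.sfn
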
Expand Down Expand Up @@ -181,7 +136,7 @@ def run_job(self, job: JobExecutorInterface):

async def check_active_jobs(
self, active_jobs: List[SubmittedJobInfo]
) -> Generator[SubmittedJobInfo, None, None]:
) -> AsyncGenerator[SubmittedJobInfo, None, None]:
# Check the status of active jobs.

# You have to iterate over the given list active_jobs.
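
The new return type makes status checking an async generator: Snakemake awaits the coroutine and re-queues every job that is yielded back, so only still-running jobs should be yielded. A sketch of the expected control flow, assuming the report_job_success/report_job_error hooks of the executor-plugin interface; the Tibanna status lookup is abbreviated to a hypothetical helper:

from typing import AsyncGenerator, List

from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor

class Executor(RemoteExecutor):
    async def check_active_jobs(
        self, active_jobs: List[SubmittedJobInfo]
    ) -> AsyncGenerator[SubmittedJobInfo, None]:
        for job_info in active_jobs:
            # hypothetical helper wrapping the Tibanna job-status API
            status = self._query_tibanna_status(job_info)
            if status == "success":
                self.report_job_success(job_info)  # assumed base-class hook
            elif status == "failed":
                self.report_job_error(job_info)  # assumed base-class hook
            else:
                # still running: yield the job back for the next polling round
                yield job_info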
Expand Down Expand Up @@ -267,23 +222,6 @@ def add_command(self, job: JobExecutorInterface, tibanna_args, tibanna_config):
self.logger.debug("command = " + str(command))
tibanna_args.command = command

def add_workflow_files(self, job: JobExecutorInterface, tibanna_args):
snakefile_fname, snakemake_dir = self.split_filename(self.snakefile)
snakemake_child_fnames = []
for src in self.workflow_sources:
src_fname, _ = self.split_filename(src, snakemake_dir)
if src_fname != snakefile_fname: # redundant
snakemake_child_fnames.append(src_fname)
# change path for config files
# TODO - this is a hacky way to do this
self.workflow.overwrite_configfiles = [
self.split_filename(cf, snakemake_dir)[0]
for cf in self.workflow.overwrite_configfiles
]
tibanna_args.snakemake_directory_local = snakemake_dir
tibanna_args.snakemake_main_filename = snakefile_fname
tibanna_args.snakemake_child_filenames = list(set(snakemake_child_fnames))

def adjust_filepath(self, f):
if not hasattr(f, "remote_object"):
rel = self.remove_prefix(f) # log/benchmark
@@ -296,59 +234,6 @@ def adjust_filepath(self, f):
         return rel

     def make_tibanna_input(self, job: JobExecutorInterface):
-        # input & output
-        # Local snakemake command here must be run with --default-remote-prefix
-        # and --default-remote-provider (forced) but on VM these options will be
-        # removed.
-        # The snakemake on the VM will consider these input and output as not remote.
-        # They files are transferred to the container by Tibanna before running
-        # snakemake.
-        # In short, the paths on VM must be consistent with what's in Snakefile.
-        # but the actual location of the files is on the S3 bucket/prefix.
-        # This mapping info must be passed to Tibanna.
-        for i in job.input:
-            self.logger.debug("job input " + str(i))
-            self.logger.debug(
-                "job input is remote= " + ("true" if i.is_remote else "false")
-            )
-            if hasattr(i.remote_object, "provider"):
-                self.logger.debug(
-                    " is remote default= "
-                    + ("true" if i.remote_object.provider.is_default else "false")
-                )
-        for o in job.expanded_output:
-            self.logger.debug("job output " + str(o))
-            self.logger.debug(
-                "job output is remote= " + ("true" if o.is_remote else "false")
-            )
-            if hasattr(o.remote_object, "provider"):
-                self.logger.debug(
-                    " is remote default= "
-                    + ("true" if o.remote_object.provider.is_default else "false")
-                )
-        file_prefix = (
-            "file:///data1/snakemake"  # working dir inside snakemake container on VM
-        )
-        input_source = dict()
-        for ip in job.input:
-            ip_rel = self.adjust_filepath(ip)
-            input_source[os.path.join(file_prefix, ip_rel)] = "s3://" + ip
-        output_target = dict()
-        output_all = [eo for eo in job.expanded_output]
-        if job.log:
-            if isinstance(job.log, list):
-                output_all.extend([str(_) for _ in job.log])
-            else:
-                output_all.append(str(job.log))
-        if hasattr(job, "benchmark") and job.benchmark:
-            if isinstance(job.benchmark, list):
-                output_all.extend([str(_) for _ in job.benchmark])
-            else:
-                output_all.append(str(job.benchmark))
-        for op in output_all:
-            op_rel = self.adjust_filepath(op)
-            output_target[os.path.join(file_prefix, op_rel)] = "s3://" + op
-
         # mem & cpu
         mem = job.resources["mem_mb"] / 1024 if "mem_mb" in job.resources.keys() else 1
         cpu = job.threads
@@ -374,14 +259,10 @@ def make_tibanna_input(self, job: JobExecutorInterface):
         )

         tibanna_args = ec2_utils.Args(
-            output_S3_bucket=self.s3_bucket,
             language="snakemake",
             container_image=self.container_image,
-            input_files=input_source,
-            output_target=output_target,
             input_env=self.envvars,
         )
-        self.add_workflow_files(job, tibanna_args)
         self.add_command(job, tibanna_args, tibanna_config)
         tibanna_input = {
             "jobid": jobid,
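
For context on the large deletion in make_tibanna_input: the removed block built explicit mappings between paths inside the Tibanna VM container (under file:///data1/snakemake) and S3 URLs, which Tibanna used to stage inputs and outputs. An illustration of the dictionaries the old code assembled (bucket and file names are hypothetical); in Snakemake 8 the deployed S3 storage plugin handles these transfers instead, so no mapping is passed to ec2_utils.Args anymore:

# Shape of the mappings formerly passed as input_files/output_target:
input_source = {
    "file:///data1/snakemake/data/reads.fastq": "s3://my-bucket/data/reads.fastq",
}
output_target = {
    "file:///data1/snakemake/results/mapped.bam": "s3://my-bucket/results/mapped.bam",
}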
23 changes: 13 additions & 10 deletions tests/tests.py
@@ -9,7 +9,7 @@
 BUCKET_NAME = "snakemake-testing-%s-bucket" % next(tempfile._get_candidate_names())


-class TestWorkflowsBase(snakemake.common.tests.TestWorkflowsBase):
+class TestWorkflows(snakemake.common.tests.TestWorkflowsMinioPlayStorageBase):
     __test__ = True

     def get_executor(self) -> str:
@@ -21,12 +21,15 @@ def get_executor_settings(self) -> Optional[ExecutorSettingsBase]:
             sfn=...,  # TODO add test sfn
         )

-    def get_default_remote_provider(self) -> Optional[str]:
-        # Return name of default remote provider if required for testing,
-        # otherwise None.
-        return "S3"
-
-    def get_default_remote_prefix(self) -> Optional[str]:
-        # Return default remote prefix if required for testing,
-        # otherwise None.
-        return BUCKET_NAME
+    def get_assume_shared_fs(self) -> bool:
+        return False
+
+    def get_remote_execution_settings(
+        self,
+    ) -> snakemake.settings.RemoteExecutionSettings:
+        return snakemake.settings.RemoteExecutionSettings(
+            seconds_between_status_checks=10,
+            envvars=self.get_envvars(),
+            # TODO remove once we have switched to stable snakemake for dev
+            container_image="snakemake/snakemake:latest",
+        )

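The reworked test hooks mirror the executor changes: get_assume_shared_fs() matches implies_no_shared_fs=True, and get_remote_execution_settings() returns the object that becomes workflow.remote_execution_settings, which Executor.__post_init__ reads (e.g. for container_image). A sketch of that connection, with names from this diff and a hypothetical environment variable:

import snakemake.settings

# What the test hands to the workflow under test:
settings = snakemake.settings.RemoteExecutionSettings(
    seconds_between_status_checks=10,
    envvars=["TIBANNA_TEST_VAR"],  # hypothetical variable name
    container_image="snakemake/snakemake:latest",
)
# ...which the executor later consumes in __post_init__:
# self.container_image = self.workflow.remote_execution_settings.container_image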