fix: added dependency on tibanna and various little fixes #1

Open
wants to merge 10 commits into main
31 changes: 21 additions & 10 deletions .github/workflows/ci.yml
@@ -13,15 +13,19 @@ jobs:
       - name: Check out the code
         uses: actions/checkout@v3

+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install Dependencies using Poetry
@@ -36,15 +40,19 @@ jobs:
       - name: Check out the code
         uses: actions/checkout@v3

+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install Dependencies using Poetry
@@ -58,27 +66,30 @@ jobs:
     steps:
       - uses: actions/checkout@v3

+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install dependencies
         run: |
           poetry install

-      - name: Setup AWS
+      - name: Configure AWS Credentials
         uses: aws-actions/configure-aws-credentials@v1
         if: env.AWS_AVAILABLE
         with:
-          aws-access-key-id: "${{ secrets.AWS_ACCESS_KEY_ID }}"
-          aws-secret-access-key: "${{ secrets.AWS_SECRET_ACCESS_KEY }}"
+          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
+          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
           aws-region: us-east-1

       - name: Run pytest
7 changes: 3 additions & 4 deletions .github/workflows/release-please.yml
@@ -26,22 +26,21 @@ jobs:

       - uses: actions/setup-python@v2
         with:
-          python-version: "3.9"
+          python-version: "3.11"

       - name: Install poetry
-        run: pipx install poetry
+        run: pip install poetry

       - name: Determine dependencies
         run: poetry lock

       - uses: actions/setup-python@v4
         with:
-          python-version: "3.9"
+          python-version: "3.11"
           cache: poetry

       - name: Install Dependencies using Poetry
         run: |
-          pip install connection-pool  # because it is incompatible with poetry
           poetry install

       - name: Publish to PyPi
15 changes: 10 additions & 5 deletions pyproject.toml
@@ -3,23 +3,28 @@ name = "snakemake-executor-plugin-tibanna"
 version = "0.1.0"
 description = "A Snakemake executor plugin for Amazon AWS workflow execution with tibanna."
 authors = [
-    "Soohyun Lee <@SooLee>"
+    "Soohyun Lee <@SooLee>",  # TODO please add Soo Lee's email address
     "Johannes Koester <[email protected]>"
 ]
 readme = "README.md"

 [tool.poetry.dependencies]
-python = "^3.9"
-snakemake-interface-common = "^1.3.3"
-snakemake-interface-executor-plugins = "^3.0.0"
+python = "^3.11"
+snakemake-interface-common = "^1.14.1"
+snakemake-interface-executor-plugins = "^7.0.3"
+tibanna = "^4.0.0"


 [tool.poetry.group.dev.dependencies]
 black = "^23.9.1"
 flake8 = "^6.1.0"
 coverage = "^7.3.1"
 pytest = "^7.4.2"
-snakemake = {git = "https://github.com/snakemake/snakemake.git", branch = "feat/api-rewrite"}
+snakemake = {git = "https://github.com/snakemake/snakemake.git"}
+snakemake-storage-plugin-s3 = "^0.2.4"
+
+[tool.coverage.run]
+omit = [".*", "*/site-packages/*", "Snakefile"]

 [build-system]
 requires = ["poetry-core"]
149 changes: 15 additions & 134 deletions snakemake_executor_plugin_tibanna/__init__.py
@@ -7,14 +7,14 @@
 import math
 import os
 import re
-from typing import List, Generator, Optional
+from typing import AsyncGenerator, List, Generator, Optional
 from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
 from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
 from snakemake_interface_executor_plugins import ExecutorSettingsBase, CommonSettings
 from snakemake_interface_executor_plugins.workflow import WorkflowExecutorInterface
 from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
 from snakemake_interface_executor_plugins.jobs import (
-    ExecutorJobInterface,
+    JobExecutorInterface,
 )
 from snakemake_interface_common.exceptions import WorkflowError  # noqa

Expand Down Expand Up @@ -42,16 +42,6 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": True,
},
)
precommand: Optional[str] = field(
default=None,
metadata={
"precommand": "Any command to execute before snakemake command on AWS "
"cloud such as wget, git clone, unzip, etc. This is used with --tibanna. "
"Do not include input/output download/upload commands - file transfer "
"between S3 bucket and the run environment (container) is automatically "
"handled by Tibanna."
},
)
spot_instance: bool = field(
default=False,
metadata={
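
For orientation: options in ExecutorSettings are declared as dataclass fields with metadata, as the retained sfn option above shows. A minimal sketch of that pattern (the help text below is illustrative, not taken from this diff):

from dataclasses import dataclass, field
from typing import Optional

from snakemake_interface_executor_plugins import ExecutorSettingsBase

# Sketch only: the general shape of an ExecutorSettings option.
@dataclass
class ExecutorSettings(ExecutorSettingsBase):
    sfn: Optional[str] = field(
        default=None,
        metadata={
            "help": "Name of the Tibanna step function to submit jobs to.",  # illustrative
            "required": True,
        },
    )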
Expand Down Expand Up @@ -88,65 +78,30 @@ class ExecutorSettings(ExecutorSettingsBase):
# filesystem (True) or not (False).
# This is e.g. the case for cloud execution.
implies_no_shared_fs=True,
pass_default_storage_provider_args=True,
pass_default_resources_args=True,
pass_envvar_declarations_to_cmd=False,
auto_deploy_default_storage_provider=True,
)


# Required:
# Implementation of your executor
class Executor(RemoteExecutor):
def __init__(
self,
workflow: WorkflowExecutorInterface,
logger: LoggerExecutorInterface,
):
super().__init__(
workflow,
logger,
# configure behavior of RemoteExecutor below
# whether arguments for setting the remote provider shall be passed to jobs
pass_default_remote_provider_args=False,
# whether arguments for setting default resources shall be passed to jobs
pass_default_resources_args=False,
# whether environment variables shall be passed to jobs
pass_envvar_declarations_to_cmd=False,
# specify initial amount of seconds to sleep before checking for job status
init_sleep_seconds=0,
)

self.workflow_sources = []
for wfs in self.workflow.dag.get_sources():
if os.path.isdir(wfs):
for dirpath, dirnames, filenames in os.walk(wfs):
self.workflow_sources.extend(
[os.path.join(dirpath, f) for f in filenames]
)
else:
self.workflow_sources.append(os.path.abspath(wfs))

log = "sources="
for f in self.workflow_sources:
log += f
self.snakefile = workflow.main_snakefile
self.envvars = {e: os.environ[e] for e in workflow.envvars}
def __post_init__(self):
self.snakefile = self.workflow.main_snakefile
self.envvars = {e: os.environ[e] for e in self.workflow.envvars}
if self.envvars:
logger.debug("envvars = %s" % str(self.envvars))
self.logger.debug("envvars = %s" % str(self.envvars))
self.tibanna_sfn = self.workflow.executor_settings.sfn

self.precommand = self.workflow.executor_settings.precommand or ""

# TODO this does not work if the default remote is something else than S3
self.s3_bucket = workflow.storage_settings.default_remote_prefix.split("/")[0]
self.s3_subdir = re.sub(
f"^{self.s3_bucket}/", "", workflow.storage_settings.default_remote_prefix
)
logger.debug("precommand= " + self.precommand)
logger.debug("bucket=" + self.s3_bucket)
logger.debug("subdir=" + self.s3_subdir)
self.quiet = workflow.output_settings.quiet
self.quiet = self.workflow.output_settings.quiet

self.container_image = self.workflow.remote_execution_settings.container_image

def run_job(self, job: ExecutorJobInterface):
def run_job(self, job: JobExecutorInterface):
# Implement here how to run a job.
# You can access the job's resources, etc.
# via the job object.
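
To make the refactored flow concrete, here is a minimal sketch (an assumption, not code from this diff) of how run_job typically looks under this interface: the job is submitted, then registered via report_job_submission; submit_to_tibanna is a hypothetical helper:

from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo

def run_job(self, job: JobExecutorInterface):
    # Sketch of an Executor method body, assuming make_tibanna_input as above.
    tibanna_input = self.make_tibanna_input(job)
    self.submit_to_tibanna(tibanna_input)  # hypothetical submission helper
    # Register the submitted job so check_active_jobs can poll it later.
    self.report_job_submission(
        SubmittedJobInfo(job=job, external_jobid=tibanna_input["jobid"])
    )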
@@ -181,7 +136,7 @@ def run_job(self, job: ExecutorJobInterface):

     async def check_active_jobs(
         self, active_jobs: List[SubmittedJobInfo]
-    ) -> Generator[SubmittedJobInfo, None, None]:
+    ) -> AsyncGenerator[SubmittedJobInfo, None]:
         # Check the status of active jobs.

         # You have to iterate over the given list active_jobs.
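
For context, check_active_jobs under this interface is an async generator: it re-yields jobs that are still running and reports finished ones. A minimal sketch of such a loop, assuming the RemoteExecutor helpers status_rate_limiter, report_job_success, and report_job_error, plus a hypothetical query_status helper:

from typing import AsyncGenerator, List

from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo

async def check_active_jobs(
    self, active_jobs: List[SubmittedJobInfo]
) -> AsyncGenerator[SubmittedJobInfo, None]:
    # Sketch of an Executor method body.
    for job_info in active_jobs:
        async with self.status_rate_limiter:  # throttle status API calls
            status = self.query_status(job_info)  # hypothetical helper
        if status == "success":
            self.report_job_success(job_info)
        elif status in ("failed", "aborted"):
            self.report_job_error(job_info)
        else:
            # Still running: hand it back so it is polled again next round.
            yield job_info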
@@ -258,7 +213,7 @@ def remove_prefix(self, s):
     def get_snakefile(self):
         return os.path.basename(self.snakefile)

-    def add_command(self, job: ExecutorJobInterface, tibanna_args, tibanna_config):
+    def add_command(self, job: JobExecutorInterface, tibanna_args, tibanna_config):
         # format command
         command = self.format_job_exec(job)
@@ -267,23 +222,6 @@ def add_command(self, job: ExecutorJobInterface, tibanna_args, tibanna_config):
         self.logger.debug("command = " + str(command))
         tibanna_args.command = command

-    def add_workflow_files(self, job: ExecutorJobInterface, tibanna_args):
-        snakefile_fname, snakemake_dir = self.split_filename(self.snakefile)
-        snakemake_child_fnames = []
-        for src in self.workflow_sources:
-            src_fname, _ = self.split_filename(src, snakemake_dir)
-            if src_fname != snakefile_fname:  # redundant
-                snakemake_child_fnames.append(src_fname)
-        # change path for config files
-        # TODO - this is a hacky way to do this
-        self.workflow.overwrite_configfiles = [
-            self.split_filename(cf, snakemake_dir)[0]
-            for cf in self.workflow.overwrite_configfiles
-        ]
-        tibanna_args.snakemake_directory_local = snakemake_dir
-        tibanna_args.snakemake_main_filename = snakefile_fname
-        tibanna_args.snakemake_child_filenames = list(set(snakemake_child_fnames))
-
     def adjust_filepath(self, f):
         if not hasattr(f, "remote_object"):
             rel = self.remove_prefix(f)  # log/benchmark
@@ -295,60 +233,7 @@ def adjust_filepath(self, f):
             rel = f
         return rel

-    def make_tibanna_input(self, job: ExecutorJobInterface):
-        # input & output
-        # Local snakemake command here must be run with --default-remote-prefix
-        # and --default-remote-provider (forced) but on VM these options will be
-        # removed.
-        # The snakemake on the VM will consider these input and output as not remote.
-        # They files are transferred to the container by Tibanna before running
-        # snakemake.
-        # In short, the paths on VM must be consistent with what's in Snakefile.
-        # but the actual location of the files is on the S3 bucket/prefix.
-        # This mapping info must be passed to Tibanna.
-        for i in job.input:
-            self.logger.debug("job input " + str(i))
-            self.logger.debug(
-                "job input is remote= " + ("true" if i.is_remote else "false")
-            )
-            if hasattr(i.remote_object, "provider"):
-                self.logger.debug(
-                    " is remote default= "
-                    + ("true" if i.remote_object.provider.is_default else "false")
-                )
-        for o in job.expanded_output:
-            self.logger.debug("job output " + str(o))
-            self.logger.debug(
-                "job output is remote= " + ("true" if o.is_remote else "false")
-            )
-            if hasattr(o.remote_object, "provider"):
-                self.logger.debug(
-                    " is remote default= "
-                    + ("true" if o.remote_object.provider.is_default else "false")
-                )
-        file_prefix = (
-            "file:///data1/snakemake"  # working dir inside snakemake container on VM
-        )
-        input_source = dict()
-        for ip in job.input:
-            ip_rel = self.adjust_filepath(ip)
-            input_source[os.path.join(file_prefix, ip_rel)] = "s3://" + ip
-        output_target = dict()
-        output_all = [eo for eo in job.expanded_output]
-        if job.log:
-            if isinstance(job.log, list):
-                output_all.extend([str(_) for _ in job.log])
-            else:
-                output_all.append(str(job.log))
-        if hasattr(job, "benchmark") and job.benchmark:
-            if isinstance(job.benchmark, list):
-                output_all.extend([str(_) for _ in job.benchmark])
-            else:
-                output_all.append(str(job.benchmark))
-        for op in output_all:
-            op_rel = self.adjust_filepath(op)
-            output_target[os.path.join(file_prefix, op_rel)] = "s3://" + op
-
+    def make_tibanna_input(self, job: JobExecutorInterface):
         # mem & cpu
         mem = job.resources["mem_mb"] / 1024 if "mem_mb" in job.resources.keys() else 1
         cpu = job.threads
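
A quick worked example of the mapping above (hypothetical values): a rule with threads: 4 and resources: mem_mb=8192 yields cpu = 4 and mem = 8.0, i.e. 8 GB for the Tibanna run config.

# Sketch: the mem/cpu computation above for a hypothetical job.
resources = {"mem_mb": 8192}
threads = 4
mem = resources["mem_mb"] / 1024 if "mem_mb" in resources else 1  # -> 8.0 (GB)
cpu = threads  # -> 4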
@@ -374,14 +259,10 @@ def make_tibanna_input(self, job: ExecutorJobInterface):
         )

         tibanna_args = ec2_utils.Args(
-            output_S3_bucket=self.s3_bucket,
             language="snakemake",
             container_image=self.container_image,
-            input_files=input_source,
-            output_target=output_target,
             input_env=self.envvars,
         )
-        self.add_workflow_files(job, tibanna_args)
         self.add_command(job, tibanna_args, tibanna_config)
         tibanna_input = {
             "jobid": jobid,
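
For context on where the assembled tibanna_input goes (the submission call itself is outside the shown hunks): tibanna's Python API exposes run_workflow, which deploys the job to the configured step function. A minimal sketch, with the argument names treated as assumptions:

from tibanna.core import API

def submit(tibanna_input: dict, sfn: str, verbose: bool = False) -> str:
    # Sketch only: deploy the job onto the Tibanna step function.
    API().run_workflow(
        input_json=tibanna_input,
        sfn=sfn,
        verbose=verbose,
        open_browser=False,
    )
    return tibanna_input["jobid"]  # job id assembled by make_tibanna_input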