Merge pull request #556 from viash-io/develop
Viash 0.8.0-RC3
rcannood authored Oct 7, 2023
2 parents 9c73769 + ac23740 commit 650f09d
Showing 26 changed files with 529 additions and 233 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,21 @@
# Viash 0.8.0-RC3 (2023-10-07): More bugfixes, more Nextflow functionality

This release contains more bugfixes related to the Nextflow code generation functionality.
It also adds a new `runIf` argument to the `NextflowPlatform`'s `.run()` method, which allows for conditional execution of modules.

## NEW FUNCTIONALITY

* `NextflowPlatform`: Added a new `.run()` argument `runIf`: a closure that determines whether the module should be run or not (PR #553).
  If the `runIf` closure evaluates to `true`, the module is run; otherwise, the input is passed through without running (see the usage sketch below).
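
  A minimal usage sketch (not part of the changelog itself; the component name `my_component` and the closure's `(id, state)` signature are assumptions for illustration):

  ```groovy
  // Hypothetical pipeline fragment: run `my_component` only for channel events
  // whose state sets `do_run: true`; other events are passed through unchanged.
  input_ch
    | my_component.run(
        runIf: { id, state -> state.do_run == true }
      )
  ```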

## MINOR CHANGES

* `NextflowPlatform`: Rename internal VDSL3 variables to be more consistent with regular Viash component variables and avoid naming clashes (PR #553).

## BUG FIXES

* `export cli_autocomplete`: Fix the output script format and hide `--loglevel` and `--colorize` (PR #544). The masked arguments remain usable, but are not particularly useful to always display in help messages.

# Viash 0.8.0-RC2 (2023-10-04): Some bugfixes

Some bugfixes related to the new dependencies and Nextflow code generation functionality.
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "viash"

version := "0.8.0-RC2"
version := "0.8.0-RC3"

scalaVersion := "2.13.10"

41 changes: 21 additions & 20 deletions src/main/resources/io/viash/platforms/nextflow/VDSL3Helper.nf
@@ -11,7 +11,7 @@
* Internally, this workflow will convert the input channel
* to a format which the Nextflow module will be able to handle.
*/
def vdsl3WorkflowFactory(Map args) {
def vdsl3WorkflowFactory(Map args, Map meta, String rawScript) {
def key = args["key"]
def processObj = null

@@ -20,7 +20,7 @@ def vdsl3WorkflowFactory(Map args) {
main:

if (processObj == null) {
processObj = _vdsl3ProcessFactory(args)
processObj = _vdsl3ProcessFactory(args, meta, rawScript)
}

output_ = input_
@@ -34,7 +34,7 @@ def vdsl3WorkflowFactory(Map args) {
}

// process input files separately
def inputPaths = thisConfig.functionality.allArguments
def inputPaths = meta.config.functionality.allArguments
.findAll { it.type == "file" && it.direction == "input" }
.collect { par ->
def val = data_.containsKey(par.plainName) ? data_[par.plainName] : []
@@ -62,7 +62,7 @@ def vdsl3WorkflowFactory(Map args) {
}

// remove input files
def argsExclInputFiles = thisConfig.functionality.allArguments
def argsExclInputFiles = meta.config.functionality.allArguments
.findAll { (it.type != "file" || it.direction != "input") && data_.containsKey(it.plainName) }
.collectEntries { par ->
def parName = par.plainName
@@ -76,11 +76,11 @@ def vdsl3WorkflowFactory(Map args) {
[parName, val]
}

[ id ] + inputPaths + [ argsExclInputFiles, resourcesDir ]
[ id ] + inputPaths + [ argsExclInputFiles, meta.resources_dir ]
}
| processObj
| map { output ->
def outputFiles = thisConfig.functionality.allArguments
def outputFiles = meta.config.functionality.allArguments
.findAll { it.type == "file" && it.direction == "output" }
.indexed()
.collectEntries{ index, par ->
@@ -114,13 +114,13 @@ def vdsl3WorkflowFactory(Map args) {
return processWf
}

// depends on: thisConfig, thisScript, session?
def _vdsl3ProcessFactory(Map workflowArgs) {
// depends on: session?
def _vdsl3ProcessFactory(Map workflowArgs, Map meta, String rawScript) {
// autodetect process key
def wfKey = workflowArgs["key"]
def procKeyPrefix = "${wfKey}_process"
def meta = nextflow.script.ScriptMeta.current()
def existing = meta.getProcessNames().findAll{it.startsWith(procKeyPrefix)}
def scriptMeta = nextflow.script.ScriptMeta.current()
def existing = scriptMeta.getProcessNames().findAll{it.startsWith(procKeyPrefix)}
def numbers = existing.collect{it.replace(procKeyPrefix, "0").toInteger()}
def newNumber = (numbers + [-1]).max() + 1

@@ -174,12 +174,12 @@ def _vdsl3ProcessFactory(Map workflowArgs) {
}
}.join()

def inputPaths = thisConfig.functionality.allArguments
def inputPaths = meta.config.functionality.allArguments
.findAll { it.type == "file" && it.direction == "input" }
.collect { ', path(viash_par_' + it.plainName + ', stageAs: "_viash_par/' + it.plainName + '_?/*")' }
.join()

def outputPaths = thisConfig.functionality.allArguments
def outputPaths = meta.config.functionality.allArguments
.findAll { it.type == "file" && it.direction == "output" }
.collect { par ->
// insert dummy into every output (see nextflow-io/nextflow#2678)
@@ -199,15 +199,15 @@ def _vdsl3ProcessFactory(Map workflowArgs) {
}

// create dirs for output files (based on BashWrapper.createParentFiles)
def createParentStr = thisConfig.functionality.allArguments
def createParentStr = meta.config.functionality.allArguments
.findAll { it.type == "file" && it.direction == "output" && it.create_parent }
.collect { par ->
"\${ args.containsKey(\"${par.plainName}\") ? \"mkdir_parent \\\"\" + (args[\"${par.plainName}\"] instanceof String ? args[\"${par.plainName}\"] : args[\"${par.plainName}\"].join('\" \"')) + \"\\\"\" : \"\" }"
}
.join("\n")

// construct inputFileExports
def inputFileExports = thisConfig.functionality.allArguments
def inputFileExports = meta.config.functionality.allArguments
.findAll { it.type == "file" && it.direction.toLowerCase() == "input" }
.collect { par ->
viash_par_contents = "(viash_par_${par.plainName} instanceof List ? viash_par_${par.plainName}.join(\"${par.multiple_sep}\") : viash_par_${par.plainName})"
@@ -229,15 +229,15 @@ def _vdsl3ProcessFactory(Map workflowArgs) {
).toAbsolutePath()

// construct stub
def stub = thisConfig.functionality.allArguments
def stub = meta.config.functionality.allArguments
.findAll { it.type == "file" && it.direction == "output" }
.collect { par ->
"\${ args.containsKey(\"${par.plainName}\") ? \"touch2 \\\"\" + (args[\"${par.plainName}\"] instanceof String ? args[\"${par.plainName}\"].replace(\"_*\", \"_0\") : args[\"${par.plainName}\"].join('\" \"')) + \"\\\"\" : \"\" }"
}
.join("\n")

// escape script
def escapedScript = thisScript.replace('\\', '\\\\').replace('$', '\\$').replace('"""', '\\"\\"\\"')
def escapedScript = rawScript.replace('\\', '\\\\').replace('$', '\\$').replace('"""', '\\"\\"\\"')

// publishdir assert
def assertStr = (workflowArgs.auto.publish == true) || workflowArgs.auto.transcript ?
@@ -269,7 +269,7 @@ def _vdsl3ProcessFactory(Map workflowArgs) {
|# export VIASH_META_RESOURCES_DIR="\${resourcesDir.toRealPath().toAbsolutePath()}"
|export VIASH_META_RESOURCES_DIR="\${resourcesDir}"
|export VIASH_META_TEMP_DIR="${['docker', 'podman', 'charliecloud'].any{ it == workflow.containerEngine } ? '/tmp' : tmpDir}"
|export VIASH_META_FUNCTIONALITY_NAME="${thisConfig.functionality.name}"
|export VIASH_META_FUNCTIONALITY_NAME="${meta.config.functionality.name}"
|# export VIASH_META_EXECUTABLE="\\\$VIASH_META_RESOURCES_DIR/\\\$VIASH_META_FUNCTIONALITY_NAME"
|export VIASH_META_CONFIG="\\\$VIASH_META_RESOURCES_DIR/.config.vsh.yaml"
|\${task.cpus ? "export VIASH_META_CPUS=\$task.cpus" : "" }
@@ -312,16 +312,17 @@ def _vdsl3ProcessFactory(Map workflowArgs) {
def ownerParams = new nextflow.script.ScriptBinding.ParamsMap()
def binding = new nextflow.script.ScriptBinding().setParams(ownerParams)
def module = new nextflow.script.IncludeDef.Module(name: procKey)
def session = nextflow.Nextflow.getSession()
def scriptParser = new nextflow.script.ScriptParser(session)
.setModule(true)
.setBinding(binding)
scriptParser.scriptPath = meta.getScriptPath()
scriptParser.scriptPath = scriptMeta.getScriptPath()
def moduleScript = scriptParser.runScript(procStr)
.getScript()

// register module in meta
meta.addModule(moduleScript, module.name, module.alias)
scriptMeta.addModule(moduleScript, module.name, module.alias)

// retrieve and return process from meta
return meta.getProcess(procKey)
return scriptMeta.getProcess(procKey)
}
@@ -1,4 +1,5 @@
def typeCheck(String stage, Map par, Object value, String id, String key) {

def _checkArgumentType(String stage, Map par, Object value, String id, String key) {
// expectedClass will only be != null if value is not of the expected type
def expectedClass = null

@@ -8,7 +9,7 @@ def typeCheck(String stage, Map par, Object value, String id, String key) {
if (value instanceof List) {
try {
value = value.collect { listVal ->
typeCheck(stage, par + [multiple: false], listVal, id, key)
_checkArgumentType(stage, par + [multiple: false], listVal, id, key)
}
} catch (Exception e) {
expectedClass = "List[${par.type}]"
@@ -61,45 +62,3 @@ def typeCheck(String stage, Map par, Object value, String id, String key) {

return value
}

Map processInputs(Map inputs, Map config, String id, String key) {
if (!workflow.stubRun) {
config.functionality.allArguments.each { arg ->
if (arg.required) {
assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
"Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
}
}

inputs = inputs.collectEntries { name, value ->
def par = config.functionality.allArguments.find { it.plainName == name && (it.direction == "input" || it.type == "file") }
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid input argument"

value = typeCheck("input", par, value, id, key)

[ name, value ]
}
}
return inputs
}

Map processOutputs(Map outputs, Map config, String id, String key) {
if (!workflow.stubRun) {
config.functionality.allArguments.each { arg ->
if (arg.direction == "output" && arg.required) {
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
}
}

outputs = outputs.collectEntries { name, value ->
def par = config.functionality.allArguments.find { it.plainName == name && it.direction == "output" }
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"

value = typeCheck("output", par, value, id, key)

[ name, value ]
}
}
return outputs
}
@@ -0,0 +1,20 @@
Map _processInputValues(Map inputs, Map config, String id, String key) {
if (!workflow.stubRun) {
config.functionality.allArguments.each { arg ->
if (arg.required) {
assert inputs.containsKey(arg.plainName) && inputs.get(arg.plainName) != null :
"Error in module '${key}' id '${id}': required input argument '${arg.plainName}' is missing"
}
}

inputs = inputs.collectEntries { name, value ->
def par = config.functionality.allArguments.find { it.plainName == name && (it.direction == "input" || it.type == "file") }
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid input argument"

value = _checkArgumentType("input", par, value, id, key)

[ name, value ]
}
}
return inputs
}
@@ -0,0 +1,20 @@
Map _processOutputValues(Map outputs, Map config, String id, String key) {
if (!workflow.stubRun) {
config.functionality.allArguments.each { arg ->
if (arg.direction == "output" && arg.required) {
assert outputs.containsKey(arg.plainName) && outputs.get(arg.plainName) != null :
"Error in module '${key}' id '${id}': required output argument '${arg.plainName}' is missing"
}
}

outputs = outputs.collectEntries { name, value ->
def par = config.functionality.allArguments.find { it.plainName == name && it.direction == "output" }
assert par != null : "Error in module '${key}' id '${id}': '${name}' is not a valid output argument"

value = _checkArgumentType("output", par, value, id, key)

[ name, value ]
}
}
return outputs
}
@@ -15,6 +15,7 @@
* @return: a workflow that runs the components
**/
def runComponents(Map args) {
log.warn("runComponents is deprecated, use runEach instead")
assert args.components: "runComponents should be passed a list of components to run"

def components_ = args.components
105 changes: 105 additions & 0 deletions src/main/resources/io/viash/platforms/nextflow/channel/runEach.nf
@@ -0,0 +1,105 @@
/**
* Run a list of components on a stream of data.
*
* @param components: list of Viash VDSL3 modules to run
* @param fromState: a closure, a map or a list of keys to extract from the input data.
* If a closure, it will be called with the id, the data and the component itself.
* @param toState: a closure, a map or a list of keys to extract from the output data
* If a closure, it will be called with the id, the output data, the old state and the component itself.
* @param filter: filter function to apply to the input.
* It will be called with the id, the data and the component itself.
* @param id: id to use for the output data
* If a closure, it will be called with the id, the data and the component itself.
* @param auto: auto options to pass to the components
*
* @return: a workflow that runs the components
**/
def runEach(Map args) {
assert args.components: "runEach should be passed a list of components to run"

def components_ = args.components
if (components_ !instanceof List) {
components_ = [ components_ ]
}
assert components_.size() > 0: "pass at least one component to runEach"

def fromState_ = args.fromState
def toState_ = args.toState
def filter_ = args.filter
def id_ = args.id

workflow runEachWf {
take: input_ch
main:

// generate one channel per method
out_chs = components_.collect{ comp_ ->
filter_ch = filter_
? input_ch | filter{tup ->
filter_(tup[0], tup[1], comp_)
}
: input_ch
id_ch = id_
? filter_ch | map{tup ->
// def new_id = id_(tup[0], tup[1], comp_)
def new_id = tup[0]
if (id_ instanceof String) {
new_id = id_
} else if (id_ instanceof Closure) {
new_id = id_(new_id, tup[1], comp_)
}
[new_id] + tup.drop(1)
}
: filter_ch
data_ch = id_ch | map{tup ->
def new_data = tup[1]
if (fromState_ instanceof Map) {
new_data = fromState_.collectEntries{ key0, key1 ->
[key0, new_data[key1]]
}
} else if (fromState_ instanceof List) {
new_data = fromState_.collectEntries{ key ->
[key, new_data[key]]
}
} else if (fromState_ instanceof Closure) {
new_data = fromState_(tup[0], new_data, comp_)
}
tup.take(1) + [new_data] + tup.drop(1)
}
out_ch = data_ch
| comp_.run(
auto: (args.auto ?: [:]) + [simplifyInput: false, simplifyOutput: false]
)
post_ch = toState_
? out_ch | map{tup ->
def output = tup[1]
def old_state = tup[2]
if (toState_ instanceof Map) {
new_state = old_state + toState_.collectEntries{ key0, key1 ->
[key0, output[key1]]
}
} else if (toState_ instanceof List) {
new_state = old_state + toState_.collectEntries{ key ->
[key, output[key]]
}
} else if (toState_ instanceof Closure) {
new_state = toState_(tup[0], output, old_state, comp_)
}
[tup[0], new_state] + tup.drop(3)
}
: out_ch

post_ch
}

// mix all results
output_ch =
(out_chs.size == 1)
? out_chs[0]
: out_chs[0].mix(*out_chs.drop(1))

emit: output_ch
}

return runEachWf
}
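
For reference, here is a hypothetical usage sketch of `runEach` based on the documented arguments above (not part of this commit). The components `method_a` and `method_b`, the state keys `dataset` and `result`, and the component argument names `input`/`output` are invented for illustration:

```groovy
workflow {
  // one event per sample: [ id, state ]
  Channel.fromList([
      [ "sample1", [ dataset: file("data/sample1.h5ad") ] ]
    ])
    | runEach(
        components: [ method_a, method_b ],
        // run a component only when the state allows it: (id, state, component)
        filter: { id, state, comp -> state.run_methods != false },
        // map state keys onto component arguments:
        // the component's `input` argument receives state.dataset
        fromState: [ input: "dataset" ],
        // merge the component's output back into the state:
        // closure arguments are (id, output, old state, component)
        toState: { id, output, state, comp -> state + [ result: output.output ] }
      )
    | view()
}
```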