Skip to content

Commit

Permalink
Merge pull request #563 from viash-io/develop_0_8
Browse files Browse the repository at this point in the history
Viash 0.8.0-RC6
  • Loading branch information
rcannood authored Oct 11, 2023
2 parents 5b7cd74 + eb11aed commit 5162c97
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Viash 0.8.0-RC6 (2023-10-11): fix race condition issue in runEach

This part of the changelog will be removed.

# Viash 0.8.0-RC5 (2023-10-11): Fix run workflow

This part of the changelog will be removed.
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := "viash"

version := "0.8.0-RC5"
version := "0.8.0-RC6"

scalaVersion := "2.13.10"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ def runComponents(Map args) {
out_chs = components_.collect{ comp_ ->
def comp_config = comp_.config

filter_ch = filter_
def filter_ch = filter_
? input_ch | filter{tup ->
filter_(tup[0], tup[1], comp_config)
}
: input_ch
id_ch = id_
def id_ch = id_
? filter_ch | map{tup ->
// def new_id = id_(tup[0], tup[1], comp_config)
def new_id = tup[0]
Expand All @@ -54,7 +54,7 @@ def runComponents(Map args) {
[new_id] + tup.drop(1)
}
: filter_ch
data_ch = id_ch | map{tup ->
def data_ch = id_ch | map{tup ->
def new_data = tup[1]
if (fromState_ instanceof Map) {
new_data = fromState_.collectEntries{ key0, key1 ->
Expand All @@ -69,11 +69,11 @@ def runComponents(Map args) {
}
tup.take(1) + [new_data] + tup.drop(1)
}
out_ch = data_ch
def out_ch = data_ch
| comp_.run(
auto: (args.auto ?: [:]) + [simplifyInput: false, simplifyOutput: false]
)
post_ch = toState_
def post_ch = toState_
? out_ch | map{tup ->
def output = tup[1]
def old_state = tup[2]
Expand Down
21 changes: 10 additions & 11 deletions src/main/resources/io/viash/platforms/nextflow/channel/runEach.nf
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,22 @@ def runEach(Map args) {

// generate one channel per method
out_chs = components_.collect{ comp_ ->
filter_ch = filter_
def filter_ch = filter_
? input_ch | filter{tup ->
filter_(tup[0], tup[1], comp_)
}
: input_ch
id_ch = id_
def id_ch = id_
? filter_ch | map{tup ->
// def new_id = id_(tup[0], tup[1], comp_)
def new_id = tup[0]
if (id_ instanceof String) {
new_id = id_
} else if (id_ instanceof Closure) {
new_id = id_(new_id, tup[1], comp_)
def new_id = id_
if (new_id instanceof Closure) {
new_id = new_id(tup[0], tup[1], comp_)
}
assert new_id instanceof String : "Error in runEach: id should be a String or a Closure that returns a String. Expected: id instanceof String. Found: ${new_id.getClass()}"
[new_id] + tup.drop(1)
}
: filter_ch
data_ch = id_ch | map{tup ->
def data_ch = id_ch | map{tup ->
def new_data = tup[1]
if (fromState_ instanceof Map) {
new_data = fromState_.collectEntries{ key0, key1 ->
Expand All @@ -66,14 +64,15 @@ def runEach(Map args) {
}
tup.take(1) + [new_data] + tup.drop(1)
}
out_ch = data_ch
def out_ch = data_ch
| comp_.run(
auto: (args.auto ?: [:]) + [simplifyInput: false, simplifyOutput: false]
)
post_ch = toState_
def post_ch = toState_
? out_ch | map{tup ->
def output = tup[1]
def old_state = tup[2]
def new_state = null
if (toState_ instanceof Map) {
new_state = old_state + toState_.collectEntries{ key0, key1 ->
[key0, output[key1]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def workflowFactory(Map args, Map defaultWfArgs, Map meta) {
| checkUniqueIds([:])
| _debug(workflowArgs, "input")
| map { tuple ->
tuple = tuple.clone()
tuple = deepClone(tuple)

if (workflowArgs.map) {
tuple = workflowArgs.map(tuple)
Expand Down

0 comments on commit 5162c97

Please sign in to comment.