Skip to content

Commit

Permalink
Handle spurious ReceiveScatterOutput callbacks (#2051)
Browse files Browse the repository at this point in the history
This commit fixes #2003 by handling spurious, repeated callbacks of the
`receive_scatter_output` method of the `ReceiveScatterOutput` class. The
reason of multiple awakenings has not been investigated deeply, though.
In the future, a thorough examination of the `MultithreadedJobExecutor`
logic may be necessary.

---------

Co-authored-by: Michael R. Crusoe <[email protected]>
  • Loading branch information
GlassOfWhiskey and mr-c authored Oct 8, 2024
1 parent 18b8fdf commit 6970186
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions cwltool/workflow_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
MutableMapping,
MutableSequence,
Optional,
Set,
Sized,
Tuple,
Union,
Expand Down Expand Up @@ -88,12 +89,17 @@ def __init__(
) -> None:
"""Initialize."""
self.dest = dest
self.completed = 0
self._completed: Set[int] = set()
self.processStatus = "success"
self.total = total
self.output_callback = output_callback
self.steps: List[Optional[JobsGeneratorType]] = []

@property
def completed(self) -> int:
"""The number of completed internal jobs."""
return len(self._completed)

def receive_scatter_output(self, index: int, jobout: CWLObjectType, processStatus: str) -> None:
"""Record the results of a scatter operation."""
for key, val in jobout.items():
Expand All @@ -108,10 +114,11 @@ def receive_scatter_output(self, index: int, jobout: CWLObjectType, processStatu
if self.processStatus != "permanentFail":
self.processStatus = processStatus

self.completed += 1
if index not in self._completed:
self._completed.add(index)

if self.completed == self.total:
self.output_callback(self.dest, self.processStatus)
if self.completed == self.total:
self.output_callback(self.dest, self.processStatus)

def setTotal(
self,
Expand Down

0 comments on commit 6970186

Please sign in to comment.