Skip to content

Commit

Permalink
ensure that failed tasks fail the workload - if so configured
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Jul 24, 2024
1 parent b001af2 commit 2606ba5
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 20 deletions.
102 changes: 102 additions & 0 deletions examples/simple/bag_of_pipelines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python3

import sys
import time

import radical.entk as re

watch = list()


# ------------------------------------------------------------------------------
#
def timeit(msg):
    """Record a timestamp and print the time elapsed since the previous call.

    Every call appends the current wall-clock time to the module-level
    `watch` list.  The very first call has no predecessor to compare against,
    so it only prints an empty line and `msg` is not shown.

    msg: label printed in front of the elapsed seconds.
    """

    now = time.time()
    watch.append(now)

    # first sample: nothing to measure yet
    if len(watch) < 2:
        print()
        return

    print('%-11s: %6.2f' % (msg, now - watch[-2]))


# ------------------------------------------------------------------------------
#
def main():
    """Build a bag of identical pipelines, run it, and time each phase.

    Required command line arguments:

        sys.argv[1]: number of pipelines
        sys.argv[2]: number of stages per pipeline
        sys.argv[3]: number of tasks per stage
        sys.argv[4]: runtime system name, passed to the AppManager as `rts`

    Exits with a usage message if fewer than four arguments are given.
    Non-integer counts still raise `ValueError` from `int()`.
    """

    # fail with a readable message instead of a bare IndexError
    if len(sys.argv) < 5:
        sys.exit('usage: %s <n_pipes> <n_stages> <n_tasks> <backend>'
                 % sys.argv[0])

    timeit('start')

    n_pipes, n_stages, n_tasks = (int(arg) for arg in sys.argv[1:4])
    backend = sys.argv[4]

    tot_tasks = n_pipes * n_stages * n_tasks
    tot_stages = n_pipes * n_stages
    # every task, stage and pipeline counts as one node, plus one extra
    # (presumably the workflow root - TODO confirm)
    tot_nodes = tot_tasks + tot_stages + n_pipes + 1

    print('%s: %d x %d x %d = %d tasks / %d nodes'
          % (backend, n_pipes, n_stages, n_tasks, tot_tasks, tot_nodes))

    # all tasks share the same trivial description
    td = {'executable': 'true'}

    am = re.AppManager(rts=backend, autoterminate=True)

    pipelines = list()
    for _ in range(n_pipes):

        pipeline = re.Pipeline()
        pipelines.append(pipeline)

        for _ in range(n_stages):

            stage = re.Stage()
            pipeline.add_stages([stage])

            for _ in range(n_tasks):

                task = re.Task(td)
                stage.add_tasks([task])

    am.workflow = set(pipelines)
    timeit('create')

    rd = {'resource': 'local.localhost',
          'cpus'    : 32,
          'walltime': 15}
    am.resource_desc = rd
    timeit('backend')

    # rw.puml_plot([wf], depth=2)
    # timeit('plot (puml)')

    timeit('run')
    am.run()
    timeit('wait')
    am.terminate()
    timeit('close')


# ------------------------------------------------------------------------------
#
if __name__ == '__main__':

    # set to True to profile the run with yappi (wall-clock timing); the
    # thread stats are printed and the function stats written to
    # 'pstats.prof' once main() has returned
    use_yappi = False

    if not use_yappi:
        main()

    else:
        import yappi

        yappi.set_clock_type("wall")
        yappi.start(builtins=True)

        main()

        yappi.get_thread_stats().print_all()
        stats = yappi.convert2pstats(yappi.get_func_stats())
        stats.dump_stats('pstats.prof')


# ------------------------------------------------------------------------------

3 changes: 3 additions & 0 deletions src/radical/entk/appman/appmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,9 @@ def _run_workflow(self):

for pipe in self._workflow:

if pipe.state == FAILED:
raise EnTKError('Pipeline %s failed' % pipe.uid)

with pipe.lock:

if pipe.completed and \
Expand Down
32 changes: 18 additions & 14 deletions src/radical/entk/appman/wfprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,13 @@ def _advance(self, obj, obj_type, new_state):
self._report.ok('Update: ')
self._report.info('%s state: %s\n' % (obj.luid, obj.state))

if obj_type == 'Task' and obj.state == FAILED:
self._report.error('task %s failed: %s\n%s\n'
% (obj.uid, obj.exception, obj.exception_detail))
if obj.state == FAILED:
if obj_type == 'Task':
self._report.error('%s %s failed: %s\n%s\n'
% (obj_type, obj.uid, obj.exception,
obj.exception_detail))
else:
self._report.error('%s %s failed\n' % (obj_type, obj.uid))

self._logger.info('Transition %s to state %s', obj.uid, new_state)

Expand Down Expand Up @@ -348,21 +352,21 @@ def _update_dequeued_task(self, deq_task):
if deq_task.metadata:
task.metadata.update(deq_task.metadata)

# If there is no exit code, we assume success
# We are only concerned about state of task and not
# deq_task
if deq_task.exit_code == 0:
task_state = DONE
elif deq_task.exit_code == 1:
task_state = FAILED
else:
task_state = deq_task.state
task_state = deq_task.state

task.exception = deq_task.exception
task.exception_detail = deq_task.exception_detail

if task.state == FAILED and self._resubmit_failed:
task_state = INITIAL
if task.state == FAILED:
if self._resubmit_failed:
task_state = INITIAL
else:
if task.error_is_fatal:
print('=== not resubmitting task')
self._advance(stage, 'Stage', FAILED)
self._advance(pipe, 'Pipeline', FAILED)
return

self._advance(task, 'Task', task_state)

# Found the task and processed it -- no more
Expand Down
10 changes: 7 additions & 3 deletions src/radical/entk/execman/rp/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,16 @@ def task_state_cb(rp_task, state):
self._log.debug('Task %s in state %s', rp_task.uid,
rp_task.state)

if rp_task.state in rp.FINAL:
if state in rp.FINAL:

task = create_task_from_rp(rp_task, self._log, self._prof)

if state == rp.DONE : target_state = states.DONE
if state == rp.FAILED : target_state = states.FAILED
if state == rp.CANCELED: target_state = states.CANCELED

# to AppManager
self._advance(task, 'Task', states.COMPLETED, 'cb-to-sync')
self._advance(task, 'Task', target_state, 'cb-to-sync')

load_placeholder(task)

Expand All @@ -265,7 +269,7 @@ def task_state_cb(rp_task, state):
# to WFprocessor
self._zmq_queue['put'].put(qname='completed', msgs=[tdict])
self._log.info('Pushed task %s with state %s to completed',
task.uid, task.state)
task.uid, state)

except KeyboardInterrupt as ex:
self._log.exception('Execution interrupted (probably by Ctrl+C)'
Expand Down
5 changes: 2 additions & 3 deletions src/radical/entk/execman/rp/task_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import radical.pilot as rp
import radical.utils as ru

from ... import states
from ...task import Task
from ...exceptions import EnTKTypeError, EnTKValueError

Expand Down Expand Up @@ -564,10 +565,8 @@ def create_task_from_rp(rp_task, logger, prof=None):
'metadata' : rp_task.description.get('metadata', {})
})

if rp_task.state == rp.DONE : task.exit_code = 0
elif rp_task.state in [rp.FAILED, rp.CANCELED] : task.exit_code = 1

if rp_task.state == rp.FAILED:
task.state = states.FAILED
task.exception = rp_task.exception
task.exception_detail = rp_task.exception_detail

Expand Down
7 changes: 7 additions & 0 deletions src/radical/entk/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ class Task(ru.TypedDict):
[type: `str` | default: `""`] The name of the file to store stderr. If
not set then the name of the following format will be used: `<uid>.err`.
.. data:: error_is_fatal
[type: `bool` | default: `False`] Flag to indicate whether the workflow should
fail if the task fails.
.. data:: stage_on_error
[type: `bool` | default: `False`] Flag to allow staging out data if
Expand Down Expand Up @@ -429,6 +434,7 @@ class Task(ru.TypedDict):
'download_output_data' : [str],
'stdout' : str,
'stderr' : str,
'error_is_fatal' : bool,
'stage_on_error' : bool,
'exit_code' : int,
'exception' : str,
Expand Down Expand Up @@ -470,6 +476,7 @@ class Task(ru.TypedDict):
'download_output_data' : [],
'stdout' : '',
'stderr' : '',
'error_is_fatal' : False,
'stage_on_error' : False,
'exit_code' : None,
'exception' : None,
Expand Down

0 comments on commit 2606ba5

Please sign in to comment.