From 2606ba5e4d1e1219da62dd68cb131ffaad7ab41a Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 24 Jul 2024 23:24:22 +0200 Subject: [PATCH] ensure that failed tasks fail the workload - if so configured --- examples/simple/bag_of_pipelines.py | 102 ++++++++++++++++++ src/radical/entk/appman/appmanager.py | 3 + src/radical/entk/appman/wfprocessor.py | 32 +++--- src/radical/entk/execman/rp/task_manager.py | 10 +- src/radical/entk/execman/rp/task_processor.py | 5 +- src/radical/entk/task.py | 7 ++ 6 files changed, 139 insertions(+), 20 deletions(-) create mode 100755 examples/simple/bag_of_pipelines.py diff --git a/examples/simple/bag_of_pipelines.py b/examples/simple/bag_of_pipelines.py new file mode 100755 index 00000000..a77326ce --- /dev/null +++ b/examples/simple/bag_of_pipelines.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +import sys +import time + +import radical.entk as re + +watch = list() + + +# ------------------------------------------------------------------------------ +# +def timeit(msg): + + watch.append(time.time()) + + if len(watch) > 1: + print('%-11s: %6.2f' % (msg, watch[-1] - watch[-2])) + else: + print() + + +# ------------------------------------------------------------------------------ +# +def main(): + + timeit('start') + + n_pipes = int(sys.argv[1]) + n_stages = int(sys.argv[2]) + n_tasks = int(sys.argv[3]) + backend = str(sys.argv[4]) + + tot_tasks = n_pipes * n_stages * n_tasks + tot_stages = n_pipes * n_stages + tot_nodes = tot_tasks + tot_stages + n_pipes + 1 + + print('%s: %d x %d x %d = %d tasks / %d nodes' + % (backend, n_pipes, n_stages, n_tasks, tot_tasks, tot_nodes)) + + td = {'executable': 'true'} + + am = re.AppManager(rts=backend, autoterminate=True) + + pipelines = list() + for _ in range(n_pipes): + + pipeline = re.Pipeline() + pipelines.append(pipeline) + + stages = list() + for _ in range(n_stages): + + stage = re.Stage() + pipeline.add_stages([stage]) + + for _ in range(n_tasks): + + task = re.Task(td) + 
stage.add_tasks([task]) + + am.workflow = set(pipelines) + timeit('create') + + rd = {'resource' : 'local.localhost', + 'cpus' : 32, + 'walltime' : 15} + am.resource_desc = rd + timeit('backend') + + # rw.puml_plot([wf], depth=2) + # timeit('plot (puml)') + + timeit('run') + am.run() + timeit('wait') + am.terminate() + timeit('close') + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + use_yappi = False + + if use_yappi: + import yappi + yappi.set_clock_type("wall") + yappi.start(builtins=True) + + main() + + if use_yappi: + import yappi + yappi.get_thread_stats().print_all() + stats = yappi.convert2pstats(yappi.get_func_stats()) + stats.dump_stats('pstats.prof') + + +# ------------------------------------------------------------------------------ + diff --git a/src/radical/entk/appman/appmanager.py b/src/radical/entk/appman/appmanager.py index 93881616..6779d7f9 100644 --- a/src/radical/entk/appman/appmanager.py +++ b/src/radical/entk/appman/appmanager.py @@ -693,6 +693,9 @@ def _run_workflow(self): for pipe in self._workflow: + if pipe.state == FAILED: + raise EnTKError('Pipeline %s failed' % pipe.uid) + with pipe.lock: if pipe.completed and \ diff --git a/src/radical/entk/appman/wfprocessor.py b/src/radical/entk/appman/wfprocessor.py index 30531d81..0c6ad505 100644 --- a/src/radical/entk/appman/wfprocessor.py +++ b/src/radical/entk/appman/wfprocessor.py @@ -102,9 +102,13 @@ def _advance(self, obj, obj_type, new_state): self._report.ok('Update: ') self._report.info('%s state: %s\n' % (obj.luid, obj.state)) - if obj_type == 'Task' and obj.state == FAILED: - self._report.error('task %s failed: %s\n%s\n' - % (obj.uid, obj.exception, obj.exception_detail)) + if obj.state == FAILED: + if obj_type == 'Task': + self._report.error('%s %s failed: %s\n%s\n' + % (obj_type, obj.uid, obj.exception, + obj.exception_detail)) + else: + self._report.error('%s %s failed\n' % (obj_type, obj.uid)) 
self._logger.info('Transition %s to state %s', obj.uid, new_state) @@ -348,21 +352,21 @@ def _update_dequeued_task(self, deq_task): if deq_task.metadata: task.metadata.update(deq_task.metadata) - # If there is no exit code, we assume success - # We are only concerned about state of task and not - # deq_task - if deq_task.exit_code == 0: - task_state = DONE - elif deq_task.exit_code == 1: - task_state = FAILED - else: - task_state = deq_task.state + task_state = deq_task.state task.exception = deq_task.exception task.exception_detail = deq_task.exception_detail - if task.state == FAILED and self._resubmit_failed: - task_state = INITIAL + if task.state == FAILED: + if self._resubmit_failed: + task_state = INITIAL + else: + if task.error_is_fatal: + print('=== not resubmitting task') + self._advance(stage, 'Stage', FAILED) + self._advance(pipe, 'Pipeline', FAILED) + return + self._advance(task, 'Task', task_state) # Found the task and processed it -- no more diff --git a/src/radical/entk/execman/rp/task_manager.py b/src/radical/entk/execman/rp/task_manager.py index 65757440..be970fe0 100644 --- a/src/radical/entk/execman/rp/task_manager.py +++ b/src/radical/entk/execman/rp/task_manager.py @@ -251,12 +251,16 @@ def task_state_cb(rp_task, state): self._log.debug('Task %s in state %s', rp_task.uid, rp_task.state) - if rp_task.state in rp.FINAL: + if state in rp.FINAL: task = create_task_from_rp(rp_task, self._log, self._prof) + if state == rp.DONE : target_state = states.DONE + if state == rp.FAILED : target_state = states.FAILED + if state == rp.CANCELED: target_state = states.CANCELED + # to AppManager - self._advance(task, 'Task', states.COMPLETED, 'cb-to-sync') + self._advance(task, 'Task', target_state, 'cb-to-sync') load_placeholder(task) @@ -265,7 +269,7 @@ def task_state_cb(rp_task, state): # to WFprocessor self._zmq_queue['put'].put(qname='completed', msgs=[tdict]) self._log.info('Pushed task %s with state %s to completed', - task.uid, task.state) + task.uid, 
state) except KeyboardInterrupt as ex: self._log.exception('Execution interrupted (probably by Ctrl+C)' diff --git a/src/radical/entk/execman/rp/task_processor.py b/src/radical/entk/execman/rp/task_processor.py index 968fc6f4..5c3b1ed0 100644 --- a/src/radical/entk/execman/rp/task_processor.py +++ b/src/radical/entk/execman/rp/task_processor.py @@ -5,6 +5,7 @@ import radical.pilot as rp import radical.utils as ru +from ... import states from ...task import Task from ...exceptions import EnTKTypeError, EnTKValueError @@ -564,10 +565,8 @@ def create_task_from_rp(rp_task, logger, prof=None): 'metadata' : rp_task.description.get('metadata', {}) }) - if rp_task.state == rp.DONE : task.exit_code = 0 - elif rp_task.state in [rp.FAILED, rp.CANCELED] : task.exit_code = 1 - if rp_task.state == rp.FAILED: + task.state = states.FAILED task.exception = rp_task.exception task.exception_detail = rp_task.exception_detail diff --git a/src/radical/entk/task.py b/src/radical/entk/task.py index 019007dd..bca89929 100644 --- a/src/radical/entk/task.py +++ b/src/radical/entk/task.py @@ -333,6 +333,11 @@ class Task(ru.TypedDict): [type: `str` | default: `""`] The name of the file to store stderr. If not set then the name of the following format will be used: `.err`. + .. data:: error_is_fatal + + [type: `bool` | default: `False`] Flag to indicate if workflow should + fail if the task fails. + .. data:: stage_on_error [type: `bool` | default: `False`] Flag to allow staging out data if @@ -429,6 +434,7 @@ class Task(ru.TypedDict): 'download_output_data' : [str], 'stdout' : str, 'stderr' : str, + 'error_is_fatal' : bool, 'stage_on_error' : bool, 'exit_code' : int, 'exception' : str, @@ -470,6 +476,7 @@ class Task(ru.TypedDict): 'download_output_data' : [], 'stdout' : '', 'stderr' : '', + 'error_is_fatal' : False, 'stage_on_error' : False, 'exit_code' : None, 'exception' : None,