From cc52e976a6dd705db52d155025c9971e76c1a3a1 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Wed, 25 Oct 2023 16:35:03 -0400 Subject: [PATCH 1/2] Avoid reserializing results into multiprocessing in Work Queue and Task Vine executors (#2916) This PR fixes issue #2908 --- .../executors/taskvine/exec_parsl_function.py | 7 +-- parsl/executors/taskvine/executor.py | 13 ++++- parsl/executors/taskvine/manager.py | 28 ++++------ parsl/executors/taskvine/utils.py | 10 ++-- .../workqueue/exec_parsl_function.py | 3 +- parsl/executors/workqueue/executor.py | 55 +++++++++++-------- 6 files changed, 65 insertions(+), 51 deletions(-) diff --git a/parsl/executors/taskvine/exec_parsl_function.py b/parsl/executors/taskvine/exec_parsl_function.py index 59cca96bab..0bd86b7a99 100644 --- a/parsl/executors/taskvine/exec_parsl_function.py +++ b/parsl/executors/taskvine/exec_parsl_function.py @@ -1,11 +1,10 @@ import traceback import sys -import pickle from parsl.app.errors import RemoteExceptionWrapper from parsl.data_provider.files import File from parsl.utils import get_std_fname_mode -from parsl.serialize import deserialize +from parsl.serialize import deserialize, serialize # This scripts executes a parsl function which is pickled in 4 files: # @@ -30,10 +29,10 @@ # -def dump_result_to_file(result_file: str, result_package): +def dump_result_to_file(result_file: str, result): """ Dump a result to the given result file.""" with open(result_file, "wb") as f_out: - pickle.dump(result_package, f_out) + f_out.write(serialize(result)) def remap_location(mapping, parsl_file): diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index a6ce3987bc..ac9681f9b7 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -22,7 +22,7 @@ # Import Parsl constructs import parsl.utils as putils from parsl.data_provider.staging import Staging -from parsl.serialize import serialize +from parsl.serialize import serialize, deserialize from parsl.data_provider.files import File from parsl.errors import OptionalModuleMissing from parsl.providers.base import ExecutionProvider @@ -639,11 +639,18 @@ def _collect_taskvine_results(self): logger.debug(f'Updating Future for Parsl Task: {task_report.executor_id}. \ Task {task_report.executor_id} has result_received set to {task_report.result_received}') if task_report.result_received: - future.set_result(task_report.result) + try: + with open(task_report.result_file, 'rb') as f_in: + result = deserialize(f_in.read()) + except Exception as e: + logger.debug(f'Cannot load result from result file {task_report.result_file}. Exception: {e}') + future.set_exception(TaskVineTaskFailure('Cannot load result from result file', None)) + else: + future.set_result(result) else: # If there are no results, then the task failed according to one of # taskvine modes, such as resource exhaustion. - future.set_exception(TaskVineTaskFailure(task_report.reason, task_report.result)) + future.set_exception(TaskVineTaskFailure(task_report.reason, None)) # decrement outstanding task counter with self._outstanding_tasks_lock: diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 2351a0a301..c33053e313 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -2,7 +2,6 @@ import hashlib import subprocess import os -import pickle import queue import shutil import uuid @@ -229,7 +228,7 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.error("Unable to create executor task (mode:regular): {}".format(e)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id, result_received=False, - result=None, + result_file=None, reason="task could not be created by taskvine", status=-1)) continue @@ -268,7 +267,7 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.error("Unable to create executor task (mode:serverless): {}".format(e)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id, result_received=False, - result=None, + result_file=None, reason="task could not be created by taskvine", status=-1)) else: @@ -369,7 +368,7 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.error("Unable to submit task to taskvine: {}".format(e)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=task.executor_id, result_received=False, - result=None, + result_file=None, reason="task could not be submited to taskvine", status=-1)) continue @@ -394,24 +393,21 @@ def _taskvine_submit_wait(ready_task_queue=None, logger.debug(f"completed executor task info: {executor_task_id}, {t.category}, {t.command}, {t.std_output}") - # A tasks completes 'succesfully' if it has result file, - # and it can be loaded. This may mean that the 'success' is - # an exception. + # A tasks completes 'succesfully' if it has result file. + # A check whether the Python object represented using this file can be + # deserialized happens later in the collector thread of the executor + # process. logger.debug("Looking for result in {}".format(result_file)) - try: - with open(result_file, "rb") as f_in: - result = pickle.load(f_in) + if os.path.exists(result_file): logger.debug("Found result in {}".format(result_file)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=executor_task_id, result_received=True, - result=result, + result_file=result_file, reason=None, status=t.exit_code)) # If a result file could not be generated, explain the - # failure according to taskvine error codes. We generate - # an exception and wrap it with RemoteExceptionWrapper, to - # match the positive case. - except Exception as e: + # failure according to taskvine error codes. + else: reason = _explain_taskvine_result(t) logger.debug("Did not find result in {}".format(result_file)) logger.debug("Wrapper Script status: {}\nTaskVine Status: {}" @@ -420,7 +416,7 @@ def _taskvine_submit_wait(ready_task_queue=None, .format(executor_task_id, t.id, reason)) finished_task_queue.put_nowait(VineTaskToParsl(executor_id=executor_task_id, result_received=False, - result=e, + result_file=None, reason=reason, status=t.exit_code)) diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 9f0d9f7c05..86cf446b1a 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -42,20 +42,20 @@ def __init__(self, class VineTaskToParsl: """ - Support structure to communicate final status of TaskVine tasks to Parsl - result is only valid if result_received is True - reason and status are only valid if result_received is False + Support structure to communicate final status of TaskVine tasks to Parsl. + result_file is only valid if result_received is True. + Reason and status are only valid if result_received is False. """ def __init__(self, executor_id: int, # executor id of task result_received: bool, # whether result is received or not - result, # result object if available + result_file: Optional[str], # path to file that contains the serialized result object reason: Optional[str], # string describing why execution fails status: Optional[int] # exit code of execution of task ): self.executor_id = executor_id self.result_received = result_received - self.result = result + self.result_file = result_file self.reason = reason self.status = status diff --git a/parsl/executors/workqueue/exec_parsl_function.py b/parsl/executors/workqueue/exec_parsl_function.py index 32dc51d281..9aa8674677 100644 --- a/parsl/executors/workqueue/exec_parsl_function.py +++ b/parsl/executors/workqueue/exec_parsl_function.py @@ -4,6 +4,7 @@ import traceback import sys import pickle +from parsl.serialize import serialize # This scripts executes a parsl function which is pickled in a file: # @@ -32,7 +33,7 @@ def load_pickled_file(filename): def dump_result_to_file(result_file, result_package): with open(result_file, "wb") as f_out: - pickle.dump(result_package, f_out) + f_out.write(serialize(result_package)) def remap_location(mapping, parsl_file): diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 8655a507d4..230b074ff0 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -21,7 +21,7 @@ import shutil import itertools -from parsl.serialize import pack_apply_message +from parsl.serialize import pack_apply_message, deserialize import parsl.utils as putils from parsl.executors.errors import ExecutorError from parsl.data_provider.files import File @@ -66,11 +66,11 @@ # Support structure to communicate final status of work queue tasks to parsl # if result_received is True: -# result is the result +# result_file is the path to the file containing the result. # if result_received is False: # reason and status are only valid if result_received is False -# result is either None or an exception raised while looking for a result -WqTaskToParsl = namedtuple('WqTaskToParsl', 'id result_received result reason status') +# result_file is None +WqTaskToParsl = namedtuple('WqTaskToParsl', 'id result_received result_file reason status') # Support structure to report parsl filenames to work queue. # parsl_name is the local_name or filepath attribute of a parsl file object. @@ -729,14 +729,29 @@ def _collect_work_queue_results(self): with self.tasks_lock: future = self.tasks.pop(task_report.id) logger.debug("Updating Future for executor task {}".format(task_report.id)) + # If result_received, then there's a result file. The object inside the file + # may be a valid result or an exception caused within the function invocation. + # Otherwise there's no result file, implying errors from WorkQueue. if task_report.result_received: - future.set_result(task_report.result) + try: + with open(task_report.result_file, 'rb') as f_in: + result = deserialize(f_in.read()) + except Exception as e: + logger.error(f'Cannot load result from result file {task_report.result_file}. Exception: {e}') + ex = WorkQueueTaskFailure('Cannot load result from result file', None) + ex.__cause__ = e + future.set_exception(ex) + else: + if isinstance(result, Exception): + ex = WorkQueueTaskFailure('Task execution raises an exception', result) + ex.__cause__ = result + future.set_exception(ex) + else: + future.set_result(result) else: # If there are no results, then the task failed according to one of # work queue modes, such as resource exhaustion. - ex = WorkQueueTaskFailure(task_report.reason, task_report.result) - if task_report.result is not None: - ex.__cause__ = task_report.result + ex = WorkQueueTaskFailure(task_report.reason, None) future.set_exception(ex) finally: logger.debug("Marking all outstanding tasks as failed") @@ -876,7 +891,7 @@ def _work_queue_submit_wait(*, logger.error("Unable to create task: {}".format(e)) collector_queue.put_nowait(WqTaskToParsl(id=task.id, result_received=False, - result=None, + result_file=None, reason="task could not be created by work queue", status=-1)) continue @@ -937,7 +952,7 @@ def _work_queue_submit_wait(*, logger.error("Unable to submit task to work queue: {}".format(e)) collector_queue.put_nowait(WqTaskToParsl(id=task.id, result_received=False, - result=None, + result_file=None, reason="task could not be submited to work queue", status=-1)) continue @@ -957,24 +972,20 @@ def _work_queue_submit_wait(*, logger.debug("Completed Work Queue task {}, executor task {}".format(t.id, t.tag)) result_file = result_file_of_task_id.pop(t.tag) - # A tasks completes 'succesfully' if it has result file, - # and it can be loaded. This may mean that the 'success' is - # an exception. + # A tasks completes 'succesfully' if it has result file. + # The check whether this file can load a serialized Python object + # happens later in the collector thread of the executor process. logger.debug("Looking for result in {}".format(result_file)) - try: - with open(result_file, "rb") as f_in: - result = pickle.load(f_in) + if os.path.exists(result_file): logger.debug("Found result in {}".format(result_file)) collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id, result_received=True, - result=result, + result_file=result_file, reason=None, status=t.return_status)) # If a result file could not be generated, explain the - # failure according to work queue error codes. We generate - # an exception and wrap it with RemoteExceptionWrapper, to - # match the positive case. - except Exception as e: + # failure according to work queue error codes. + else: reason = _explain_work_queue_result(t) logger.debug("Did not find result in {}".format(result_file)) logger.debug("Wrapper Script status: {}\nWorkQueue Status: {}" @@ -983,7 +994,7 @@ def _work_queue_submit_wait(*, .format(executor_task_id, t.id, reason)) collector_queue.put_nowait(WqTaskToParsl(id=executor_task_id, result_received=False, - result=e, + result_file=None, reason=reason, status=t.return_status)) logger.debug("Exiting WorkQueue Monitoring Process") From eeb8952bc31551c4af0b44e7e519ce90b9995d72 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Thu, 26 Oct 2023 15:57:51 -0400 Subject: [PATCH 2/2] vine chain excp (#2926) --- parsl/executors/taskvine/executor.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index ac9681f9b7..788616fd8a 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -643,14 +643,22 @@ def _collect_taskvine_results(self): with open(task_report.result_file, 'rb') as f_in: result = deserialize(f_in.read()) except Exception as e: - logger.debug(f'Cannot load result from result file {task_report.result_file}. Exception: {e}') - future.set_exception(TaskVineTaskFailure('Cannot load result from result file', None)) + logger.error(f'Cannot load result from result file {task_report.result_file}. Exception: {e}') + ex = TaskVineTaskFailure('Cannot load result from result file', None) + ex.__cause__ = e + future.set_exception(ex) else: - future.set_result(result) + if isinstance(result, Exception): + ex = TaskVineTaskFailure('Task execution raises an exception', result) + ex.__cause__ = result + future.set_exception(ex) + else: + future.set_result(result) else: # If there are no results, then the task failed according to one of # taskvine modes, such as resource exhaustion. - future.set_exception(TaskVineTaskFailure(task_report.reason, None)) + ex = TaskVineTaskFailure(task_report.reason, None) + future.set_exception(ex) # decrement outstanding task counter with self._outstanding_tasks_lock: