From 71600853dc4ff97afd1e7f1f6a527c9a5e4bca04 Mon Sep 17 00:00:00 2001 From: Thanh Son Phung Date: Sat, 14 Dec 2024 15:58:46 -0500 Subject: [PATCH] context feature added --- parsl/executors/taskvine/executor.py | 15 +++++++++++++++ parsl/executors/taskvine/manager.py | 9 ++++++++- parsl/executors/taskvine/utils.py | 7 +++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index aca78f8ec9..9a996cc762 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -422,6 +422,7 @@ def submit(self, func, resource_specification, *args, **kwargs): argument_file = None result_file = None map_file = None + function_context_file = None # Get path to files that will contain the pickled function, # arguments, result, and map of input and output files @@ -438,6 +439,19 @@ def submit(self, func, resource_specification, *args, **kwargs): result_file = self._path_in_task(executor_task_id, "result") map_file = self._path_in_task(executor_task_id, "map") + if exec_mode == 'serverless': + if 'function_context' in resource_specification: + if 'function_context_file' not in self._map_func_names_to_serialized_func_file[func.__name__]: + function_context = resource_specification.get('function_context') + function_context_args = resource_specification.get('function_context_args', []) + function_context_kwargs = resource_specification.get('function_context_kwargs', {}) + function_context_file = os.path.join(self._function_data_dir.name, func.__name__, 'function_context') + self._serialize_object_to_file(function_context_file, [function_context, function_context_args, function_context_kwargs]) + self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] = function_context_file + else: + function_context_file = self._map_func_names_to_serialized_func_file[func.__name__]['function_context_file'] + + logger.debug("Creating executor task {} with function at: {}, argument at: {}, \ and result to be found at: {}".format(executor_task_id, function_file, argument_file, result_file)) @@ -475,6 +489,7 @@ def submit(self, func, resource_specification, *args, **kwargs): function_file=function_file, argument_file=argument_file, result_file=result_file, + function_context_file=function_context_file, cores=cores, memory=memory, disk=disk, diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index 9a065a6768..63075a282f 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -270,6 +270,12 @@ def _taskvine_submit_wait(ready_task_queue=None, # This cost is paid only once per function/app. func = _deserialize_object_from_file(task.function_file) + # Deserialize the function context to add it to the library if available + # This cost is paid only once per function/app. + function_context_list = None + if task.function_context_file: + function_context_list = _deserialize_object_from_file(task.function_context_file) + # Don't automatically add environment so manager can declare and cache the vine file associated with the environment file add_env = False lib_name = f'{task.func_name}-lib' @@ -278,7 +284,8 @@ def _taskvine_submit_wait(ready_task_queue=None, poncho_env=poncho_env_path, init_command=manager_config.init_command, exec_mode='direct', - add_env=add_env) + add_env=add_env, + library_context_info=function_context_list) # Configure the library if provided if manager_config.library_config: diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index b30ff1439c..8ac61fe0f9 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -15,6 +15,7 @@ def __init__(self, function_file: Optional[str], # pickled file containing the function information argument_file: Optional[str], # pickled file containing the arguments to the function call result_file: Optional[str], # path to the pickled result object of the function execution + function_context_file: Optional[list], # path to the pickled list of function context details for serverless functions cores: Optional[float], # number of cores to allocate memory: Optional[int], # amount of memory in MBs to allocate disk: Optional[int], # amount of disk in MBs to allocate @@ -33,6 +34,7 @@ def __init__(self, self.result_file = result_file self.input_files = input_files self.output_files = output_files + self.function_context_file = function_context_file self.cores = cores self.memory = memory self.disk = disk @@ -85,3 +87,8 @@ def run_parsl_function(map_file, function_file, argument_file, result_file): """ from parsl.executors.taskvine.exec_parsl_function import run run(map_file, function_file, argument_file, result_file) + + +def load_variable_in_serverless(var_name): + from ndcctools.taskvine.utils import load_variable_from_library + return load_variable_from_library(var_name)