diff --git a/parsl/executors/execute_task.py b/parsl/executors/execute_task.py index 5bcd79dda6..41c2f3cc9b 100644 --- a/parsl/executors/execute_task.py +++ b/parsl/executors/execute_task.py @@ -7,7 +7,7 @@ def execute_task(bufs: bytes): """Deserialize the buffer and execute the task. Returns the result or throws exception. """ - f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs, copy=False) + f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs) for varname in resource_spec: envname = "PARSL_" + str(varname).upper() diff --git a/parsl/executors/high_throughput/mpi_resource_management.py b/parsl/executors/high_throughput/mpi_resource_management.py index 09745421d1..ac02e0c419 100644 --- a/parsl/executors/high_throughput/mpi_resource_management.py +++ b/parsl/executors/high_throughput/mpi_resource_management.py @@ -160,9 +160,7 @@ def put_task(self, task_package: dict): """Schedule task if resources are available otherwise backlog the task""" user_ns = locals() user_ns.update({"__builtins__": __builtins__}) - _f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message( - task_package["buffer"], user_ns, copy=False - ) + _f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"]) nodes_needed = resource_spec.get("num_nodes") if nodes_needed: diff --git a/parsl/executors/radical/rpex_worker.py b/parsl/executors/radical/rpex_worker.py index 09482d8d01..90243b558a 100644 --- a/parsl/executors/radical/rpex_worker.py +++ b/parsl/executors/radical/rpex_worker.py @@ -33,7 +33,7 @@ def _dispatch_proc(self, task): try: buffer = rp.utils.deserialize_bson(task['description']['executable']) - func, args, kwargs, _resource_spec = unpack_res_spec_apply_message(buffer, {}, copy=False) + func, args, kwargs, _resource_spec = unpack_res_spec_apply_message(buffer) ret = remote_side_bash_executor(func, *args, **kwargs) exc = (None, None) val = None diff --git a/parsl/executors/workqueue/exec_parsl_function.py b/parsl/executors/workqueue/exec_parsl_function.py index 06a86a5c8d..d19d92efe6 100644 --- a/parsl/executors/workqueue/exec_parsl_function.py +++ b/parsl/executors/workqueue/exec_parsl_function.py @@ -94,7 +94,7 @@ def unpack_source_code_function(function_info, user_namespace): def unpack_byte_code_function(function_info, user_namespace): from parsl.serialize import unpack_apply_message - func, args, kwargs = unpack_apply_message(function_info["byte code"], user_namespace, copy=False) + func, args, kwargs = unpack_apply_message(function_info["byte code"]) return (func, 'parsl_function_name', args, kwargs) diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index f8e76f174b..2e02e2b983 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -87,16 +87,16 @@ def pack_res_spec_apply_message(func: Any, args: Any, kwargs: Any, resource_spec return pack_apply_message(func, args, (kwargs, resource_specification), buffer_threshold=buffer_threshold) -def unpack_apply_message(packed_buffer: bytes, user_ns: Any = None, copy: Any = False) -> List[Any]: +def unpack_apply_message(packed_buffer: bytes) -> List[Any]: """ Unpack and deserialize function and parameters """ return [deserialize(buf) for buf in unpack_buffers(packed_buffer)] -def unpack_res_spec_apply_message(packed_buffer: bytes, user_ns: Any = None, copy: Any = False) -> List[Any]: +def unpack_res_spec_apply_message(packed_buffer: bytes) -> List[Any]: """ Unpack and deserialize function, parameters, and resource_specification """ - func, args, (kwargs, resource_spec) = unpack_apply_message(packed_buffer, user_ns=user_ns, copy=copy) + func, args, (kwargs, resource_spec) = unpack_apply_message(packed_buffer) return [func, args, kwargs, resource_spec]