From ae86461afc7acb7243a4386069343eecc2f42c09 Mon Sep 17 00:00:00 2001
From: Reid Mello <30907815+rjmello@users.noreply.github.com>
Date: Mon, 18 Nov 2024 12:12:03 -0500
Subject: [PATCH] Move `execute_task()` to a dedicated module

Multiple executors use the `execute_task()` function, so moving it to
its own module improves code organization and reusability.
---
 parsl/executors/execute_task.py                | 37 +++++++++++++++++++
 parsl/executors/flux/execute_parsl_task.py     |  2 +-
 .../high_throughput/process_worker_pool.py     | 37 +------------------
 parsl/executors/radical/rpex_worker.py         |  2 +-
 parsl/tests/test_execute_task.py               | 29 +++++++++++++++
 5 files changed, 70 insertions(+), 37 deletions(-)
 create mode 100644 parsl/executors/execute_task.py
 create mode 100644 parsl/tests/test_execute_task.py

diff --git a/parsl/executors/execute_task.py b/parsl/executors/execute_task.py
new file mode 100644
index 0000000000..5bcd79dda6
--- /dev/null
+++ b/parsl/executors/execute_task.py
@@ -0,0 +1,37 @@
+import os
+
+from parsl.serialize import unpack_res_spec_apply_message
+
+
+def execute_task(bufs: bytes):
+    """Deserialize the buffer and execute the task.
+    Returns the result or throws exception.
+    """
+    f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs, copy=False)
+
+    for varname in resource_spec:
+        envname = "PARSL_" + str(varname).upper()
+        os.environ[envname] = str(resource_spec[varname])
+
+    # We might need to look into callability of the function from itself
+    # since we change it's name in the new namespace
+    prefix = "parsl_"
+    fname = prefix + "f"
+    argname = prefix + "args"
+    kwargname = prefix + "kwargs"
+    resultname = prefix + "result"
+
+    code = "{0} = {1}(*{2}, **{3})".format(resultname, fname,
+                                           argname, kwargname)
+
+    user_ns = locals()
+    user_ns.update({
+        '__builtins__': __builtins__,
+        fname: f,
+        argname: args,
+        kwargname: kwargs,
+        resultname: resultname
+    })
+
+    exec(code, user_ns, user_ns)
+    return user_ns.get(resultname)
diff --git a/parsl/executors/flux/execute_parsl_task.py b/parsl/executors/flux/execute_parsl_task.py
index ddf3c67e14..4372578e26 100644
--- a/parsl/executors/flux/execute_parsl_task.py
+++ b/parsl/executors/flux/execute_parsl_task.py
@@ -4,8 +4,8 @@
 import logging
 import os
 
+from parsl.executors.execute_task import execute_task
 from parsl.executors.flux import TaskResult
-from parsl.executors.high_throughput.process_worker_pool import execute_task
 from parsl.serialize import serialize
 
 
diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py
index 957d670188..a8bbaa9be8 100755
--- a/parsl/executors/high_throughput/process_worker_pool.py
+++ b/parsl/executors/high_throughput/process_worker_pool.py
@@ -23,6 +23,7 @@
 
 from parsl import curvezmq
 from parsl.app.errors import RemoteExceptionWrapper
+from parsl.executors.execute_task import execute_task
 from parsl.executors.high_throughput.errors import WorkerLost
 from parsl.executors.high_throughput.mpi_prefix_composer import (
     VALID_LAUNCHERS,
@@ -35,7 +36,7 @@
 from parsl.executors.high_throughput.probe import probe_addresses
 from parsl.multiprocessing import SpawnContext
 from parsl.process_loggers import wrap_with_logs
-from parsl.serialize import serialize, unpack_res_spec_apply_message
+from parsl.serialize import serialize
 from parsl.version import VERSION as PARSL_VERSION
 
 HEARTBEAT_CODE = (2 ** 32) - 1
@@ -599,40 +600,6 @@ def _init_mpi_env(mpi_launcher: str, resource_spec: Dict):
     update_resource_spec_env_vars(mpi_launcher=mpi_launcher, resource_spec=resource_spec, node_info=nodes_for_task)
 
 
-def execute_task(bufs: bytes):
-    """Deserialize the buffer and execute the task.
-    Returns the result or throws exception.
-    """
-    f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs, copy=False)
-
-    for varname in resource_spec:
-        envname = "PARSL_" + str(varname).upper()
-        os.environ[envname] = str(resource_spec[varname])
-
-    # We might need to look into callability of the function from itself
-    # since we change it's name in the new namespace
-    prefix = "parsl_"
-    fname = prefix + "f"
-    argname = prefix + "args"
-    kwargname = prefix + "kwargs"
-    resultname = prefix + "result"
-
-    code = "{0} = {1}(*{2}, **{3})".format(resultname, fname,
-                                           argname, kwargname)
-
-    user_ns = locals()
-    user_ns.update({
-        '__builtins__': __builtins__,
-        fname: f,
-        argname: args,
-        kwargname: kwargs,
-        resultname: resultname
-    })
-
-    exec(code, user_ns, user_ns)
-    return user_ns.get(resultname)
-
-
 @wrap_with_logs(target="worker_log")
 def worker(
     worker_id: int,
diff --git a/parsl/executors/radical/rpex_worker.py b/parsl/executors/radical/rpex_worker.py
index db1b7d2bea..09482d8d01 100644
--- a/parsl/executors/radical/rpex_worker.py
+++ b/parsl/executors/radical/rpex_worker.py
@@ -4,7 +4,7 @@
 
 import parsl.app.errors as pe
 from parsl.app.bash import remote_side_bash_executor
-from parsl.executors.high_throughput.process_worker_pool import execute_task
+from parsl.executors.execute_task import execute_task
 from parsl.serialize import serialize, unpack_res_spec_apply_message
 
 
diff --git a/parsl/tests/test_execute_task.py b/parsl/tests/test_execute_task.py
new file mode 100644
index 0000000000..42fb59c5c1
--- /dev/null
+++ b/parsl/tests/test_execute_task.py
@@ -0,0 +1,29 @@
+import os
+
+import pytest
+
+from parsl.executors.execute_task import execute_task
+from parsl.serialize.facade import pack_res_spec_apply_message
+
+
+def addemup(*args: int, name: str = "apples"):
+    total = sum(args)
+    return f"{total} {name}"
+
+
+@pytest.mark.local
+def test_execute_task():
+    args = (1, 2, 3)
+    kwargs = {"name": "boots"}
+    buff = pack_res_spec_apply_message(addemup, args, kwargs, {})
+    res = execute_task(buff)
+    assert res == addemup(*args, **kwargs)
+
+
+@pytest.mark.local
+def test_execute_task_resource_spec():
+    resource_spec = {"num_nodes": 2, "ranks_per_node": 2, "num_ranks": 4}
+    buff = pack_res_spec_apply_message(addemup, (1, 2), {}, resource_spec)
+    execute_task(buff)
+    for key, val in resource_spec.items():
+        assert os.environ[f"PARSL_{key.upper()}"] == str(val)