Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move execute_task() to a dedicated module #3701

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions parsl/executors/execute_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os

from parsl.serialize import unpack_res_spec_apply_message


def execute_task(bufs: bytes):
"""Deserialize the buffer and execute the task.
Returns the result or throws exception.
"""
f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs, copy=False)

for varname in resource_spec:
envname = "PARSL_" + str(varname).upper()
os.environ[envname] = str(resource_spec[varname])

# We might need to look into callability of the function from itself
# since we change it's name in the new namespace
prefix = "parsl_"
fname = prefix + "f"
argname = prefix + "args"
kwargname = prefix + "kwargs"
resultname = prefix + "result"

code = "{0} = {1}(*{2}, **{3})".format(resultname, fname,
argname, kwargname)

user_ns = locals()
user_ns.update({
'__builtins__': __builtins__,
fname: f,
argname: args,
kwargname: kwargs,
resultname: resultname
})

exec(code, user_ns, user_ns)
return user_ns.get(resultname)
2 changes: 1 addition & 1 deletion parsl/executors/flux/execute_parsl_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import logging
import os

from parsl.executors.execute_task import execute_task
from parsl.executors.flux import TaskResult
from parsl.executors.high_throughput.process_worker_pool import execute_task
from parsl.serialize import serialize


Expand Down
37 changes: 2 additions & 35 deletions parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

from parsl import curvezmq
from parsl.app.errors import RemoteExceptionWrapper
from parsl.executors.execute_task import execute_task
from parsl.executors.high_throughput.errors import WorkerLost
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
Expand All @@ -35,7 +36,7 @@
from parsl.executors.high_throughput.probe import probe_addresses
from parsl.multiprocessing import SpawnContext
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize, unpack_res_spec_apply_message
from parsl.serialize import serialize
from parsl.version import VERSION as PARSL_VERSION

HEARTBEAT_CODE = (2 ** 32) - 1
Expand Down Expand Up @@ -599,40 +600,6 @@ def _init_mpi_env(mpi_launcher: str, resource_spec: Dict):
update_resource_spec_env_vars(mpi_launcher=mpi_launcher, resource_spec=resource_spec, node_info=nodes_for_task)


def execute_task(bufs: bytes):
"""Deserialize the buffer and execute the task.
Returns the result or throws exception.
"""
f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs, copy=False)

for varname in resource_spec:
envname = "PARSL_" + str(varname).upper()
os.environ[envname] = str(resource_spec[varname])

# We might need to look into callability of the function from itself
# since we change it's name in the new namespace
prefix = "parsl_"
fname = prefix + "f"
argname = prefix + "args"
kwargname = prefix + "kwargs"
resultname = prefix + "result"

code = "{0} = {1}(*{2}, **{3})".format(resultname, fname,
argname, kwargname)

user_ns = locals()
user_ns.update({
'__builtins__': __builtins__,
fname: f,
argname: args,
kwargname: kwargs,
resultname: resultname
})

exec(code, user_ns, user_ns)
return user_ns.get(resultname)


@wrap_with_logs(target="worker_log")
def worker(
worker_id: int,
Expand Down
2 changes: 1 addition & 1 deletion parsl/executors/radical/rpex_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import parsl.app.errors as pe
from parsl.app.bash import remote_side_bash_executor
from parsl.executors.high_throughput.process_worker_pool import execute_task
from parsl.executors.execute_task import execute_task
from parsl.serialize import serialize, unpack_res_spec_apply_message


Expand Down
29 changes: 29 additions & 0 deletions parsl/tests/test_execute_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os

import pytest

from parsl.executors.execute_task import execute_task
from parsl.serialize.facade import pack_res_spec_apply_message


def addemup(*args: int, name: str = "apples"):
total = sum(args)
khk-globus marked this conversation as resolved.
Show resolved Hide resolved
return f"{total} {name}"


@pytest.mark.local
def test_execute_task():
args = (1, 2, 3)
kwargs = {"name": "boots"}
buff = pack_res_spec_apply_message(addemup, args, kwargs, {})
res = execute_task(buff)
assert res == addemup(*args, **kwargs)


@pytest.mark.local
def test_execute_task_resource_spec():
resource_spec = {"num_nodes": 2, "ranks_per_node": 2, "num_ranks": 4}
buff = pack_res_spec_apply_message(addemup, (1, 2), {}, resource_spec)
execute_task(buff)
for key, val in resource_spec.items():
assert os.environ[f"PARSL_{key.upper()}"] == str(val)
Loading