Merge branch 'master' into benc-3515-remove-push-pull
benclifford authored Nov 21, 2024
2 parents 7cec245 + 07dfb42 commit 2bbfd49
Showing 13 changed files with 138 additions and 87 deletions.
2 changes: 0 additions & 2 deletions parsl/dataflow/dflow.py
@@ -111,8 +111,6 @@ def __init__(self, config: Config) -> None:
self.monitoring = config.monitoring

if self.monitoring:
if self.monitoring.logdir is None:
self.monitoring.logdir = self.run_dir
self.monitoring.start(self.run_dir, self.config.run_dir)

self.time_began = datetime.datetime.now()
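
With the logdir fallback removed from the DataFlowKernel, monitoring output simply follows the run directory. A minimal usage sketch, not part of this commit, of a configuration that relies on that behaviour (the hub_address value, executor choice and run_dir are illustrative, and the constructor arguments shown are assumed to match the current parsl API):

import parsl
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.monitoring import MonitoringHub

config = Config(
    executors=[HighThroughputExecutor()],
    run_dir="runinfo",                                   # monitoring logs and the default sqlite DB land under this run_dir
    monitoring=MonitoringHub(hub_address="127.0.0.1"),   # no logdir argument
)

parsl.load(config)
# ... define and run apps ...
parsl.dfk().cleanup()
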
37 changes: 37 additions & 0 deletions parsl/executors/execute_task.py
@@ -0,0 +1,37 @@
import os

from parsl.serialize import unpack_res_spec_apply_message


def execute_task(bufs: bytes):
"""Deserialize the buffer and execute the task.
Returns the result or throws exception.
"""
f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs)

for varname in resource_spec:
envname = "PARSL_" + str(varname).upper()
os.environ[envname] = str(resource_spec[varname])

# We might need to look into callability of the function from itself
# since we change its name in the new namespace
prefix = "parsl_"
fname = prefix + "f"
argname = prefix + "args"
kwargname = prefix + "kwargs"
resultname = prefix + "result"

code = "{0} = {1}(*{2}, **{3})".format(resultname, fname,
argname, kwargname)

user_ns = locals()
user_ns.update({
'__builtins__': __builtins__,
fname: f,
argname: args,
kwargname: kwargs,
resultname: resultname
})

exec(code, user_ns, user_ns)
return user_ns.get(resultname)
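
For orientation, a small sketch of how this helper could be exercised on its own; it assumes pack_res_spec_apply_message accepts (function, args, kwargs, resource_specification) as it is called elsewhere in this diff, and the sample function and resource key are invented for illustration:

import os

from parsl.executors.execute_task import execute_task
from parsl.serialize import pack_res_spec_apply_message

def double(x):
    return 2 * x

# Serialize a call the same way the executors do, then run it.
buf = pack_res_spec_apply_message(double, (21,), {}, {"num_nodes": 1})
print(execute_task(buf))               # 42
print(os.environ["PARSL_NUM_NODES"])   # "1", set as a side effect of execute_task
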
2 changes: 1 addition & 1 deletion parsl/executors/flux/execute_parsl_task.py
@@ -4,8 +4,8 @@
import logging
import os

from parsl.executors.execute_task import execute_task
from parsl.executors.flux import TaskResult
from parsl.executors.high_throughput.process_worker_pool import execute_task
from parsl.serialize import serialize


5 changes: 2 additions & 3 deletions parsl/executors/high_throughput/mpi_resource_management.py
@@ -160,9 +160,7 @@ def put_task(self, task_package: dict):
"""Schedule task if resources are available otherwise backlog the task"""
user_ns = locals()
user_ns.update({"__builtins__": __builtins__})
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(
task_package["buffer"], user_ns, copy=False
)
_f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message(task_package["buffer"])

nodes_needed = resource_spec.get("num_nodes")
if nodes_needed:
@@ -177,6 +175,7 @@ def put_task(self, task_package: dict):
self._map_tasks_to_nodes[task_package["task_id"]] = allocated_nodes
buffer = pack_res_spec_apply_message(_f, _args, _kwargs, resource_spec)
task_package["buffer"] = buffer
task_package["resource_spec"] = resource_spec

self.pending_task_q.put(task_package)

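A side note on the resource_spec that now travels with the task package: when the task eventually runs, execute_task exports each entry as a PARSL_-prefixed environment variable. A tiny standalone illustration of that naming convention (the key names below are examples, not an authoritative list):

resource_spec = {"num_nodes": 2, "ranks_per_node": 4}
for varname in resource_spec:
    # mirrors the loop in execute_task: "PARSL_" + upper-cased key
    print("PARSL_" + str(varname).upper(), "=", str(resource_spec[varname]))
# PARSL_NUM_NODES = 2
# PARSL_RANKS_PER_NODE = 4
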
53 changes: 12 additions & 41 deletions parsl/executors/high_throughput/process_worker_pool.py
@@ -23,6 +23,7 @@

from parsl import curvezmq
from parsl.app.errors import RemoteExceptionWrapper
from parsl.executors.execute_task import execute_task
from parsl.executors.high_throughput.errors import WorkerLost
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
@@ -35,7 +36,7 @@
from parsl.executors.high_throughput.probe import probe_addresses
from parsl.multiprocessing import SpawnContext
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize, unpack_res_spec_apply_message
from parsl.serialize import serialize
from parsl.version import VERSION as PARSL_VERSION

HEARTBEAT_CODE = (2 ** 32) - 1
@@ -590,45 +591,13 @@ def update_resource_spec_env_vars(mpi_launcher: str, resource_spec: Dict, node_i
os.environ[key] = prefix_table[key]


def execute_task(bufs, mpi_launcher: Optional[str] = None):
"""Deserialize the buffer and execute the task.
Returns the result or throws exception.
"""
user_ns = locals()
user_ns.update({'__builtins__': __builtins__})

f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs, user_ns, copy=False)

for varname in resource_spec:
envname = "PARSL_" + str(varname).upper()
os.environ[envname] = str(resource_spec[varname])

if resource_spec.get("MPI_NODELIST"):
worker_id = os.environ['PARSL_WORKER_RANK']
nodes_for_task = resource_spec["MPI_NODELIST"].split(',')
logger.info(f"Launching task on provisioned nodes: {nodes_for_task}")
assert mpi_launcher
update_resource_spec_env_vars(mpi_launcher,
resource_spec=resource_spec,
node_info=nodes_for_task)
# We might need to look into callability of the function from itself
# since we change it's name in the new namespace
prefix = "parsl_"
fname = prefix + "f"
argname = prefix + "args"
kwargname = prefix + "kwargs"
resultname = prefix + "result"

user_ns.update({fname: f,
argname: args,
kwargname: kwargs,
resultname: resultname})

code = "{0} = {1}(*{2}, **{3})".format(resultname, fname,
argname, kwargname)
exec(code, user_ns, user_ns)
return user_ns.get(resultname)
def _init_mpi_env(mpi_launcher: str, resource_spec: Dict):
node_list = resource_spec.get("MPI_NODELIST")
if node_list is None:
return
nodes_for_task = node_list.split(',')
logger.info(f"Launching task on provisioned nodes: {nodes_for_task}")
update_resource_spec_env_vars(mpi_launcher=mpi_launcher, resource_spec=resource_spec, node_info=nodes_for_task)


@wrap_with_logs(target="worker_log")
@@ -786,8 +755,10 @@ def manager_is_alive():
ready_worker_count.value -= 1
worker_enqueued = False

_init_mpi_env(mpi_launcher=mpi_launcher, resource_spec=req["resource_spec"])

try:
result = execute_task(req['buffer'], mpi_launcher=mpi_launcher)
result = execute_task(req['buffer'])
serialized_result = serialize(result, buffer_threshold=1000000)
except Exception as e:
logger.info('Caught an exception: {}'.format(e))
4 changes: 2 additions & 2 deletions parsl/executors/radical/rpex_worker.py
@@ -4,7 +4,7 @@

import parsl.app.errors as pe
from parsl.app.bash import remote_side_bash_executor
from parsl.executors.high_throughput.process_worker_pool import execute_task
from parsl.executors.execute_task import execute_task
from parsl.serialize import serialize, unpack_res_spec_apply_message


@@ -33,7 +33,7 @@ def _dispatch_proc(self, task):

try:
buffer = rp.utils.deserialize_bson(task['description']['executable'])
func, args, kwargs, _resource_spec = unpack_res_spec_apply_message(buffer, {}, copy=False)
func, args, kwargs, _resource_spec = unpack_res_spec_apply_message(buffer)
ret = remote_side_bash_executor(func, *args, **kwargs)
exc = (None, None)
val = None
2 changes: 1 addition & 1 deletion parsl/executors/workqueue/exec_parsl_function.py
@@ -94,7 +94,7 @@ def unpack_source_code_function(function_info, user_namespace):

def unpack_byte_code_function(function_info, user_namespace):
from parsl.serialize import unpack_apply_message
func, args, kwargs = unpack_apply_message(function_info["byte code"], user_namespace, copy=False)
func, args, kwargs = unpack_apply_message(function_info["byte code"])
return (func, 'parsl_function_name', args, kwargs)


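The simplified unpack_apply_message call no longer takes a namespace or copy flag. A standalone round-trip sketch, assuming pack_apply_message takes (function, args, kwargs) as in parsl.serialize; the sample function is invented:

from parsl.serialize import pack_apply_message, unpack_apply_message

def greet(name, punctuation="!"):
    return f"hello {name}{punctuation}"

buf = pack_apply_message(greet, ("parsl",), {"punctuation": "?"})
func, args, kwargs = unpack_apply_message(buf)
print(func(*args, **kwargs))   # hello parsl?
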
12 changes: 6 additions & 6 deletions parsl/monitoring/db_manager.py
@@ -279,20 +279,20 @@ class Resource(Base):
class DatabaseManager:
def __init__(self,
db_url: str = 'sqlite:///runinfo/monitoring.db',
logdir: str = '.',
run_dir: str = '.',
logging_level: int = logging.INFO,
batching_interval: float = 1,
batching_threshold: float = 99999,
):

self.workflow_end = False
self.workflow_start_message: Optional[MonitoringMessage] = None
self.logdir = logdir
os.makedirs(self.logdir, exist_ok=True)
self.run_dir = run_dir
os.makedirs(self.run_dir, exist_ok=True)

logger.propagate = False

set_file_logger("{}/database_manager.log".format(self.logdir), level=logging_level,
set_file_logger(f"{self.run_dir}/database_manager.log", level=logging_level,
format_string="%(asctime)s.%(msecs)03d %(name)s:%(lineno)d [%(levelname)s] [%(threadName)s %(thread)d] %(message)s",
name="database_manager")

@@ -681,7 +681,7 @@ def close(self) -> None:
def dbm_starter(exception_q: mpq.Queue,
resource_msgs: mpq.Queue,
db_url: str,
logdir: str,
run_dir: str,
logging_level: int) -> None:
"""Start the database manager process
@@ -692,7 +692,7 @@ def dbm_starter(exception_q: mpq.Queue,

try:
dbm = DatabaseManager(db_url=db_url,
logdir=logdir,
run_dir=run_dir,
logging_level=logging_level)
logger.info("Starting dbm in dbm starter")
dbm.start(resource_msgs)
23 changes: 9 additions & 14 deletions parsl/monitoring/monitoring.py
@@ -44,7 +44,6 @@ def __init__(self,
workflow_name: Optional[str] = None,
workflow_version: Optional[str] = None,
logging_endpoint: Optional[str] = None,
logdir: Optional[str] = None,
monitoring_debug: bool = False,
resource_monitoring_enabled: bool = True,
resource_monitoring_interval: float = 30): # in seconds
@@ -73,8 +72,6 @@ def __init__(self,
The database connection url for monitoring to log the information.
These URLs follow RFC-1738, and can include username, password, hostname, database name.
Default: sqlite, in the configured run_dir.
logdir : str
Parsl log directory paths. Logs and temp files go here. Default: '.'
monitoring_debug : Bool
Enable monitoring debug logging. Default: False
resource_monitoring_enabled : boolean
@@ -96,7 +93,6 @@
self.hub_port_range = hub_port_range

self.logging_endpoint = logging_endpoint
self.logdir = logdir
self.monitoring_debug = monitoring_debug

self.workflow_name = workflow_name
@@ -109,13 +105,10 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No

logger.debug("Starting MonitoringHub")

if self.logdir is None:
self.logdir = "."

if self.logging_endpoint is None:
self.logging_endpoint = f"sqlite:///{os.fspath(config_run_dir)}/monitoring.db"

os.makedirs(self.logdir, exist_ok=True)
os.makedirs(dfk_run_dir, exist_ok=True)

self.monitoring_hub_active = True

@@ -151,7 +144,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
"hub_address": self.hub_address,
"udp_port": self.hub_port,
"zmq_port_range": self.hub_port_range,
"logdir": self.logdir,
"run_dir": dfk_run_dir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
},
name="Monitoring-Router-Process",
@@ -161,7 +154,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No

self.dbm_proc = ForkProcess(target=dbm_starter,
args=(self.exception_q, self.resource_msgs,),
kwargs={"logdir": self.logdir,
kwargs={"run_dir": dfk_run_dir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"db_url": self.logging_endpoint,
},
@@ -172,7 +165,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
logger.info("Started the router process %s and DBM process %s", self.router_proc.pid, self.dbm_proc.pid)

self.filesystem_proc = ForkProcess(target=filesystem_receiver,
args=(self.logdir, self.resource_msgs, dfk_run_dir),
args=(self.resource_msgs, dfk_run_dir),
name="Monitoring-Filesystem-Process",
daemon=True
)
@@ -258,8 +251,8 @@ def close(self) -> None:


@wrap_with_logs
def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
def filesystem_receiver(q: Queue[TaggedMonitoringMessage], run_dir: str) -> None:
logger = set_file_logger(f"{run_dir}/monitoring_filesystem_radio.log",
name="monitoring_filesystem_radio",
level=logging.INFO)

@@ -270,6 +263,8 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir:
new_dir = f"{base_path}/new/"
logger.debug("Creating new and tmp paths under %s", base_path)

target_radio = MultiprocessingQueueRadioSender(q)

os.makedirs(tmp_dir, exist_ok=True)
os.makedirs(new_dir, exist_ok=True)

@@ -285,7 +280,7 @@ def filesystem_receiver(logdir: str, q: Queue[TaggedMonitoringMessage], run_dir:
message = pickle.load(f)
logger.debug("Message received is: %s", message)
assert isinstance(message, tuple)
q.put(cast(TaggedMonitoringMessage, message))
target_radio.send(cast(TaggedMonitoringMessage, message))
os.remove(full_path_filename)
except Exception:
logger.exception("Exception processing %s - probably will be retried next iteration", filename)
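
The receiver above only consumes pickled tuples that appear in the new/ directory of its filesystem-radio area and forwards them over the multiprocessing queue radio. A hypothetical sender-side sketch, assuming the monitoring-fsq directory layout that the receiver's base_path implies (the path and the write-to-tmp-then-rename idea are inferred, not confirmed by this diff, and the message payload would come from a real radio in practice):

import os
import pickle
import uuid

def drop_monitoring_message(run_dir: str, message: tuple) -> None:
    base_path = f"{run_dir}/monitoring-fsq"   # assumed to match the receiver's layout
    tmp_dir = f"{base_path}/tmp"
    new_dir = f"{base_path}/new"
    os.makedirs(tmp_dir, exist_ok=True)
    os.makedirs(new_dir, exist_ok=True)
    fname = str(uuid.uuid4())
    # Write into tmp/ first, then rename into new/, so the receiver never loads a partial file.
    with open(f"{tmp_dir}/{fname}", "wb") as f:
        pickle.dump(message, f)
    os.rename(f"{tmp_dir}/{fname}", f"{new_dir}/{fname}")
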

