Merge branch 'master' into benc-ghent-scalein-names
benclifford authored Mar 5, 2024
2 parents 516bae2 + ab2a338 commit 03fd8f3
Showing 53 changed files with 140 additions and 72 deletions.
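Most of the diff below is one mechanical rename: the HighThroughputExecutor option max_workers becomes max_workers_per_node, with the old name kept as a deprecated alias. A minimal sketch of what the migration looks like in a user configuration; the label and worker count here are illustrative and not taken from any file in this commit:

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor

    # Old spelling, still accepted but now emits a DeprecationWarning:
    # HighThroughputExecutor(label='htex_local', max_workers=4)

    # New spelling introduced by this change:
    htex = HighThroughputExecutor(label='htex_local', max_workers_per_node=4)
    config = Config(executors=[htex])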
2 changes: 1 addition & 1 deletion .flake8
@@ -8,7 +8,7 @@
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E126, E402, E129, W504
-max-line-length = 158
+max-line-length = 152
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
# E741 disallows ambiguous single letter names which look like numbers
2 changes: 1 addition & 1 deletion docs/teaching_scripts/test_apps.py
@@ -6,7 +6,7 @@
from parsl import python_app, HighThroughputExecutor, Config
import parsl

-parsl.load(Config(executors=[HighThroughputExecutor(label='htex_spawn', max_workers=1, address='127.0.0.1')]))
+parsl.load(Config(executors=[HighThroughputExecutor(label='htex_spawn', max_workers_per_node=1, address='127.0.0.1')]))


# Part 1: Explain imports
2 changes: 1 addition & 1 deletion docs/userguide/configuring.rst
@@ -34,7 +34,7 @@ used by the infiniband interface with ``address_by_interface('ib0')``
HighThroughputExecutor(
label="frontera_htex",
address=address_by_interface('ib0'),
-max_workers=56,
+max_workers_per_node=56,
provider=SlurmProvider(
channel=LocalChannel(),
nodes_per_block=128,
2 changes: 1 addition & 1 deletion docs/userguide/monitoring.rst
@@ -36,7 +36,7 @@ configuration. Here the `parsl.monitoring.MonitoringHub` is specified to use por
HighThroughputExecutor(
label="local_htex",
cores_per_worker=1,
-max_workers=4,
+max_workers_per_node=4,
address=address_by_hostname(),
)
],
2 changes: 1 addition & 1 deletion parsl/configs/ASPIRE1.py
@@ -12,7 +12,7 @@
heartbeat_period=15,
heartbeat_threshold=120,
worker_debug=True,
-max_workers=4,
+max_workers_per_node=4,
address=address_by_interface('ib0'),
provider=PBSProProvider(
launcher=MpiRunLauncher(),
2 changes: 1 addition & 1 deletion parsl/configs/ad_hoc.py
@@ -17,7 +17,7 @@
executors=[
HighThroughputExecutor(
label='remote_htex',
-max_workers=2,
+max_workers_per_node=2,
worker_logdir_root=user_opts['adhoc']['script_dir'],
provider=AdHocProvider(
# Command to be run before starting a worker, such as:
2 changes: 1 addition & 1 deletion parsl/configs/bridges.py
@@ -13,7 +13,7 @@
HighThroughputExecutor(
label='Bridges_HTEX_multinode',
address=address_by_interface('ens3f0'),
-max_workers=1,
+max_workers_per_node=1,
provider=SlurmProvider(
'YOUR_PARTITION_NAME', # Specify Partition / QOS, for eg. RM-small
nodes_per_block=2,
2 changes: 1 addition & 1 deletion parsl/configs/cc_in2p3.py
@@ -7,7 +7,7 @@
executors=[
HighThroughputExecutor(
label='cc_in2p3_htex',
-max_workers=2,
+max_workers_per_node=2,
provider=GridEngineProvider(
channel=LocalChannel(),
nodes_per_block=1,
2 changes: 1 addition & 1 deletion parsl/configs/expanse.py
@@ -8,7 +8,7 @@
executors=[
HighThroughputExecutor(
label='Expanse_CPU_Multinode',
-max_workers=32,
+max_workers_per_node=32,
provider=SlurmProvider(
'compute',
account='YOUR_ALLOCATION_ON_EXPANSE',
2 changes: 1 addition & 1 deletion parsl/configs/frontera.py
@@ -12,7 +12,7 @@
executors=[
HighThroughputExecutor(
label="frontera_htex",
-max_workers=1, # Set number of workers per node
+max_workers_per_node=1, # Set number of workers per node
provider=SlurmProvider(
cmd_timeout=60, # Add extra time for slow scheduler responses
channel=LocalChannel(),
2 changes: 1 addition & 1 deletion parsl/configs/kubernetes.py
@@ -9,7 +9,7 @@
HighThroughputExecutor(
label='kube-htex',
cores_per_worker=1,
-max_workers=1,
+max_workers_per_node=1,
worker_logdir_root='YOUR_WORK_DIR',

# Address for the pod worker to connect back
2 changes: 1 addition & 1 deletion parsl/configs/midway.py
@@ -10,7 +10,7 @@
label='Midway_HTEX_multinode',
address=address_by_interface('bond0'),
worker_debug=False,
-max_workers=2,
+max_workers_per_node=2,
provider=SlurmProvider(
'YOUR_PARTITION', # Partition name, e.g 'broadwl'
launcher=SrunLauncher(),
2 changes: 1 addition & 1 deletion parsl/configs/osg.py
@@ -6,7 +6,7 @@
executors=[
HighThroughputExecutor(
label='OSG_HTEX',
-max_workers=1,
+max_workers_per_node=1,
provider=CondorProvider(
nodes_per_block=1,
init_blocks=4,
2 changes: 1 addition & 1 deletion parsl/configs/stampede2.py
@@ -11,7 +11,7 @@
HighThroughputExecutor(
label='Stampede2_HTEX',
address=address_by_interface('em3'),
-max_workers=2,
+max_workers_per_node=2,
provider=SlurmProvider(
nodes_per_block=2,
init_blocks=1,
3 changes: 2 additions & 1 deletion parsl/dataflow/dflow.py
@@ -730,7 +730,8 @@ def launch_task(self, task_record: TaskRecord) -> Future:

return exec_fu

-def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable) -> Tuple[Sequence[Any], Dict[str, Any], Callable]:
+def _add_input_deps(self, executor: str, args: Sequence[Any], kwargs: Dict[str, Any], func: Callable) -> Tuple[Sequence[Any], Dict[str, Any],
+                                                                                                                Callable]:
"""Look for inputs of the app that are files. Give the data manager
the opportunity to replace a file with a data future for that file,
for example wrapping the result of a staging action.
44 changes: 35 additions & 9 deletions parsl/executors/high_throughput/executor.py
@@ -11,6 +11,7 @@
from typing import Dict, Sequence
from typing import List, Optional, Tuple, Union, Callable
import math
+import warnings

import parsl.launchers
from parsl.serialize import pack_res_spec_apply_message, deserialize
@@ -40,7 +41,7 @@

logger = logging.getLogger(__name__)

-DEFAULT_LAUNCH_CMD = ("process_worker_pool.py {debug} {max_workers} "
+DEFAULT_LAUNCH_CMD = ("process_worker_pool.py {debug} {max_workers_per_node} "
"-a {addresses} "
"-p {prefetch_capacity} "
"-c {cores_per_worker} "
@@ -155,7 +156,10 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
the there's sufficient memory for each worker. Default: None
max_workers : int
Caps the number of workers launched per node. Default: infinity
+Deprecated. Please use max_workers_per_node instead.
+max_workers_per_node : int
+Caps the number of workers launched per node. Default: None
cpu_affinity: string
Whether or how each worker process sets thread affinity. Options include "none" to forgo
@@ -229,7 +233,8 @@ def __init__(self,
worker_debug: bool = False,
cores_per_worker: float = 1.0,
mem_per_worker: Optional[float] = None,
-max_workers: Union[int, float] = float('inf'),
+max_workers: Optional[Union[int, float]] = None,
+max_workers_per_node: Optional[Union[int, float]] = None,
cpu_affinity: str = 'none',
available_accelerators: Union[int, Sequence[str]] = (),
prefetch_capacity: int = 0,
@@ -252,7 +257,6 @@ def __init__(self,
self.working_dir = working_dir
self.cores_per_worker = cores_per_worker
self.mem_per_worker = mem_per_worker
-self.max_workers = max_workers
self.prefetch_capacity = prefetch_capacity
self.address = address
self.address_probe_timeout = address_probe_timeout
@@ -261,8 +265,12 @@
else:
self.all_addresses = ','.join(get_all_addresses())

-mem_slots = max_workers
-cpu_slots = max_workers
+if max_workers:
+    self._warn_deprecated("max_workers", "max_workers_per_node")
+self.max_workers_per_node = max_workers_per_node or max_workers or float("inf")
+
+mem_slots = self.max_workers_per_node
+cpu_slots = self.max_workers_per_node
if hasattr(self.provider, 'mem_per_node') and \
self.provider.mem_per_node is not None and \
mem_per_worker is not None and \
@@ -279,7 +287,7 @@
self.available_accelerators = list(available_accelerators)

# Determine the number of workers per node
-self._workers_per_node = min(max_workers, mem_slots, cpu_slots)
+self._workers_per_node = min(self.max_workers_per_node, mem_slots, cpu_slots)
if len(self.available_accelerators) > 0:
self._workers_per_node = min(self._workers_per_node, len(available_accelerators))
if self._workers_per_node == float('inf'):
@@ -317,6 +325,24 @@ def __init__(self,

radio_mode = "htex"

+def _warn_deprecated(self, old: str, new: str):
+    warnings.warn(
+        f"{old} is deprecated and will be removed in a future release. "
+        f"Please use {new} instead.",
+        DeprecationWarning,
+        stacklevel=2
+    )
+
+@property
+def max_workers(self):
+    self._warn_deprecated("max_workers", "max_workers_per_node")
+    return self.max_workers_per_node
+
+@max_workers.setter
+def max_workers(self, val: Union[int, float]):
+    self._warn_deprecated("max_workers", "max_workers_per_node")
+    self.max_workers_per_node = val

@property
def logdir(self):
return "{}/{}".format(self.run_dir, self.label)
@@ -331,7 +357,7 @@ def initialize_scaling(self):
"""Compose the launch command and scale out the initial blocks.
"""
debug_opts = "--debug" if self.worker_debug else ""
max_workers = "" if self.max_workers == float('inf') else "--max_workers={}".format(self.max_workers)
max_workers_per_node = "" if self.max_workers_per_node == float('inf') else "--max_workers_per_node={}".format(self.max_workers_per_node)
enable_mpi_opts = "--enable_mpi_mode " if self.enable_mpi_mode else ""

address_probe_timeout_string = ""
@@ -346,7 +372,7 @@
result_port=self.worker_result_port,
cores_per_worker=self.cores_per_worker,
mem_per_worker=self.mem_per_worker,
-max_workers=max_workers,
+max_workers_per_node=max_workers_per_node,
nodes_per_block=self.provider.nodes_per_block,
heartbeat_period=self.heartbeat_period,
heartbeat_threshold=self.heartbeat_threshold,
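The property and setter added above keep the old max_workers attribute readable and writable while steering users toward max_workers_per_node. A short usage sketch of how the shim is expected to behave; it assumes the executor can be constructed with default provider and address settings in your environment, and the label and values are illustrative:

    import warnings

    from parsl.executors import HighThroughputExecutor

    htex = HighThroughputExecutor(label='htex_local', max_workers_per_node=2)

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        cap = htex.max_workers       # old name still works, but warns
        htex.max_workers = 3         # setter also warns and forwards the value

    assert cap == 2
    assert htex.max_workers_per_node == 3
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)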
8 changes: 7 additions & 1 deletion parsl/executors/high_throughput/interchange.py
@@ -621,7 +621,13 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
None.
"""
if format_string is None:
format_string = "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d %(processName)s(%(process)d) %(threadName)s %(funcName)s [%(levelname)s] %(message)s"
format_string = (

"%(asctime)s.%(msecs)03d %(name)s:%(lineno)d "
"%(processName)s(%(process)d) %(threadName)s "
"%(funcName)s [%(levelname)s] %(message)s"

)

global logger
logger = logging.getLogger(LOGGER_NAME)
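The interchange change above only re-wraps the default log format string into adjacent string literals, which Python concatenates back into the same format. A quick sketch to see the resulting log line; the logger name and message are illustrative:

    import logging

    format_string = (
        "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d "
        "%(processName)s(%(process)d) %(threadName)s "
        "%(funcName)s [%(levelname)s] %(message)s"
    )

    logging.basicConfig(level=logging.DEBUG, format=format_string,
                        datefmt="%Y-%m-%d %H:%M:%S")
    logging.getLogger("interchange-demo").info("format check")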
28 changes: 17 additions & 11 deletions parsl/executors/high_throughput/process_worker_pool.py
@@ -62,7 +62,7 @@ def __init__(self, *,
result_port,
cores_per_worker,
mem_per_worker,
-max_workers,
+max_workers_per_node,
prefetch_capacity,
uid,
block_id,
@@ -100,8 +100,8 @@ def __init__(self, *,
the there's sufficient memory for each worker. If set to None, memory on node is not
considered in the determination of workers to be launched on node by the manager.
-max_workers : int
-caps the maximum number of workers that can be launched.
+max_workers_per_node : int | float
+Caps the maximum number of workers that can be launched.
prefetch_capacity : int
Number of tasks that could be prefetched over available worker capacity.
@@ -190,15 +190,15 @@ def __init__(self, *,
else:
available_mem_on_node = round(psutil.virtual_memory().available / (2**30), 1)

-self.max_workers = max_workers
+self.max_workers_per_node = max_workers_per_node
self.prefetch_capacity = prefetch_capacity

-mem_slots = max_workers
+mem_slots = max_workers_per_node
# Avoid a divide by 0 error.
if mem_per_worker and mem_per_worker > 0:
mem_slots = math.floor(available_mem_on_node / mem_per_worker)

-self.worker_count: int = min(max_workers,
+self.worker_count: int = min(max_workers_per_node,
mem_slots,
math.floor(cores_on_node / cores_per_worker))

@@ -371,7 +372,8 @@ def push_results(self, kill_event):
logger.exception("Got an exception: {}".format(e))

if time.time() > last_result_beat + self.heartbeat_period:
logger.info(f"Sending heartbeat via results connection: last_result_beat={last_result_beat} heartbeat_period={self.heartbeat_period} seconds")
heartbeat_message = f"last_result_beat={last_result_beat} heartbeat_period={self.heartbeat_period} seconds"
logger.info(f"Sending heartbeat via results connection: {heartbeat_message}")
last_result_beat = time.time()
items.append(pickle.dumps({'type': 'heartbeat'}))

@@ -412,7 +413,9 @@ def worker_watchdog(self, kill_event: threading.Event):
raise WorkerLost(worker_id, platform.node())
except Exception:
logger.info("Putting exception for executor task {} in the pending result queue".format(task['task_id']))
-result_package = {'type': 'result', 'task_id': task['task_id'], 'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))}
+result_package = {'type': 'result',
+                  'task_id': task['task_id'],
+                  'exception': serialize(RemoteExceptionWrapper(*sys.exc_info()))}
pkl_package = pickle.dumps(result_package)
self.pending_result_queue.put(pkl_package)
except KeyError:
@@ -793,7 +796,7 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
help="GB of memory assigned to each worker process. Default=0, no assignment")
parser.add_argument("-t", "--task_port", required=True,
help="REQUIRED: Task port for receiving tasks from the interchange")
parser.add_argument("--max_workers", default=float('inf'),
parser.add_argument("--max_workers_per_node", default=float('inf'),
help="Caps the maximum workers that can be launched, default:infinity")
parser.add_argument("-p", "--prefetch_capacity", default=0,
help="Number of tasks that can be prefetched to the manager. Default is 0.")
@@ -847,7 +850,7 @@ def strategyorlist(s: str):
logger.info("task_port: {}".format(args.task_port))
logger.info("result_port: {}".format(args.result_port))
logger.info("addresses: {}".format(args.addresses))
logger.info("max_workers: {}".format(args.max_workers))
logger.info("max_workers_per_node: {}".format(args.max_workers_per_node))
logger.info("poll_period: {}".format(args.poll))
logger.info("address_probe_timeout: {}".format(args.address_probe_timeout))
logger.info("Prefetch capacity: {}".format(args.prefetch_capacity))
@@ -866,7 +869,10 @@ def strategyorlist(s: str):
block_id=args.block_id,
cores_per_worker=float(args.cores_per_worker),
mem_per_worker=None if args.mem_per_worker == 'None' else float(args.mem_per_worker),
-max_workers=args.max_workers if args.max_workers == float('inf') else int(args.max_workers),
+max_workers_per_node=(
+    args.max_workers_per_node if args.max_workers_per_node == float('inf')
+    else int(args.max_workers_per_node)
+),
prefetch_capacity=int(args.prefetch_capacity),
heartbeat_threshold=int(args.hb_threshold),
heartbeat_period=int(args.hb_period),
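On the manager side, the renamed cap feeds the same worker-count calculation as before: the per-node cap, a memory-based slot count, and a core-based slot count each impose a limit, and the smallest wins. A standalone sketch of that arithmetic; the helper name and example numbers are illustrative and not part of the pool code:

    import math
    from typing import Optional


    def workers_for_node(max_workers_per_node: float,
                         cores_on_node: int,
                         available_mem_gb: float,
                         cores_per_worker: float = 1.0,
                         mem_per_worker_gb: Optional[float] = None) -> int:
        # Memory only constrains the count when a per-worker budget is given
        # (and is > 0, avoiding a divide-by-zero, as in the pool code above).
        mem_slots = max_workers_per_node
        if mem_per_worker_gb and mem_per_worker_gb > 0:
            mem_slots = math.floor(available_mem_gb / mem_per_worker_gb)
        return min(max_workers_per_node,
                   mem_slots,
                   math.floor(cores_on_node / cores_per_worker))


    # A 64-core node with 256 GB, capped at 32 workers, 16 GB per worker -> 16
    print(workers_for_node(32, 64, 256.0, cores_per_worker=1.0, mem_per_worker_gb=16.0))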
7 changes: 5 additions & 2 deletions parsl/executors/workqueue/executor.py
@@ -61,8 +61,11 @@


# Support structure to communicate parsl tasks to the work queue submit thread.
-ParslTaskToWq = namedtuple('ParslTaskToWq',
-                           'id category cores memory disk gpus priority running_time_min env_pkg map_file function_file result_file input_files output_files')
+ParslTaskToWq = namedtuple(
+    'ParslTaskToWq',
+    'id '
+    'category '
+    'cores memory disk gpus priority running_time_min env_pkg map_file function_file result_file input_files output_files')

# Support structure to communicate final status of work queue tasks to parsl
# if result_received is True:
5 changes: 4 additions & 1 deletion parsl/monitoring/db_manager.py
@@ -544,7 +544,10 @@ def start(self,
if reprocessable_last_resource_messages:
self._insert(table=STATUS, messages=reprocessable_last_resource_messages)
except Exception:
logger.exception("Exception in db loop: this might have been a malformed message, or some other error. monitoring data may have been lost")
logger.exception(
"Exception in db loop: this might have been a malformed message, "
"or some other error. monitoring data may have been lost"
)
exception_happened = True
if exception_happened:
raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")
8 changes: 6 additions & 2 deletions parsl/monitoring/monitoring.py
@@ -290,8 +290,12 @@ def close(self) -> None:
self._dfk_channel.close()
if exception_msgs:
for exception_msg in exception_msgs:
self.logger.error("{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(exception_msg[0],
exception_msg[1]))
self.logger.error(
"{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(
exception_msg[0],
exception_msg[1]
)
)
self.router_proc.terminate()
self.dbm_proc.terminate()
self.filesystem_proc.terminate()