diff --git a/Makefile b/Makefile index 62eb858824..651cfbb0d4 100644 --- a/Makefile +++ b/Makefile @@ -122,6 +122,7 @@ coverage: ## show the coverage report .PHONY: clean clean: ## clean up the environment by deleting the .venv, dist, eggs, mypy caches, coverage info, etc rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo_* $(WORKQUEUE_INSTALL) .PHONY: flux_local_test @@ -129,3 +130,6 @@ flux_local_test: pip3 install . pytest parsl/tests/ -k "not cleannet" --config parsl/tests/configs/flux_local.py --random-order --durations 10 + + rm -rf .venv $(DEPS) dist *.egg-info .mypy_cache build .pytest_cache .coverage runinfo $(WORKQUEUE_INSTALL) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 422ef1b50b..795c229a42 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -55,6 +55,7 @@ "--hb_period={heartbeat_period} " "{address_probe_timeout_string} " "--hb_threshold={heartbeat_threshold} " + "--drain_period={drain_period} " "--cpu-affinity {cpu_affinity} " "{enable_mpi_mode} " "--mpi-launcher={mpi_launcher} " @@ -201,6 +202,14 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin): Timeout period to be used by the executor components in milliseconds. Increasing poll_periods trades performance for cpu efficiency. Default: 10ms + drain_period : int + The number of seconds after start when workers will begin to drain + and then exit. Set this to a time that is slightly less than the + maximum walltime of batch jobs to avoid killing tasks while they + execute. For example, you could set this to the walltime minus a grace + period for the batch job to start the workers, minus the expected + maximum length of an individual task. + worker_logdir_root : string In case of a remote file system, specify the path to where logs will be kept. @@ -240,6 +249,7 @@ def __init__(self, prefetch_capacity: int = 0, heartbeat_threshold: int = 120, heartbeat_period: int = 30, + drain_period: Optional[int] = None, poll_period: int = 10, address_probe_timeout: Optional[int] = None, worker_logdir_root: Optional[str] = None, @@ -303,6 +313,7 @@ def __init__(self, self.interchange_port_range = interchange_port_range self.heartbeat_threshold = heartbeat_threshold self.heartbeat_period = heartbeat_period + self.drain_period = drain_period self.poll_period = poll_period self.run_dir = '.'
self.worker_logdir_root = worker_logdir_root @@ -376,6 +387,7 @@ def initialize_scaling(self): nodes_per_block=self.provider.nodes_per_block, heartbeat_period=self.heartbeat_period, heartbeat_threshold=self.heartbeat_threshold, + drain_period=self.drain_period, poll_period=self.poll_period, cert_dir=self.cert_dir, logdir=self.worker_logdir, diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 11d5ed2ee4..2746699203 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -28,6 +28,7 @@ PKL_HEARTBEAT_CODE = pickle.dumps((2 ** 32) - 1) +PKL_DRAINED_CODE = pickle.dumps((2 ** 32) - 2) LOGGER_NAME = "interchange" logger = logging.getLogger(LOGGER_NAME) @@ -308,7 +309,8 @@ def _command_server(self) -> NoReturn: 'worker_count': m['worker_count'], 'tasks': len(m['tasks']), 'idle_duration': idle_duration, - 'active': m['active']} + 'active': m['active'], + 'draining': m['draining']} reply.append(resp) elif command_req.startswith("HOLD_WORKER"): @@ -385,6 +387,7 @@ def start(self) -> None: self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event) self.process_results_incoming(interesting_managers, hub_channel) self.expire_bad_managers(interesting_managers, hub_channel) + self.expire_drained_managers(interesting_managers, hub_channel) self.process_tasks_to_send(interesting_managers) self.zmq_context.destroy() @@ -431,6 +434,7 @@ def process_task_outgoing_incoming( 'max_capacity': 0, 'worker_count': 0, 'active': True, + 'draining': False, 'tasks': []} self.connected_block_history.append(msg['block_id']) @@ -469,10 +473,28 @@ def process_task_outgoing_incoming( self._ready_managers[manager_id]['last_heartbeat'] = time.time() logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id)) self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) + elif msg['type'] == 'drain': + self._ready_managers[manager_id]['draining'] = True + logger.debug(f"Manager {manager_id!r} requested drain") else: logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") + def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + + for manager_id in list(interesting_managers): + # Is it always true that a draining manager will be in interesting_managers? + # I think so, because it will have outstanding capacity.
+ m = self._ready_managers[manager_id] + if m['draining'] and len(m['tasks']) == 0: + logger.info(f"Manager {manager_id!r} is drained - sending drained message to manager") + self.task_outgoing.send_multipart([manager_id, b'', PKL_DRAINED_CODE]) + interesting_managers.remove(manager_id) + self._ready_managers.pop(manager_id) + + m['active'] = False + self._send_monitoring_info(hub_channel, m) + def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers @@ -490,7 +512,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tasks_inflight = len(m['tasks']) real_capacity = m['max_capacity'] - tasks_inflight - if (real_capacity and m['active']): + if (real_capacity and m['active'] and not m['draining']): tasks = self.get_tasks(real_capacity) if tasks: self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)]) diff --git a/parsl/executors/high_throughput/manager_record.py b/parsl/executors/high_throughput/manager_record.py index 235a615d5c..0cf8e432db 100644 --- a/parsl/executors/high_throughput/manager_record.py +++ b/parsl/executors/high_throughput/manager_record.py @@ -9,6 +9,7 @@ class ManagerRecord(TypedDict, total=False): worker_count: int max_capacity: int active: bool + draining: bool hostname: str last_heartbeat: float idle_since: Optional[float] diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 7c32334786..867640774c 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -36,6 +36,7 @@ from parsl.executors.high_throughput.mpi_prefix_composer import compose_all, VALID_LAUNCHERS HEARTBEAT_CODE = (2 ** 32) - 1 +DRAINED_CODE = (2 ** 32) - 2 class Manager: @@ -73,7 +74,8 @@ def __init__(self, *, enable_mpi_mode: bool = False, mpi_launcher: str = "mpiexec", available_accelerators: Sequence[str], - cert_dir: Optional[str]): + cert_dir: Optional[str], + drain_period: Optional[int]): """ Parameters ---------- @@ -138,6 +140,9 @@ def __init__(self, *, cert_dir : str | None Path to the certificate directory. + + drain_period: int | None + Number of seconds after start at which this worker pool will begin to drain and then exit. TODO: could be a nicer timespec involving m,s,h qualifiers for user friendliness? """ logger.info("Manager initializing") @@ -227,6 +232,14 @@ def __init__(self, *, self.heartbeat_period = heartbeat_period self.heartbeat_threshold = heartbeat_threshold self.poll_period = poll_period + + self.drain_time: float + if drain_period: + self.drain_time = self._start_time + drain_period + logger.info(f"Will request drain at {self.drain_time}") + else: + self.drain_time = float('inf') + self.cpu_affinity = cpu_affinity # Define accelerator available, adjust worker count accordingly @@ -262,10 +275,19 @@ def heartbeat_to_incoming(self): """ Send heartbeat to the incoming task queue """ msg = {'type': 'heartbeat'} + # don't need to dumps and encode this every time - could do as a global on import?
b_msg = json.dumps(msg).encode('utf-8') self.task_incoming.send(b_msg) logger.debug("Sent heartbeat") + def drain_to_incoming(self): + """ Send drain message to the incoming task queue + """ + msg = {'type': 'drain'} + b_msg = json.dumps(msg).encode('utf-8') + self.task_incoming.send(b_msg) + logger.debug("Sent drain") + @wrap_with_logs def pull_tasks(self, kill_event): """ Pull tasks from the incoming tasks zmq pipe onto the internal @@ -298,6 +320,7 @@ def pull_tasks(self, kill_event): # time here are correctly copy-pasted from the relevant if # statements. next_interesting_event_time = min(last_beat + self.heartbeat_period, + self.drain_time, last_interchange_contact + self.heartbeat_threshold) try: pending_task_count = self.pending_task_queue.qsize() @@ -312,6 +335,14 @@ def pull_tasks(self, kill_event): self.heartbeat_to_incoming() last_beat = time.time() + if self.drain_time and time.time() > self.drain_time: + logger.info("Requesting drain") + self.drain_to_incoming() + self.drain_time = None + # This will start the pool draining... + # Drained exit behaviour does not happen here. It will be + # driven by the interchange sending a DRAINED_CODE message. + poll_duration_s = max(0, next_interesting_event_time - time.time()) socks = dict(poller.poll(timeout=poll_duration_s * 1000)) @@ -322,7 +353,9 @@ def pull_tasks(self, kill_event): if tasks == HEARTBEAT_CODE: logger.debug("Got heartbeat from interchange") - + elif tasks == DRAINED_CODE: + logger.info("Got fully drained message from interchange - setting kill flag") + kill_event.set() else: task_recv_counter += len(tasks) logger.debug("Got executor tasks: {}, cumulative count of tasks: {}".format([t['task_id'] for t in tasks], task_recv_counter)) @@ -490,9 +523,8 @@ def start(self): self._worker_watchdog_thread.start() self._monitoring_handler_thread.start() - logger.info("Loop start") + logger.info("Manager threads started") - # TODO : Add mechanism in this loop to stop the worker pool # This might need a multiprocessing event to signal back. self._kill_event.wait() logger.critical("Received kill event, terminating worker processes") @@ -804,6 +836,8 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_ help="Heartbeat period in seconds. Uses manager default unless set") parser.add_argument("--hb_threshold", default=120, help="Heartbeat threshold in seconds. Uses manager default unless set") + parser.add_argument("--drain_period", default=None, + help="Drain this pool after specified number of seconds. By default, does not drain.") parser.add_argument("--address_probe_timeout", default=30, help="Timeout to probe for viable address to interchange.
Default: 30s") parser.add_argument("--poll", default=10, @@ -856,6 +890,7 @@ def strategyorlist(s: str): logger.info("Prefetch capacity: {}".format(args.prefetch_capacity)) logger.info("Heartbeat threshold: {}".format(args.hb_threshold)) logger.info("Heartbeat period: {}".format(args.hb_period)) + logger.info("Drain period: {}".format(args.drain_period)) logger.info("CPU affinity: {}".format(args.cpu_affinity)) logger.info("Accelerators: {}".format(" ".join(args.available_accelerators))) logger.info("enable_mpi_mode: {}".format(args.enable_mpi_mode)) @@ -876,6 +911,7 @@ def strategyorlist(s: str): prefetch_capacity=int(args.prefetch_capacity), heartbeat_threshold=int(args.hb_threshold), heartbeat_period=int(args.hb_period), + drain_period=None if args.drain_period == "None" else int(args.drain_period), poll_period=int(args.poll), cpu_affinity=args.cpu_affinity, enable_mpi_mode=args.enable_mpi_mode, diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 207675147e..0cd7fac697 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -196,8 +196,9 @@ def __synchronize_manager_factory_comm_settings(self): if self.manager_config.port == 0 and self.manager_config.project_name is None: self.manager_config.project_name = "parsl-vine-" + str(uuid.uuid4()) - # guess the host name if the project name is not given - if not self.manager_config.project_name: + # guess the host name if the project name is not given and none has been supplied + # explicitly in the manager config. + if not self.manager_config.project_name and self.manager_config.address is None: self.manager_config.address = get_any_address() # Factory communication settings are overridden by manager communication settings. diff --git a/parsl/executors/taskvine/manager_config.py b/parsl/executors/taskvine/manager_config.py index a037717366..18e58a0b90 100644 --- a/parsl/executors/taskvine/manager_config.py +++ b/parsl/executors/taskvine/manager_config.py @@ -1,4 +1,3 @@ -import socket from dataclasses import dataclass from typing import Optional @@ -23,9 +22,9 @@ class TaskVineManagerConfig: A value of 0 means TaskVine chooses any available port. Default is VINE_DEFAULT_PORT. - address: str + address: Optional[str] Address of the local machine. - Default is socket.gethostname(). + If None, socket.gethostname() will be used to determine the address. project_name: Optional[str] If given, TaskVine will periodically report its status and performance @@ -161,7 +160,7 @@ class TaskVineManagerConfig: # Connection and communication settings port: int = VINE_DEFAULT_PORT - address: str = socket.gethostname() + address: Optional[str] = None project_name: Optional[str] = None project_password_file: Optional[str] = None diff --git a/parsl/tests/test_htex/test_drain.py b/parsl/tests/test_htex/test_drain.py new file mode 100644 index 0000000000..af528eeef5 --- /dev/null +++ b/parsl/tests/test_htex/test_drain.py @@ -0,0 +1,78 @@ +import parsl +import pytest +import time + +from parsl.providers import LocalProvider +from parsl.channels import LocalChannel +from parsl.launchers import SimpleLauncher + +from parsl.config import Config +from parsl.executors import HighThroughputExecutor + +# this constant is used to scale some durations that happen +# based around the expected drain period: the drain period +# is TIME_CONST seconds, and the single executed task will +# last twice that many number of seconds. 
+TIME_CONST = 1 + + +def local_config(): + return Config( + executors=[ + HighThroughputExecutor( + label="htex_local", + drain_period=TIME_CONST, + worker_debug=True, + cores_per_worker=1, + encrypted=True, + provider=LocalProvider( + channel=LocalChannel(), + init_blocks=1, + min_blocks=0, + max_blocks=0, + launcher=SimpleLauncher(), + ), + ) + ], + strategy='none', + ) + + +@parsl.python_app +def f(n): + import time + time.sleep(n) + + +@pytest.mark.local +def test_drain(try_assert): + + htex = parsl.dfk().executors['htex_local'] + + # wait till we have a block running... + + try_assert(lambda: len(htex.connected_managers()) == 1) + + managers = htex.connected_managers() + assert managers[0]['active'], "The manager should be active" + assert not managers[0]['draining'], "The manager should not be draining" + + fut = f(TIME_CONST * 2) + + time.sleep(TIME_CONST) + + # this assert should happen *very fast* after the above delay... + try_assert(lambda: htex.connected_managers()[0]['draining'], timeout_ms=500) + + # and the test task should still be running... + assert not fut.done(), "The test task should still be running" + + fut.result() + + # and now we should see the manager disappear... + # ... with strategy='none', this should be coming from draining but + # that information isn't immediately obvious from the absence in + # connected managers. + # As with the above draining assert, this should happen very fast after + # the task ends. + try_assert(lambda: len(htex.connected_managers()) == 0, timeout_ms=500)
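
The drain_period docstring added to HighThroughputExecutor above suggests choosing a drain time of roughly the batch job walltime, minus a grace period for the job to start its workers, minus the expected maximum length of a single task. The sketch below is a minimal illustration of that calculation and is not part of this diff; the SlurmProvider settings, the walltime_to_seconds helper, the executor label and the margin values are all illustrative assumptions.

```python
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider

WALLTIME = "01:00:00"        # walltime requested for each batch job (assumed)
STARTUP_GRACE_S = 5 * 60     # assumed time for the batch job to start its workers
MAX_TASK_LENGTH_S = 10 * 60  # assumed longest individual task


def walltime_to_seconds(walltime: str) -> int:
    """Convert an HH:MM:SS walltime string to seconds (hypothetical helper)."""
    hours, minutes, seconds = (int(part) for part in walltime.split(":"))
    return hours * 3600 + minutes * 60 + seconds


config = Config(
    executors=[
        HighThroughputExecutor(
            label="htex_draining",
            # Ask the worker pool to drain (and then exit) well before the
            # walltime expires, so tasks are not killed mid-execution.
            drain_period=walltime_to_seconds(WALLTIME)
            - STARTUP_GRACE_S
            - MAX_TASK_LENGTH_S,
            provider=SlurmProvider(walltime=WALLTIME),
        )
    ],
)
```

The intent, per the docstring, is that a pool stops accepting new work early enough that no task is still running when the batch scheduler terminates the job.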