Skip to content

Commit

Permalink
Merge branch 'master' into benc-k8s-kind-ci
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Jun 18, 2024
2 parents 299de99 + 5e03e1f commit 68e3a5d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 46 deletions.
67 changes: 36 additions & 31 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import logging
import math
import pickle
import subprocess
import threading
import typing
import warnings
from collections import defaultdict
from concurrent.futures import Future
from dataclasses import dataclass
from multiprocessing import Process
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union

import typeguard
Expand All @@ -18,15 +18,14 @@
from parsl.app.errors import RemoteExceptionWrapper
from parsl.data_provider.staging import Staging
from parsl.executors.errors import BadMessage, ScalingFailed
from parsl.executors.high_throughput import interchange, zmq_pipes
from parsl.executors.high_throughput import zmq_pipes
from parsl.executors.high_throughput.errors import CommandClientTimeoutError
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
validate_resource_spec,
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
from parsl.multiprocessing import ForkProcess
from parsl.process_loggers import wrap_with_logs
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider
Expand Down Expand Up @@ -305,7 +304,7 @@ def __init__(self,
self._task_counter = 0
self.worker_ports = worker_ports
self.worker_port_range = worker_port_range
self.interchange_proc: Optional[Process] = None
self.interchange_proc: Optional[subprocess.Popen] = None
self.interchange_port_range = interchange_port_range
self.heartbeat_threshold = heartbeat_threshold
self.heartbeat_period = heartbeat_period
Expand Down Expand Up @@ -520,38 +519,45 @@ def _queue_management_worker(self):

logger.info("Queue management worker finished")

def _start_local_interchange_process(self):
def _start_local_interchange_process(self) -> None:
""" Starts the interchange process locally
Starts the interchange process locally and uses an internal command queue to
Starts the interchange process locally and uses the command queue to
get the worker task and result ports that the interchange has bound to.
"""
self.interchange_proc = ForkProcess(target=interchange.starter,
kwargs={"client_address": "127.0.0.1",
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
"interchange_address": self.address,
"worker_ports": self.worker_ports,
"worker_port_range": self.worker_port_range,
"hub_address": self.hub_address,
"hub_zmq_port": self.hub_zmq_port,
"logdir": self.logdir,
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
"cert_dir": self.cert_dir,
},
daemon=True,
name="HTEX-Interchange"
)
self.interchange_proc.start()

interchange_config = {"client_address": "127.0.0.1",
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
"interchange_address": self.address,
"worker_ports": self.worker_ports,
"worker_port_range": self.worker_port_range,
"hub_address": self.hub_address,
"hub_zmq_port": self.hub_zmq_port,
"logdir": self.logdir,
"heartbeat_threshold": self.heartbeat_threshold,
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
"cert_dir": self.cert_dir,
}

config_pickle = pickle.dumps(interchange_config)

self.interchange_proc = subprocess.Popen(b"interchange.py", stdin=subprocess.PIPE)
stdin = self.interchange_proc.stdin
assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"

logger.debug("Popened interchange process. Writing config object")
stdin.write(config_pickle)
stdin.flush()
logger.debug("Sent config object. Requesting worker ports")
try:
(self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120)
except CommandClientTimeoutError:
logger.error("Interchange has not completed initialization in 120s. Aborting")
logger.error("Interchange has not completed initialization. Aborting")
raise Exception("Interchange failed to start")
logger.debug("Got worker ports")

def _start_queue_management_thread(self):
"""Method to start the management thread as a daemon.
Expand Down Expand Up @@ -810,13 +816,12 @@ def shutdown(self, timeout: float = 10.0):
logger.info("Attempting HighThroughputExecutor shutdown")

self.interchange_proc.terminate()
self.interchange_proc.join(timeout=timeout)
if self.interchange_proc.is_alive():
try:
self.interchange_proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
logger.info("Unable to terminate Interchange process; sending SIGKILL")
self.interchange_proc.kill()

self.interchange_proc.close()

logger.info("Finished HighThroughputExecutor shutdown attempt")

def get_usage_information(self):
Expand Down
13 changes: 5 additions & 8 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,13 +672,10 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
logger.addHandler(handler)


@wrap_with_logs(target="interchange")
def starter(*args: Any, **kwargs: Any) -> None:
"""Start the interchange process
The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__
"""
if __name__ == "__main__":
setproctitle("parsl: HTEX interchange")
# logger = multiprocessing.get_logger()
ic = Interchange(*args, **kwargs)

config = pickle.load(sys.stdin.buffer)

ic = Interchange(**config)
ic.start()
31 changes: 24 additions & 7 deletions parsl/tests/test_htex/test_htex.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import pathlib
import warnings
from subprocess import Popen, TimeoutExpired
from unittest import mock

import pytest

from parsl import HighThroughputExecutor, curvezmq
from parsl.multiprocessing import ForkProcess

_MOCK_BASE = "parsl.executors.high_throughput.executor"

Expand Down Expand Up @@ -78,16 +78,33 @@ def test_htex_shutdown(
timeout_expires: bool,
htex: HighThroughputExecutor,
):
mock_ix_proc = mock.Mock(spec=ForkProcess)
mock_ix_proc = mock.Mock(spec=Popen)

if started:
htex.interchange_proc = mock_ix_proc
mock_ix_proc.is_alive.return_value = True

# A real Popen.wait() will block forever if no timeout is given and the
# interchange never terminates. Rather than actually blocking, the mock
# raises an exception to report that situation; this relies on nothing
# in the code under test catching that exception.

# This function implements the behaviour of wait() while the interchange
# has not yet received a termination call (i.e. it is still running).
def proc_wait_alive(timeout=None):
    """Mimic ``Popen.wait`` for an interchange process that is still running.

    Args:
        timeout: Number of seconds the caller is prepared to wait, or
            ``None`` to wait without a limit.

    Raises:
        TimeoutExpired: if any timeout is supplied (including ``0``), because
            the simulated process never exits within it.
        RuntimeError: if no timeout is supplied, because a real wait on a
            process that never exits would block forever.
    """
    # Compare against None rather than relying on truthiness: a timeout of 0
    # is still a bounded wait and must time out, not be reported as a hang.
    if timeout is None:
        raise RuntimeError("This wait call would hang forever")
    raise TimeoutExpired(cmd="mock-interchange", timeout=timeout)

def proc_wait_terminated(timeout=None):
    """Mimic ``Popen.wait`` for an interchange process that has already exited.

    Args:
        timeout: Accepted for signature compatibility with ``Popen.wait``
            and otherwise ignored; it is optional, as it is there.

    Returns:
        ``0``, standing in for the exit code of the terminated process.
    """
    return 0

mock_ix_proc.wait.side_effect = proc_wait_alive

if not timeout_expires:
# Simulate termination of the Interchange process
def kill_interchange(*args, **kwargs):
mock_ix_proc.is_alive.return_value = False
mock_ix_proc.wait.side_effect = proc_wait_terminated

mock_ix_proc.terminate.side_effect = kill_interchange

Expand All @@ -96,16 +113,16 @@ def kill_interchange(*args, **kwargs):
mock_logs = mock_logger.info.call_args_list
if started:
assert mock_ix_proc.terminate.called
assert mock_ix_proc.join.called
assert {"timeout": 10} == mock_ix_proc.join.call_args[1]
assert mock_ix_proc.wait.called
assert {"timeout": 10} == mock_ix_proc.wait.call_args[1]
if timeout_expires:
assert "Unable to terminate Interchange" in mock_logs[1][0][0]
assert mock_ix_proc.kill.called
assert "Attempting" in mock_logs[0][0][0]
assert "Finished" in mock_logs[-1][0][0]
else:
assert not mock_ix_proc.terminate.called
assert not mock_ix_proc.join.called
assert not mock_ix_proc.wait.called
assert "has not started" in mock_logs[0][0][0]


Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
python_requires=">=3.8.0",
install_requires=install_requires,
scripts = ['parsl/executors/high_throughput/process_worker_pool.py',
'parsl/executors/high_throughput/interchange.py',
'parsl/executors/workqueue/exec_parsl_function.py',
'parsl/executors/workqueue/parsl_coprocess.py',
],
Expand Down

0 comments on commit 68e3a5d

Please sign in to comment.