diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 5407346ad8..762ab62c66 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -4,10 +4,9 @@ import typeguard import logging import threading -import queue import pickle from dataclasses import dataclass -from multiprocessing import Process, Queue +from multiprocessing import Process from typing import Dict, Sequence from typing import List, Optional, Tuple, Union, Callable import math @@ -21,6 +20,7 @@ from parsl.jobs.states import JobStatus, JobState, TERMINAL_STATES from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput import interchange +from parsl.executors.high_throughput.errors import CommandClientTimeoutError from parsl.executors.errors import ( BadMessage, ScalingFailed, ) @@ -531,9 +531,7 @@ def _start_local_interchange_process(self): Starts the interchange process locally and uses an internal command queue to get the worker task and result ports that the interchange has bound to. """ - comm_q = Queue(maxsize=10) self.interchange_proc = ForkProcess(target=interchange.starter, - args=(comm_q,), kwargs={"client_ports": (self.outgoing_q.port, self.incoming_q.port, self.command_client.port), @@ -552,9 +550,10 @@ def _start_local_interchange_process(self): name="HTEX-Interchange" ) self.interchange_proc.start() + try: - (self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120) - except queue.Empty: + (self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120) + except CommandClientTimeoutError: logger.error("Interchange has not completed initialization in 120s. Aborting") raise Exception("Interchange failed to start") diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index fe7a081c40..7034d703c7 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -import multiprocessing import zmq import os import sys @@ -328,6 +327,9 @@ def _command_server(self) -> NoReturn: reply = None + elif command_req == "WORKER_PORTS": + reply = (self.worker_task_port, self.worker_result_port) + else: logger.error(f"Received unknown command: {command_req}") reply = None @@ -672,7 +674,7 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: @wrap_with_logs(target="interchange") -def starter(comm_q: multiprocessing.Queue, *args: Any, **kwargs: Any) -> None: +def starter(*args: Any, **kwargs: Any) -> None: """Start the interchange process The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__ @@ -680,6 +682,4 @@ def starter(comm_q: multiprocessing.Queue, *args: Any, **kwargs: Any) -> None: setproctitle("parsl: HTEX interchange") # logger = multiprocessing.get_logger() ic = Interchange(*args, **kwargs) - comm_q.put((ic.worker_task_port, - ic.worker_result_port)) ic.start()