Skip to content

Commit

Permalink
Use CommandClient for interchange port reporting instead of Queue
Browse files Browse the repository at this point in the history
Before this PR, the interchange used a multiprocessing.Queue to send a single
message containing the ports it is listening on back to the submitting
process. This ties the interchange into being forked via multiprocessing,
even though the rest of the interchange is architected to be forked anyhow,
as part of earlier remote-interchange work.

After this PR, the CommandClient used for other submit-side to interchange
communication is used to retrieve this information, removing that reliance
on multiprocessing and reducing the number of different communication channels
used between the interchange and submit side by one.

See issue #3373 for more context on launching the interchange via fork/exec
rather than using multiprocessing.
  • Loading branch information
benclifford committed May 27, 2024
1 parent 11664da commit 49b0d5d
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
11 changes: 5 additions & 6 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import typeguard
import logging
import threading
import queue
import pickle
from dataclasses import dataclass
from multiprocessing import Process, Queue
from multiprocessing import Process
from typing import Dict, Sequence
from typing import List, Optional, Tuple, Union, Callable
import math
Expand All @@ -21,6 +20,7 @@
from parsl.jobs.states import JobStatus, JobState, TERMINAL_STATES
from parsl.executors.high_throughput import zmq_pipes
from parsl.executors.high_throughput import interchange
from parsl.executors.high_throughput.errors import CommandClientTimeoutError
from parsl.executors.errors import (
BadMessage, ScalingFailed,
)
Expand Down Expand Up @@ -531,9 +531,7 @@ def _start_local_interchange_process(self):
Starts the interchange process locally and uses an internal command queue to
get the worker task and result ports that the interchange has bound to.
"""
comm_q = Queue(maxsize=10)
self.interchange_proc = ForkProcess(target=interchange.starter,
args=(comm_q,),
kwargs={"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
Expand All @@ -552,9 +550,10 @@ def _start_local_interchange_process(self):
name="HTEX-Interchange"
)
self.interchange_proc.start()

try:
(self.worker_task_port, self.worker_result_port) = comm_q.get(block=True, timeout=120)
except queue.Empty:
(self.worker_task_port, self.worker_result_port) = self.command_client.run("WORKER_PORTS", timeout_s=120)
except CommandClientTimeoutError:
logger.error("Interchange has not completed initialization in 120s. Aborting")
raise Exception("Interchange failed to start")

Expand Down
8 changes: 4 additions & 4 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#!/usr/bin/env python
import multiprocessing
import zmq
import os
import sys
Expand Down Expand Up @@ -328,6 +327,9 @@ def _command_server(self) -> NoReturn:

reply = None

elif command_req == "WORKER_PORTS":
reply = (self.worker_task_port, self.worker_result_port)

else:
logger.error(f"Received unknown command: {command_req}")
reply = None
Expand Down Expand Up @@ -672,14 +674,12 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:


@wrap_with_logs(target="interchange")
def starter(comm_q: multiprocessing.Queue, *args: Any, **kwargs: Any) -> None:
def starter(*args: Any, **kwargs: Any) -> None:
"""Start the interchange process
The executor is expected to call this function. The args, kwargs match that of the Interchange.__init__
"""
setproctitle("parsl: HTEX interchange")
# logger = multiprocessing.get_logger()
ic = Interchange(*args, **kwargs)
comm_q.put((ic.worker_task_port,
ic.worker_result_port))
ic.start()

0 comments on commit 49b0d5d

Please sign in to comment.