From d30be4d947221602e50dd143a0d1de57ff570cc3 Mon Sep 17 00:00:00 2001 From: rjmello <30907815+rjmello@users.noreply.github.com> Date: Tue, 28 May 2024 15:32:50 -0400 Subject: [PATCH] Make ZMQ context per pipe explicit (#3462) This clarifies our intention to use a separate ZMQ context for each ZMQ pipe, which simplifies the handling of ZMQ pipe objects. Fixes #3457 --- parsl/executors/high_throughput/executor.py | 6 ++-- parsl/executors/high_throughput/zmq_pipes.py | 34 ++++++++++++------- .../test_htex/test_command_client_timeout.py | 11 ++---- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 5407346ad8..845ea5538b 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -415,13 +415,13 @@ def start(self): ) self.outgoing_q = zmq_pipes.TasksOutgoing( - curvezmq.ClientContext(self.cert_dir), "127.0.0.1", self.interchange_port_range + "127.0.0.1", self.interchange_port_range, self.cert_dir ) self.incoming_q = zmq_pipes.ResultsIncoming( - curvezmq.ClientContext(self.cert_dir), "127.0.0.1", self.interchange_port_range + "127.0.0.1", self.interchange_port_range, self.cert_dir ) self.command_client = zmq_pipes.CommandClient( - curvezmq.ClientContext(self.cert_dir), "127.0.0.1", self.interchange_port_range + "127.0.0.1", self.interchange_port_range, self.cert_dir ) self._queue_management_thread = None diff --git a/parsl/executors/high_throughput/zmq_pipes.py b/parsl/executors/high_throughput/zmq_pipes.py index 8a730c1c38..ca0a6ff251 100644 --- a/parsl/executors/high_throughput/zmq_pipes.py +++ b/parsl/executors/high_throughput/zmq_pipes.py @@ -4,6 +4,7 @@ import logging import threading import time +from typing import Optional from parsl import curvezmq from parsl.errors import InternalConsistencyError @@ -15,20 +16,23 @@ class CommandClient: """ CommandClient """ - def __init__(self, zmq_context: curvezmq.ClientContext, ip_address, port_range): + def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None): """ Parameters ---------- - zmq_context: curvezmq.ClientContext - CurveZMQ client context used to create secure sockets ip_address: str IP address of the client (where Parsl runs) + port_range: tuple(int, int) Port range for the comms between client and interchange + cert_dir: str | None + Path to the certificate directory. Setting this to None will disable encryption. + default: None + """ - self.zmq_context = zmq_context + self.zmq_context = curvezmq.ClientContext(cert_dir) self.ip_address = ip_address self.port_range = port_range self.port = None @@ -119,20 +123,23 @@ def close(self): class TasksOutgoing: """ Outgoing task queue from the executor to the Interchange """ - def __init__(self, zmq_context: curvezmq.ClientContext, ip_address, port_range): + def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None): """ Parameters ---------- - zmq_context: curvezmq.ClientContext - CurveZMQ client context used to create secure sockets ip_address: str IP address of the client (where Parsl runs) + port_range: tuple(int, int) Port range for the comms between client and interchange + cert_dir: str | None + Path to the certificate directory. Setting this to None will disable encryption. + default: None + """ - self.zmq_context = zmq_context + self.zmq_context = curvezmq.ClientContext(cert_dir) self.zmq_socket = self.zmq_context.socket(zmq.DEALER) self.zmq_socket.set_hwm(0) self.port = self.zmq_socket.bind_to_random_port("tcp://{}".format(ip_address), @@ -172,20 +179,23 @@ class ResultsIncoming: """ Incoming results queue from the Interchange to the executor """ - def __init__(self, zmq_context: curvezmq.ClientContext, ip_address, port_range): + def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None): """ Parameters ---------- - zmq_context: curvezmq.ClientContext - CurveZMQ client context used to create secure sockets ip_address: str IP address of the client (where Parsl runs) + port_range: tuple(int, int) Port range for the comms between client and interchange + cert_dir: str | None + Path to the certificate directory. Setting this to None will disable encryption. + default: None + """ - self.zmq_context = zmq_context + self.zmq_context = curvezmq.ClientContext(cert_dir) self.results_receiver = self.zmq_context.socket(zmq.DEALER) self.results_receiver.set_hwm(0) self.port = self.results_receiver.bind_to_random_port("tcp://{}".format(ip_address), diff --git a/parsl/tests/test_htex/test_command_client_timeout.py b/parsl/tests/test_htex/test_command_client_timeout.py index 4abdf991bc..85e44f1470 100644 --- a/parsl/tests/test_htex/test_command_client_timeout.py +++ b/parsl/tests/test_htex/test_command_client_timeout.py @@ -18,10 +18,8 @@ def test_command_not_sent() -> None: """Tests timeout on command send. """ - ctx = curvezmq.ClientContext(None) - # RFC6335 ephemeral port range - cc = CommandClient(ctx, "127.0.0.1", (49152, 65535)) + cc = CommandClient("127.0.0.1", (49152, 65535)) # cc will now wait for a connection, but we won't do anything to make the # other side of the connection exist, so any command given to cc should @@ -43,10 +41,8 @@ def test_command_ignored() -> None: htex makes multithreaded use of the command client: see issue #3376 about that lack of thread safety. """ - ctx = curvezmq.ClientContext(None) - # RFC6335 ephemeral port range - cc = CommandClient(ctx, "127.0.0.1", (49152, 65535)) + cc = CommandClient("127.0.0.1", (49152, 65535)) ic_ctx = curvezmq.ServerContext(None) ic_channel = ic_ctx.socket(zmq.REP) @@ -63,7 +59,4 @@ def test_command_ignored() -> None: cc.run("ANOTHER_COMMAND") cc.close() - ctx.term() - ic_channel.close() - ic_ctx.term()