Skip to content

Commit

Permalink
Make ZMQ context per pipe explicit (#3462)
Browse files Browse the repository at this point in the history
This clarifies our intention to use a separate ZMQ context for each ZMQ
pipe, which simplifies the handling of ZMQ pipe objects.

Fixes #3457
  • Loading branch information
rjmello authored May 28, 2024
1 parent 11664da commit d30be4d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 24 deletions.
6 changes: 3 additions & 3 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,13 @@ def start(self):
)

self.outgoing_q = zmq_pipes.TasksOutgoing(
curvezmq.ClientContext(self.cert_dir), "127.0.0.1", self.interchange_port_range
"127.0.0.1", self.interchange_port_range, self.cert_dir
)
self.incoming_q = zmq_pipes.ResultsIncoming(
curvezmq.ClientContext(self.cert_dir), "127.0.0.1", self.interchange_port_range
"127.0.0.1", self.interchange_port_range, self.cert_dir
)
self.command_client = zmq_pipes.CommandClient(
curvezmq.ClientContext(self.cert_dir), "127.0.0.1", self.interchange_port_range
"127.0.0.1", self.interchange_port_range, self.cert_dir
)

self._queue_management_thread = None
Expand Down
34 changes: 22 additions & 12 deletions parsl/executors/high_throughput/zmq_pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import threading
import time
from typing import Optional

from parsl import curvezmq
from parsl.errors import InternalConsistencyError
Expand All @@ -15,20 +16,23 @@
class CommandClient:
""" CommandClient
"""
def __init__(self, zmq_context: curvezmq.ClientContext, ip_address, port_range):
def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
"""
Parameters
----------
zmq_context: curvezmq.ClientContext
CurveZMQ client context used to create secure sockets
ip_address: str
IP address of the client (where Parsl runs)
port_range: tuple(int, int)
Port range for the comms between client and interchange
cert_dir: str | None
Path to the certificate directory. Setting this to None will disable encryption.
default: None
"""
self.zmq_context = zmq_context
self.zmq_context = curvezmq.ClientContext(cert_dir)
self.ip_address = ip_address
self.port_range = port_range
self.port = None
Expand Down Expand Up @@ -119,20 +123,23 @@ def close(self):
class TasksOutgoing:
""" Outgoing task queue from the executor to the Interchange
"""
def __init__(self, zmq_context: curvezmq.ClientContext, ip_address, port_range):
def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
"""
Parameters
----------
zmq_context: curvezmq.ClientContext
CurveZMQ client context used to create secure sockets
ip_address: str
IP address of the client (where Parsl runs)
port_range: tuple(int, int)
Port range for the comms between client and interchange
cert_dir: str | None
Path to the certificate directory. Setting this to None will disable encryption.
default: None
"""
self.zmq_context = zmq_context
self.zmq_context = curvezmq.ClientContext(cert_dir)
self.zmq_socket = self.zmq_context.socket(zmq.DEALER)
self.zmq_socket.set_hwm(0)
self.port = self.zmq_socket.bind_to_random_port("tcp://{}".format(ip_address),
Expand Down Expand Up @@ -172,20 +179,23 @@ class ResultsIncoming:
""" Incoming results queue from the Interchange to the executor
"""

def __init__(self, zmq_context: curvezmq.ClientContext, ip_address, port_range):
def __init__(self, ip_address, port_range, cert_dir: Optional[str] = None):
"""
Parameters
----------
zmq_context: curvezmq.ClientContext
CurveZMQ client context used to create secure sockets
ip_address: str
IP address of the client (where Parsl runs)
port_range: tuple(int, int)
Port range for the comms between client and interchange
cert_dir: str | None
Path to the certificate directory. Setting this to None will disable encryption.
default: None
"""
self.zmq_context = zmq_context
self.zmq_context = curvezmq.ClientContext(cert_dir)
self.results_receiver = self.zmq_context.socket(zmq.DEALER)
self.results_receiver.set_hwm(0)
self.port = self.results_receiver.bind_to_random_port("tcp://{}".format(ip_address),
Expand Down
11 changes: 2 additions & 9 deletions parsl/tests/test_htex/test_command_client_timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
def test_command_not_sent() -> None:
"""Tests timeout on command send.
"""
ctx = curvezmq.ClientContext(None)

# RFC6335 ephemeral port range
cc = CommandClient(ctx, "127.0.0.1", (49152, 65535))
cc = CommandClient("127.0.0.1", (49152, 65535))

# cc will now wait for a connection, but we won't do anything to make the
# other side of the connection exist, so any command given to cc should
Expand All @@ -43,10 +41,8 @@ def test_command_ignored() -> None:
htex makes multithreaded use of the command client: see issue #3376 about
that lack of thread safety.
"""
ctx = curvezmq.ClientContext(None)

# RFC6335 ephemeral port range
cc = CommandClient(ctx, "127.0.0.1", (49152, 65535))
cc = CommandClient("127.0.0.1", (49152, 65535))

ic_ctx = curvezmq.ServerContext(None)
ic_channel = ic_ctx.socket(zmq.REP)
Expand All @@ -63,7 +59,4 @@ def test_command_ignored() -> None:
cc.run("ANOTHER_COMMAND")

cc.close()
ctx.term()

ic_channel.close()
ic_ctx.term()

0 comments on commit d30be4d

Please sign in to comment.