Skip to content

Commit

Permalink
Adding support for IPV6 addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
yadudoc committed Nov 22, 2024
1 parent 07dfb42 commit b3c457f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 6 deletions.
4 changes: 4 additions & 0 deletions parsl/curvezmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
except zmq.ZMQError as e:
raise ValueError("Invalid CurveZMQ key format") from e
sock.setsockopt(zmq.CURVE_SERVER, True) # Must come before bind

# This flag enables IPV6 in addition to IPV4
sock.setsockopt(zmq.IPV6, True)
return sock

def term(self):
Expand Down Expand Up @@ -202,4 +205,5 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
sock.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)
except zmq.ZMQError as e:
raise ValueError("Invalid CurveZMQ key format") from e
sock.setsockopt(zmq.IPV6, True)
return sock
26 changes: 20 additions & 6 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@
address : string
An address to connect to the main Parsl process which is reachable from the network in which
workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx).
workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx) (when
enable_ipv6 is not set)
Most login nodes on clusters have several network interfaces available, only some of which
can be reached from the compute nodes. This field can be used to limit the executor to listen
only on a specific interface, and limiting connections to the internal network.
By default, the executor will attempt to enumerate and connect through all possible addresses.
Setting an address here overrides the default behavior.
If `enable_ipv6` is set, specify an IPv6 address in brackets, for eg: address=[::1]
default=None
worker_ports : (int, int)
Expand Down Expand Up @@ -224,6 +226,10 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
Parsl will create names as integers starting with 0.
default: empty list
enable_ipv6: bool
Set this flag to enable communication over IPV6 in addition to IPV4.
default: False
"""

@typeguard.typechecked
Expand Down Expand Up @@ -253,7 +259,8 @@ def __init__(self,
worker_logdir_root: Optional[str] = None,
manager_selector: ManagerSelector = RandomManagerSelector(),
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
enable_ipv6: bool = False):

logger.debug("Initializing HighThroughputExecutor")

Expand All @@ -268,6 +275,13 @@ def __init__(self,
self.address = address
self.address_probe_timeout = address_probe_timeout
self.manager_selector = manager_selector
self.enable_ipv6 = enable_ipv6
if self.enable_ipv6:
# This will force usage of IPV6 for internal communication
self._internal_address = "[::1]"
else:
self._internal_address = "127.0.0.1"

if self.address:
self.all_addresses = address
else:
Expand Down Expand Up @@ -408,13 +422,13 @@ def start(self):
)

self.outgoing_q = zmq_pipes.TasksOutgoing(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self._internal_address, self.interchange_port_range, self.cert_dir
)
self.incoming_q = zmq_pipes.ResultsIncoming(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self._internal_address, self.interchange_port_range, self.cert_dir
)
self.command_client = zmq_pipes.CommandClient(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self._internal_address, self.interchange_port_range, self.cert_dir
)

self._result_queue_thread = None
Expand Down Expand Up @@ -515,7 +529,7 @@ def _start_local_interchange_process(self) -> None:
get the worker task and result ports that the interchange has bound to.
"""

interchange_config = {"client_address": "127.0.0.1",
interchange_config = {"client_address": self._internal_address,
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
Expand Down
1 change: 1 addition & 0 deletions parsl/executors/high_throughput/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def probe_addresses(addresses, task_port, timeout=120):
for addr in addresses:
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.IPV6, True)
url = "tcp://{}:{}".format(addr, task_port)
logger.debug("Trying to connect back on {}".format(url))
socket.connect(url)
Expand Down
1 change: 1 addition & 0 deletions parsl/tests/configs/htex_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def fresh_config():
executors=[
HighThroughputExecutor(
label="htex_local",
enable_ipv6=True,
worker_debug=True,
cores_per_worker=1,
encrypted=True,
Expand Down

0 comments on commit b3c457f

Please sign in to comment.