Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for IPV6 addresses #3704

Merged
merged 8 commits into from
Nov 25, 2024
4 changes: 4 additions & 0 deletions parsl/curvezmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
except zmq.ZMQError as e:
raise ValueError("Invalid CurveZMQ key format") from e
sock.setsockopt(zmq.CURVE_SERVER, True) # Must come before bind

# This flag enables IPV6 in addition to IPV4
sock.setsockopt(zmq.IPV6, True)
return sock

def term(self):
Expand Down Expand Up @@ -202,4 +205,5 @@ def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket:
sock.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)
except zmq.ZMQError as e:
raise ValueError("Invalid CurveZMQ key format") from e
sock.setsockopt(zmq.IPV6, True)
return sock
26 changes: 20 additions & 6 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@

address : string
An address to connect to the main Parsl process which is reachable from the network in which
workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx).
workers will be running. This field expects an IPv4 address (xxx.xxx.xxx.xxx) (when
``enable_ipv6`` is not set)
benclifford marked this conversation as resolved.
Show resolved Hide resolved
Most login nodes on clusters have several network interfaces available, only some of which
can be reached from the compute nodes. This field can be used to limit the executor to listen
only on a specific interface, and limiting connections to the internal network.
By default, the executor will attempt to enumerate and connect through all possible addresses.
Setting an address here overrides the default behavior.
If ``enable_ipv6`` is set, specify an IPv6 address in brackets, for eg: ``address=[::1]``
rjmello marked this conversation as resolved.
Show resolved Hide resolved
default=None

worker_ports : (int, int)
Expand Down Expand Up @@ -224,6 +226,10 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
Parsl will create names as integers starting with 0.

default: empty list

enable_ipv6: bool
Set this flag to enable communication over IPV6 in addition to IPV4.
default: False
yadudoc marked this conversation as resolved.
Show resolved Hide resolved
"""

@typeguard.typechecked
Expand Down Expand Up @@ -253,7 +259,8 @@ def __init__(self,
worker_logdir_root: Optional[str] = None,
manager_selector: ManagerSelector = RandomManagerSelector(),
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
enable_ipv6: bool = False):

logger.debug("Initializing HighThroughputExecutor")

Expand All @@ -268,6 +275,13 @@ def __init__(self,
self.address = address
self.address_probe_timeout = address_probe_timeout
self.manager_selector = manager_selector
self.enable_ipv6 = enable_ipv6
if self.enable_ipv6:
# This will force usage of IPV6 for internal communication
self._internal_address = "[::1]"
rjmello marked this conversation as resolved.
Show resolved Hide resolved
else:
self._internal_address = "127.0.0.1"

if self.address:
self.all_addresses = address
else:
Expand Down Expand Up @@ -408,13 +422,13 @@ def start(self):
)

self.outgoing_q = zmq_pipes.TasksOutgoing(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self._internal_address, self.interchange_port_range, self.cert_dir
)
self.incoming_q = zmq_pipes.ResultsIncoming(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self._internal_address, self.interchange_port_range, self.cert_dir
)
self.command_client = zmq_pipes.CommandClient(
"127.0.0.1", self.interchange_port_range, self.cert_dir
self._internal_address, self.interchange_port_range, self.cert_dir
)

self._result_queue_thread = None
Expand Down Expand Up @@ -515,7 +529,7 @@ def _start_local_interchange_process(self) -> None:
get the worker task and result ports that the interchange has bound to.
"""

interchange_config = {"client_address": "127.0.0.1",
interchange_config = {"client_address": self._internal_address,
"client_ports": (self.outgoing_q.port,
self.incoming_q.port,
self.command_client.port),
Expand Down
6 changes: 4 additions & 2 deletions parsl/executors/high_throughput/mpi_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def __init__(self,
worker_logdir_root: Optional[str] = None,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
encrypted: bool = False,
enable_ipv6: bool = False):
super().__init__(
# Hard-coded settings
cores_per_worker=1e-9, # Ensures there will be at least an absurd number of workers
Expand All @@ -92,7 +93,8 @@ def __init__(self,
address_probe_timeout=address_probe_timeout,
worker_logdir_root=worker_logdir_root,
block_error_handler=block_error_handler,
encrypted=encrypted
encrypted=encrypted,
enable_ipv6=enable_ipv6,
)
self.enable_mpi_mode = True
self.mpi_launcher = mpi_launcher
Expand Down
1 change: 1 addition & 0 deletions parsl/executors/high_throughput/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def probe_addresses(addresses, task_port, timeout=120):
for addr in addresses:
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.IPV6, True)
url = "tcp://{}:{}".format(addr, task_port)
logger.debug("Trying to connect back on {}".format(url))
socket.connect(url)
Expand Down
1 change: 1 addition & 0 deletions parsl/tests/configs/htex_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def fresh_config():
executors=[
HighThroughputExecutor(
label="htex_local",
enable_ipv6=True,
worker_debug=True,
cores_per_worker=1,
encrypted=True,
Expand Down
3 changes: 2 additions & 1 deletion parsl/tests/test_htex/test_zmq_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,5 @@ def test_limited_interface_binding(cert_dir: Optional[str]):

matched_conns = [conn for conn in conns if conn.laddr.port == ix.worker_result_port]
assert len(matched_conns) == 1
assert matched_conns[0].laddr.ip == address
# laddr.ip can return ::ffff:127.0.0.1 when using IPv6
assert address in matched_conns[0].laddr.ip
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this always exactly the string "127.0.0.1" or the string "::ffff:127.0.0.1"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In all my testing it is.

Loading