Skip to content

Commit

Permalink
bump cctools version, update parsl_coprocess.py
Browse files Browse the repository at this point in the history
  • Loading branch information
colinthomas-z80 committed Feb 26, 2024
1 parent 7bfcbc8 commit cbbb996
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CCTOOLS_INSTALL := /tmp/cctools
MPICH=mpich
OPENMPI=openmpi
export PATH := $(CCTOOLS_INSTALL)/bin/:$(PATH)
export CCTOOLS_VERSION=7.7.2
export CCTOOLS_VERSION=7.8.0
export HYDRA_LAUNCHER=fork
export OMPI_MCA_rmaps_base_oversubscribe=yes
MPI=$(MPICH)
Expand Down
170 changes: 99 additions & 71 deletions parsl/executors/workqueue/parsl_coprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,24 @@
import json
import os
import sys

# If enabled, coprocess will print to stdout
debug_mode = False

# Send a message on a binary I/O stream by sending the message length and then the (string) message.
def send_message(stream, data):
size = len(data)
size_msg = "{}\n".format(size)
stream.write(size_msg)
stream.write(data)

# Receive a standard message from a binary I/O stream by reading length and then returning the (string) message
def recv_message(stream):
line = stream.readline()
length = int(line)
return stream.read(length)

# Decorator for remotely execution functions to package things as json.
def remote_execute(func):
def remote_wrapper(event):
kwargs = event["fn_kwargs"]
Expand All @@ -21,108 +39,118 @@ def remote_wrapper(event):
return response
return remote_wrapper

read, write = os.pipe()
def send_configuration(config):
config_string = json.dumps(config)
config_cmd = f"{len(config_string) + 1}\n{config_string}\n"
sys.stdout.write(config_cmd)
sys.stdout.flush()
# Main loop of coprocess for executing network functions.
def main():
# Listen on an arbitrary port to be reported to the worker.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# modify the port argument to be 0 to listen on an arbitrary port
s.bind(('localhost', 0))
except Exception as e:
s.close()
print(e, file=sys.stderr)
sys.exit(1)
# information to print to stdout for worker

# Inform the worker of name and port for later connection.
config = {
"name": name(), # noqa: F821
"port": s.getsockname()[1],
}
send_configuration(config)
"name": name(), # noqa: F821
"port": s.getsockname()[1],
}
send_message(sys.stdout, json.dumps(config))
sys.stdout.flush()

# Remember original working directory b/c we change for each invocation.
abs_working_dir = os.getcwd()

# Create pipe for communication with child process
rpipe, wpipe = os.pipe()
rpipestream = os.fdopen(rpipe, "r")

while True:
s.listen()
conn, addr = s.accept()
print('Network function: connection from {}'.format(addr), file=sys.stderr)
connstream = conn.makefile("rw", encoding="utf-8")

if debug_mode:
print('Network function: connection from {}'.format(addr), file=sys.stderr)

while True:
# peek at message to find newline to get the size
event_size = None
line = conn.recv(100, socket.MSG_PEEK)
eol = line.find(b'\n')
if eol >= 0:
size = eol+1
# actually read the size of the event
input_spec = conn.recv(size).decode('utf-8').split()
function_name = input_spec[0]
task_id = int(input_spec[1])
event_size = int(input_spec[2])
# Read the invocation header from the worker
line = connstream.readline()

# If end of file, then break out and accept again
if not line:
break

# Parse the invocation header.
input_spec = line.split()
function_name = input_spec[0]
task_id = int(input_spec[1])
event_size = int(input_spec[2])

# then read the contents of the event itself
event_str = connstream.read(event_size)
event = json.loads(event_str)
exec_method = event.get("remote_task_exec_method", None)

try:
if event_size:
# receive the bytes containing the event and turn it into a string
event_str = conn.recv(event_size).decode("utf-8")
# turn the event into a python dictionary
event = json.loads(event_str)
# see if the user specified an execution method
exec_method = event.get("remote_task_exec_method", None)
os.chdir(os.path.join(abs_working_dir, f't.{task_id}'))
if exec_method == "direct":
response = json.dumps(globals()[function_name](event)).encode("utf-8")
else:
p = os.fork()
if p == 0:
response =globals()[function_name](event)
os.write(write, json.dumps(response).encode("utf-8"))
os._exit(0)
elif p < 0:
# First move to target directory (is undone in finally block)
os.chdir(os.path.join(abs_working_dir, f't.{task_id}'))

# Then invoke function by desired method, resulting in
# response containing the text representation of the result.

if exec_method == "direct":
response = json.dumps(globals()[function_name](event))
else:
p = os.fork()
if p == 0:
response = globals()[function_name](event)
wpipestream = os.fdopen(wpipe, "w")
send_message(wpipestream, json.dumps(response))
wpipestream.flush()
os._exit(0)
elif p < 0:
if debug_mode:
print(f'Network function: unable to fork to execute {function_name}', file=sys.stderr)
response = {
"Result": "unable to fork",
"StatusCode": 500
}
else:
max_read = 65536
chunk = os.read(read, max_read).decode("utf-8")
all_chunks = [chunk]
while (len(chunk) >= max_read):
chunk = os.read(read, max_read).decode("utf-8")
all_chunks.append(chunk)
response = "".join(all_chunks).encode("utf-8")
os.waitpid(p, 0)
response_size = len(response)
size_msg = "{}\n".format(response_size)
# send the size of response
conn.sendall(size_msg.encode('utf-8'))
# send response
conn.sendall(response)
break
response = {
"Result": "unable to fork",
"StatusCode": 500
}
response = json.dumps(response)
else:
# Get response string from child process.
response = recv_message(rpipestream)
# Wait for child process to complete
os.waitpid(p, 0)

# At this point, response is set to a value one way or the other

except Exception as e:
print("Network function encountered exception ", str(e), file=sys.stderr)
if debug_mode:
print("Network function encountered exception ", str(e), file=sys.stderr)
response = {
'Result': f'network function encountered exception {e}',
'Status Code': 500
}
response = json.dumps(response).encode('utf-8')
response_size = len(response)
size_msg = "{}\n".format(response_size)
# send the size of response
conn.sendall(size_msg.encode('utf-8'))
# send response
conn.sendall(response)
response = json.dumps(response)
finally:
# Restore the working directory, no matter how the function ended.
os.chdir(abs_working_dir)

# Send response string back to parent worker process.
send_message(connstream, response)
connstream.flush()

return 0
def name():
return 'parsl_coprocess'
@remote_execute
def run_parsl_task(a, b, c):
import parsl.executors.workqueue.exec_parsl_function as epf
try:
map_file, function_file, result_file = (a, b, c)
(map_file, function_file, result_file) = (a, b, c)
try:
namespace, function_code, result_name = epf.load_function(map_file, function_file)
(namespace, function_code, result_name) = epf.load_function(map_file, function_file)
except Exception:
raise
try:
Expand Down

0 comments on commit cbbb996

Please sign in to comment.