-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmacmpi.py
executable file
·208 lines (156 loc) · 5.8 KB
/
macmpi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python3
# macmpi - tool for running MPI processes in iTerm2 tabs
#
# Adapted from tmux-mpi by Will Saunders, https://github.com/wrs20/tmux-mpi
import iterm2
import sys
import subprocess
import tempfile
import os
import glob
import time
import shutil
import pty
import atexit
import psutil
import shlex
PROGRAM_PATH = os.path.abspath(__file__)
MPI_EXEC = shlex.split(os.environ.get("MACMPI_MPIRUN", "mpiexec"))
TAB_PANE_MODE = os.environ.get("MACMPI_MODE", "tab")
if TAB_PANE_MODE != "tab" and TAB_PANE_MODE != "pane":
sys.exit("MACMPI_MODE must be either 'tab' or 'pane'")
def check_dtach():
dtach = shutil.which("dtach")
if dtach == None:
raise RuntimeError("This tool requires dtach. We could not find dtach using which.")
def print_help():
print(
"""macmpi is a tool for running MPI processes in iTerm2 tabs. Run with:
macmpi <nproc> <executable>
If the program crashes there are likely to be dtach instances that will need
manually cleaning up. See README.rst for configuration.
"""
)
exit(-1)
def check_args():
if len(sys.argv) < 3:
print_help()
class TerminalSession:
def __init__(self):
self.connection = None
self.app = None
self.window = None
self.n = 0
async def connect(self):
self.connection = await iterm2.Connection.async_create()
async def add(self, n):
self.app = await iterm2.async_get_app(self.connection)
self.window = await iterm2.Window.async_create(self.connection)
self.n = n
if TAB_PANE_MODE == "tab":
for px in range(n - 1):
await self.window.async_create_tab()
await self.window.tabs[0].async_activate()
elif TAB_PANE_MODE == "pane":
for px in range(n - 1):
await self.window.tabs[0].root.sessions[0].async_split_pane(vertical=True)
await self.window.tabs[0].sessions[0].async_activate()
await self.app.async_activate(False)
async def get_session(self, ix):
if TAB_PANE_MODE == "tab":
return self.window.tabs[ix].current_session
elif TAB_PANE_MODE == "pane":
return self.window.tabs[0].root.sessions[ix]
async def send_keys(self, ix, keys):
session = await self.get_session(ix)
await session.async_send_text(keys)
async def send_enter(self):
for ix in range(self.n):
session = await self.get_session(ix)
await session.async_send_text("\n")
async def cleanup(self):
try:
await self.window.async_close()
except Exception:
pass
_cleanup = []
def cleanup():
for cx in _cleanup:
cx()
async def main(connection):
# register the atexit function that tries to cleanup if needed
atexit.register(cleanup)
it2session = TerminalSession()
await it2session.connect()
nproc = int(sys.argv[1])
cmd = sys.argv[2:]
# create n windows or panes
await it2session.add(nproc)
# directory for dtach sockets
temp_dir = tempfile.TemporaryDirectory(prefix="it2-mpi")
# do the mpi launch
launch_cmd = MPI_EXEC + ["-n", str(nproc), sys.executable, PROGRAM_PATH, "DTACH_CHILD", temp_dir.name] + cmd
mpiproc = subprocess.Popen(launch_cmd)
# Wait for all the dtach processes to create sockets before trying to connect to them
def get_socket_files():
return sorted(glob.glob(os.path.join(temp_dir.name, "*", "dtach.socket")))
time.sleep(0.2)
socket_files = get_socket_files()
while len(socket_files) != nproc:
print("Waiting for dtach sockets to appear. Found {} out of {}.".format(len(socket_files), nproc))
time.sleep(0.2)
socket_files = get_socket_files()
print("Waiting for dtach sockets to appear. Found {} out of {}.".format(len(socket_files), nproc))
mpiproc_children = psutil.Process(mpiproc.pid).children(recursive=True)
# run the launch command in each window or pane
for px in range(nproc):
win_cmd = "dtach -a " + socket_files[px] + "\n"
await it2session.send_keys(px, win_cmd)
# loop over the iterm tabs and send a newline to allow the execution to continue
await it2session.send_enter()
def cleanup_mpi():
try:
temp_dir.cleanup()
except Exception as e:
print(e)
try:
mpiproc.kill()
except Exception as e:
pass
for pidx in mpiproc_children:
try:
pidx.kill()
except Exception as e:
pass
_cleanup.append(cleanup_mpi)
# Try to terminate cleanly
mpiproc.communicate()
a = input("\nPress Enter to close iTerm window and quit")
await it2session.cleanup()
def dtach_child():
"""
Creates a new dtach instance with a socket in the temp dir that runs this script again to invoke exec_child.
"""
dtach_socket = os.path.join(tempfile.mkdtemp(prefix=str(os.getpid()) + "_", dir=sys.argv[2]), "dtach.socket")
cmd = sys.argv[3:]
dtach_cmd = ["dtach", "-N", dtach_socket, sys.executable, PROGRAM_PATH, "EXEC_CHILD", sys.argv[2]] + cmd
# Using execv worked for mpich/openmpi but not intel MPI, using pty.spawn seems to keep intel MPI happy
pty.spawn(dtach_cmd)
def exec_child():
"""
Waits for the newline from iTerm then runs the user command.
"""
# Wait for the newline to be send that indicates all the iTerm windows are connected.
a = input("Waiting for iTerm windows to all be connected...\n")
# launch the actual user command
cmd = sys.argv[3:]
os.execv(shutil.which(cmd[0]), cmd)
if __name__ == "__main__":
check_dtach()
check_args()
if sys.argv[1] == "DTACH_CHILD":
dtach_child()
elif sys.argv[1] == "EXEC_CHILD":
exec_child()
else:
iterm2.run_until_complete(main)