dispatch.py
############################################################################
# #
# Copyright (c) 2017 eBay Inc. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
# #
############################################################################

from __future__ import print_function
from __future__ import division
import os
import time
from signal import SIGTERM, SIGKILL
from compat import PY3
from status_messaging import statmsg
from status import children, statmsg_endwait
from extras import json_encode

# Raised when building a job fails; .status maps each failing component
# to its error message.
class JobError(Exception):
def __init__(self, jobid, method, status):
Exception.__init__(self, "Failed to build %s (%s)" % (jobid, method,))
self.jobid = jobid
self.method = method
self.status = status
def format_msg(self):
res = ["%s (%s):" % (self.jobid, self.method,)]
for component, msg in self.status.items():
res.append(" %s:" % (component,))
res.append(" %s" % (msg.replace("\n", "\n "),))
return "\n".join(res)
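
# Hedged illustration (not part of the original file): how JobError's
# format_msg() turns the per-component status dict into a readable report.
# The jobid, method and status values below are made up.
def _example_joberror_report():
	err = JobError('TEST-42', 'csvimport', {
		'analysis': 'Traceback (most recent call last):\n  ...',
	})
	# Gives the "jobid (method):" header followed by one indented block
	# per failing component.
	return err.format_msg()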

valid_fds = []
def update_valid_fds():
# Collect all valid fds, so we can close them in job processes
global valid_fds
valid_fds = []
from fcntl import fcntl, F_GETFD
from resource import getrlimit, RLIMIT_NOFILE
for fd in range(3, getrlimit(RLIMIT_NOFILE)[0]):
try:
fcntl(fd, F_GETFD)
valid_fds.append(fd)
except Exception:
pass

def close_fds(keep):
for fd in valid_fds:
# Apparently sometimes one of them has gone away.
# That's a little worrying, so try to protect our stuff (and ignore errors).
try:
if fd not in keep:
os.close(fd)
except OSError:
pass

# Fork and exec cmd, by default in its own process group, closing every fd
# except the ones the child needs. Returns the child pid in the parent and
# does not return in the child.
def run(cmd, close_in_child, keep_in_child, with_pgrp=True):
child = os.fork()
if child:
return child
if with_pgrp:
os.setpgrp() # this pgrp is killed if the job fails
for fd in close_in_child:
os.close(fd)
status_fd = int(os.getenv('BD_STATUS_FD'))
keep_in_child = set(keep_in_child)
keep_in_child.add(status_fd)
close_fds(keep_in_child)
# unreadable stdin - less risk of stuck jobs
devnull = os.open('/dev/null', os.O_RDONLY)
os.dup2(devnull, 0)
os.close(devnull)
if PY3:
keep_in_child.update([1, 2])
for fd in keep_in_child:
os.set_inheritable(fd, True)
os.execv(cmd[0], cmd)
	os._exit(1)  # only reached if execv() fails
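
# Hedged usage sketch (not part of the original file): run() only forks and
# execs, so a typical caller pairs it with a pipe, keeps the write end open in
# the child and reads the result in the parent. The command and fd handling
# here are illustrative assumptions; run() also requires BD_STATUS_FD to be
# set in the environment.
def _example_run_with_pipe(cmd):
	update_valid_fds()  # snapshot the currently open fds for close_fds()
	r, w = os.pipe()    # the exec'd child is expected to write its result to w
	child = run(cmd, close_in_child=[r], keep_in_child=[w])
	os.close(w)  # the parent only reads
	with os.fdopen(r, 'rb') as fh:
		data = fh.read()  # EOF once the child has closed w (or exited)
	os.waitpid(child, 0)
	return data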

# Build one job: start it via the runner for the method's version, wait for it
# to finish, and raise JobError if it failed. Returns the data reported by the
# runner on success.
def launch(workdir, setup, config, Methods, active_workspaces, slices, debug, daemon_url, subjob_cookie, parent_pid):
starttime = time.time()
jobid = setup.jobid
method = setup.method
if subjob_cookie:
print_prefix = ''
else:
print_prefix = ' '
print('%s| %s [%s] |' % (print_prefix, jobid, method,))
statmsg('| %s [%s] |' % (jobid, method,))
args = dict(
workdir=workdir,
slices=slices,
jobid=jobid,
result_directory=config.get('result_directory', ''),
common_directory=config.get('common_directory', ''),
source_directory=config.get('source_directory', ''),
workspaces=active_workspaces,
daemon_url=daemon_url,
subjob_cookie=subjob_cookie,
parent_pid=parent_pid,
)
from runner import runners
runner = runners[Methods.db[method].version]
child, prof_r = runner.launch_start(args)
	# There's a race here: if we get interrupted right after the fork, the child
	# is not yet recorded in children (so the launched job could keep running).
try:
children.add(child)
status, data = runner.launch_finish(child, prof_r, workdir, jobid, method)
if status:
os.killpg(child, SIGTERM) # give it a chance to exit gracefully
msg = json_encode(status, as_str=True)
print('%s| %s [%s] failed! (%5.1fs) |' % (print_prefix, jobid, method, time.time() - starttime))
statmsg('| %s [%s] failed! |' % (jobid, method))
statmsg(msg)
time.sleep(1) # give it a little time to do whatever cleanup it feels the need to do
# There is a race where stuff on the status socket has not arrived when
# the sending process exits. This is basically benign, but let's give
# it a chance to arrive to cut down on confusing warnings.
statmsg_endwait(child, 0.25)
finally:
try:
os.killpg(child, SIGKILL) # this should normally be a no-op, but in case it left anything.
except Exception:
pass
try:
children.remove(child)
except Exception:
pass
try:
os.waitpid(child, 0) # won't block (we just killed it, plus it had probably already exited)
except Exception:
pass
if status:
raise JobError(jobid, method, status)
print('%s| %s [%s] completed. (%5.1fs) |' % (print_prefix, jobid, method, time.time() - starttime))
statmsg('| %s [%s] completed. |' % (jobid, method))
return data
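
# Hedged usage sketch (not part of the original file): how a caller might wrap
# launch() and turn a failed build into a readable report. kw stands for the
# same keyword arguments launch() takes (workdir, setup, config, ...); the
# wrapper itself is an assumption, not real daemon code.
def _example_launch_and_report(**kw):
	try:
		return launch(**kw)
	except JobError as e:
		print(e.format_msg())
		raise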