# run_the_matrix_alca.py
"""
PdmV's simplified implementation of runTheMatrix.py used for AlCaDB
"""
from __future__ import print_function
import sys
import argparse
import json
import importlib
import inspect
import re
#pylint: disable=wrong-import-position,import-error
import Configuration.PyReleaseValidation.relval_steps as steps_module
import alcaval_steps as alcasteps_module
from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector
#pylint: enable=wrong-import-position,import-error
def clean_split(string, separator=','):
    """
    Break a string apart on the separator, trim whitespace around every piece
    and return a list of the pieces that are not empty
    """
    pieces = []
    for chunk in string.split(separator):
        trimmed = chunk.strip()
        # Pieces that were empty or whitespace-only are dropped
        if trimmed:
            pieces.append(trimmed)

    return pieces
def get_wmsplit():
    """
    Return the wmsplit dictionary of MatrixInjector

    MatrixInjector.get_wmsplit() is tried first. If that call fails for any
    reason, the source code of MatrixInjector.prepare() is scanned for
    wmsplit['<name>']=<value> assignments instead, in which case both the
    keys and the values of the returned dictionary are strings.
    If the scan fails as well, the error is printed and {} is returned
    """
    try:
        return MatrixInjector.get_wmsplit()
    except Exception:  # pylint: disable=broad-except
        # Deliberate best effort - fall through to scanning prepare() source
        pass

    try:
        wmsplit = {}
        prepare_source = inspect.getsource(MatrixInjector.prepare)
        for raw_line in prepare_source.split('\n'):
            if 'wmsplit' not in raw_line:
                continue

            line = raw_line.strip()
            # Ignore commented out assignments
            if line.startswith('#'):
                continue

            line = line.replace(' ', '')
            # Only lines that assign to a wmsplit item are interesting
            if not re.match('wmsplit\\[.*\\]=', line):
                continue

            # Leave only <name>=<value>
            line = line.replace('wmsplit[\'', '').replace('\']', '')
            parts = line.split('=')
            wmsplit[parts[0]] = parts[1]

        return wmsplit
    except Exception as ex:  # pylint: disable=broad-except
        print(ex)
        return {}
def extract_events_per_lumi(step):
    """
    Extract process.source.numberEventsInLuminosityBlock value from a step
    if it exists

    The value is read from the '--customise_commands' argument of the step.
    When a value is found, the matching command is removed from
    '--customise_commands' and the argument itself is deleted from the step
    if nothing is left in it, i.e. the step dictionary is modified in place.

    Return None if there is no such command or its value is not a number.
    Otherwise return the events per lumi, capped at events per job, where
    events per job is the second comma separated item of '--relval'. If
    '--relval' is missing or malformed, the value is returned without a cap
    """
    customise_commands = step.get('--customise_commands', '')
    if 'process.source.numberEventsInLuminosityBlock' not in customise_commands:
        return None

    regex = 'process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\\(([0-9]*)\\)'
    matches = re.findall(regex, customise_commands)
    # [0-9]* also matches an empty string, hence the isdigit() check.
    # The last occurrence is the one that is used
    if not matches or not matches[-1].isdigit():
        return None

    events_per_lumi = int(matches[-1])
    try:
        events_per_job = int(str(step.get('--relval', '')).split(',')[1])
    except (IndexError, ValueError):
        # No usable --relval, so there is nothing to cap the value with
        events_per_job = None

    # Remove the command, also the empty pair of quotes it might leave behind
    customise_commands = re.sub(regex, '', customise_commands).replace('""', '')
    if customise_commands:
        step['--customise_commands'] = customise_commands
    else:
        del step['--customise_commands']

    if events_per_job is None:
        return events_per_lumi

    # Events per lumi has to be less or equal to events per job
    return min(events_per_lumi, events_per_job)
def split_command_to_dict(command):
    """
    Split string command into a dictionary

    Keys are the option names (tokens that start with "-") and values are
    strings. Both "--option value" and "--option=value" forms are supported,
    options without a value get an empty string. Tokens that are neither an
    option nor a value of the preceding option are ignored.
    Only option tokens are split at the first equal sign, so values that
    contain "=" or values given as "--option=-1" are kept intact
    """
    command_dict = {}
    # Split by any whitespace, this never yields empty tokens
    tokens = command.split()
    for index, token in enumerate(tokens):
        if not token.startswith('-'):
            # Plain value, it is picked up by the option in front of it
            continue

        if '=' in token:
            # --option=value, the value is everything after the first "="
            name, value = token.split('=', 1)
            command_dict[name] = value
            continue

        next_index = index + 1
        if next_index < len(tokens) and not tokens[next_index].startswith('-'):
            command_dict[token] = tokens[next_index]
        else:
            # Flag without a value, e.g. --data, --mc
            command_dict[token] = ''

    return command_dict
def get_workflows_module(name):
    """
    Import and return relval_<name> workflows module

    Module is taken from Configuration.PyReleaseValidation package, except
    for "alca" in which case top level relval_alca module is imported.
    Number of workflows found in the module is printed
    """
    if name == 'alca':
        module_path = 'relval_' + name
    else:
        module_path = 'Configuration.PyReleaseValidation.relval_' + name

    module = importlib.import_module(module_path)
    workflow_count = len(module.workflows)
    print('Loaded %s. Found %s workflows inside' % (module_path, workflow_count))
    return module
def get_workflow_name(matrix):
    """
    Return workflow name that is stored in the first item of the matrix

    If the first item is a list, its first element is the name and an empty
    list means an empty name. The name is printed before it is returned
    """
    name = matrix[0]
    if isinstance(name, list):
        name = name[0] if name else ''

    print('Workflow name: %s' % (name))
    return name
def should_apply_additional_command(workflow_step, command_steps):
    """
    Return whether workflow step includes steps specified in "command_steps"
    and should have additional command applied to it

    True is returned when "command_steps" is empty or when the workflow step
    has neither "-s" nor "--step" argument. Otherwise the comma separated
    value of "-s" ("--step" if there is no "-s") must have at least one item
    in common with "command_steps"
    """
    if not command_steps:
        # User did not limit the command to certain steps
        return True

    if '-s' in workflow_step:
        steps = workflow_step['-s']
    elif '--step' in workflow_step:
        steps = workflow_step['--step']
    else:
        return True

    steps = set(clean_split(steps))
    should_apply = not steps.isdisjoint(command_steps)
    # Values must be interpolated here, print() does not do it by itself
    print('Workflow step steps: %s, command_steps: %s, should apply: %s' % (steps,
                                                                            command_steps,
                                                                            should_apply))
    return should_apply
def merge_additional_command(workflow_step, command):
    """
    Combine arguments of a workflow step with the command provided by user

    User command is parsed into a dictionary, its "--step" and "--number"
    are renamed to "-s" and "-n" and then it is merged with the workflow
    step. User arguments are the first item given to merge()
    """
    user_arguments = split_command_to_dict(command)
    # Rename long option names to the short ones
    for long_name, short_name in (('--step', '-s'), ('--number', '-n')):
        if long_name in user_arguments:
            user_arguments[short_name] = user_arguments.pop(long_name)

    print('Merging user commands %s' % (user_arguments))
    print('Merging to %s' % (workflow_step))
    return steps_module.merge([user_arguments, workflow_step])
def make_relval_step(workflow_step, workflow_step_name, wmsplit):
    """
    Make a dictionary of a single workflow step

    Step with "INPUT" in it is an input dataset step and gets "input"
    dictionary, any other step is a cmsDriver step and gets "arguments".
    Arguments of a cmsDriver step are renamed and updated in place, i.e.
    the given workflow_step dictionary is modified. The step is printed
    before it is returned
    """
    has_input = 'INPUT' in workflow_step
    # Value in wmsplit wins over the split of input, 10 is the default
    if workflow_step_name in wmsplit:
        lumis_per_job = wmsplit[workflow_step_name]
    elif has_input:
        lumis_per_job = workflow_step['INPUT'].split
    else:
        lumis_per_job = 10

    step = {'name': workflow_step_name, 'lumis_per_job': lumis_per_job}
    if has_input:
        # Input dataset step
        input_info = workflow_step['INPUT']
        step['input'] = {'dataset': input_info.dataSet,
                         'lumisection': input_info.ls,
                         'run': input_info.run,
                         'label': input_info.label,
                         'events': input_info.events}
        print(step)
        return step

    # cmsDriver step - rename some of the arguments
    renames = (('-s', '--step'), ('cfg', 'fragment_name'), ('-n', '--number'))
    for old_name, new_name in renames:
        if old_name in workflow_step:
            workflow_step[new_name] = workflow_step.pop(old_name)

    # Arguments with empty values are flags, e.g. --data, --mc, --fast
    flags = [name for name, value in workflow_step.items() if value == '']
    for flag in flags:
        workflow_step[flag] = True

    events_per_lumi = extract_events_per_lumi(workflow_step)
    if events_per_lumi:
        step['events_per_lumi'] = events_per_lumi

    step['arguments'] = workflow_step
    print(step)
    return step
def main():
    """
    Main

    Parse command line arguments, collect steps of the requested workflows
    from the workflows module, apply workflow overrides and the additional
    user command, then print all workflows as JSON and write them to the
    output file if one was given.
    Workflow ids (-l) and workflows file (-w) are required. Script exits
    with status 1 if a workflow or one of its steps can not be found
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-l', '--list',
                        dest='workflow_ids',
                        required=True,
                        help='Comma separated list of workflow ids')
    parser.add_argument('-w', '--what',
                        dest='matrix_name',
                        required=True,
                        help='RelVal workflows file: standard, upgrade, ...')
    parser.add_argument('-c', '--command',
                        dest='command',
                        help='Additional command to add to each cmsDriver')
    parser.add_argument('-cs', '--command_steps',
                        dest='command_steps',
                        help='Specify which RelVal steps should have additional command applied',
                        default='')
    parser.add_argument('-o', '--output',
                        dest='output_file',
                        help='Output file name')
    parser.add_argument('-r', '--recycle_gs',
                        dest='recycle_gs',
                        action='store_true',
                        help='Recycle GS')
    opt = parser.parse_args()

    # Ids are floats, duplicates are removed, stray commas and spaces are ignored
    try:
        workflow_ids = sorted({float(x) for x in clean_split(opt.workflow_ids)})
    except ValueError as ex:
        parser.error('Invalid workflow id in "%s": %s' % (opt.workflow_ids, ex))

    if not workflow_ids:
        parser.error('No workflow ids given')

    print('Given workflow ids (%s): %s' % (len(workflow_ids), workflow_ids))
    print('Workflows file: %s' % (opt.matrix_name))
    print('User given command: %s (%s)' % (opt.command, opt.command_steps))
    print('Output file: %s' % (opt.output_file))
    print('Recycle GS: %s' % (opt.recycle_gs))
    workflows_module = get_workflows_module(opt.matrix_name)
    command_steps = set(clean_split(opt.command_steps))
    # wmsplit is a dictionary with LumisPerJob values
    wmsplit = get_wmsplit()
    # Standard and AlCa steps together, the same for all workflows
    steps = steps_module.steps | alcasteps_module.steps
    workflows = {}
    for workflow_id in workflow_ids:
        print('Getting %s workflow' % (workflow_id))
        # workflow_matrix is a list where first element is the name of workflow
        # and second element is list of step names
        # if workflow name is not present, first step name is used
        if workflow_id not in workflows_module.workflows:
            print('Can\'t find %s in %s matrix' % (workflow_id, opt.matrix_name), file=sys.stderr)
            sys.exit(1)

        workflow_matrix = workflows_module.workflows[workflow_id]
        print('Matrix: %s' % (workflow_matrix))
        workflows[workflow_id] = {'steps': [], 'workflow_name': get_workflow_name(workflow_matrix)}
        if workflow_matrix.overrides:
            print('Overrides: %s' % (workflow_matrix.overrides))

        # Go through steps and get the arguments
        for workflow_step_index, workflow_step_name in enumerate(workflow_matrix[1]):
            print('\nStep %s. %s' % (workflow_step_index + 1, workflow_step_name))
            if workflow_step_index == 0 and opt.recycle_gs:
                # Add INPUT to step name to recycle GS
                workflow_step_name += 'INPUT'
                print('Step name changed to %s to recycle input' % (workflow_step_name))

            if workflow_step_name not in steps:
                print('Could not find %s in steps module' % (workflow_step_name),
                      file=sys.stderr)
                sys.exit(1)

            # Merge user command, workflow and overrides
            workflow_step = steps[workflow_step_name]
            if workflow_step is None:
                print('Workflow step %s is none, skipping it' % (workflow_step_name))
                continue

            # Because first item in the list has highest priority
            print('Step: %s' % (workflow_step))
            workflow_step = steps_module.merge([workflow_matrix.overrides,
                                                workflow_step])
            if opt.command and should_apply_additional_command(workflow_step, command_steps):
                workflow_step = merge_additional_command(workflow_step, opt.command)

            workflows[workflow_id]['steps'].append(make_relval_step(workflow_step,
                                                                    workflow_step_name,
                                                                    wmsplit))

        # Additional newline inbetween each workflow
        print('\n')

    print('All workflows:')
    print(json.dumps(workflows, indent=2, sort_keys=True))
    if opt.output_file:
        with open(opt.output_file, 'w') as workflows_file:
            json.dump(workflows, workflows_file)
# Run main() only when the file is executed as a script, not when it is imported
if __name__ == '__main__':
    main()