#!/usr/bin/env python
"""Run a demo workflow that retrieves brain images and processes them
"""
import os
from nipype import config
config.enable_provenance()
from nipype import Workflow, Node, MapNode, Function
from nipype.interfaces.fsl import BET, FAST, FIRST, Reorient2Std, ImageMaths, ImageStats
from nipype.interfaces.io import DataSink
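
# The per-subject workflow built by create_workflow() downloads the input
# image, reorients it to standard orientation, skull-strips it with BET,
# segments tissue with FAST, parcellates subcortical structures with FIRST,
# summarizes tissue volumes with fslstats, writes the numbers to a JSON file,
# and stores all outputs with a DataSink.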
def download_file(url):
    """Download file for a given participant"""
    import requests
    import os
    URL = 'http://www.nitrc.org/ir/'
    session = requests.session()
    r = session.get(URL)
    local_filename = url.split('/')[-1]
    r = session.get(url, stream=True, cookies=r.cookies)
    with open(local_filename, 'wb') as f:
        for chunk in r.iter_content(chunk_size=1024):
            if chunk:  # filter out keep-alive new chunks
                f.write(chunk)
    return os.path.abspath(local_filename)

def toJSON(stats, seg_file, structure_map):
    """Combine stats files to a single JSON file"""
    import json
    import os
    import nibabel as nb
    import numpy as np
    img = nb.load(seg_file)
    # img.get_data() was deprecated and later removed from nibabel; read the
    # array through the dataobj accessor instead
    data = np.asanyarray(img.dataobj)
    voxel2vol = np.prod(img.header.get_zooms())
    idx = np.unique(data)
    reverse_map = {k: v for v, k in structure_map}
    out_dict = dict(zip([reverse_map[val] for val in idx],
                        np.bincount(data.flatten())[idx]))
    for key in out_dict.keys():
        out_dict[key] = [out_dict[key], voxel2vol * out_dict[key]]
    mapper = dict([(0, 'csf'), (1, 'gray'), (2, 'white')])
    out_dict.update(**{mapper[idx]: val for idx, val in enumerate(stats)})
    out_file = 'segstats.json'
    with open(out_file, 'wt') as fp:
        json.dump(out_dict, fp, sort_keys=True, indent=4,
                  separators=(',', ': '))
    return os.path.abspath(out_file)

def create_workflow(subject_id, outdir, file_url):
    """Create a workflow for a single participant"""
    sink_directory = os.path.join(outdir, subject_id)

    wf = Workflow(name=subject_id)

    # Download the participant's image from the URL listed in the spreadsheet
    getter = Node(Function(input_names=['url'], output_names=['localfile'],
                           function=download_file), name="download_url")
    getter.inputs.url = file_url

    # Reorient to the standard orientation
    orienter = Node(Reorient2Std(), name='reorient_brain')
    wf.connect(getter, 'localfile', orienter, 'in_file')

    # Skull-strip with BET
    better = Node(BET(), name='extract_brain')
    wf.connect(orienter, 'out_file', better, 'in_file')

    # Tissue segmentation with FAST
    faster = Node(FAST(), name='segment_brain')
    wf.connect(better, 'out_file', faster, 'in_files')

    # Subcortical parcellation with FIRST
    firster = Node(FIRST(), name='parcellate_brain')
    structures = ['L_Hipp', 'R_Hipp',
                  'L_Accu', 'R_Accu',
                  'L_Amyg', 'R_Amyg',
                  'L_Caud', 'R_Caud',
                  'L_Pall', 'R_Pall',
                  'L_Puta', 'R_Puta',
                  'L_Thal', 'R_Thal']
    firster.inputs.list_of_specific_structures = structures
    wf.connect(orienter, 'out_file', firster, 'in_file')

    # Per-tissue volume statistics: the thresholds select labels 1-3 (CSF,
    # gray matter, white matter) from FAST's partial volume map, and -v
    # reports voxel count and volume for each
    fslstatser = MapNode(ImageStats(), iterfield=['op_string'], name="compute_segment_stats")
    fslstatser.inputs.op_string = ['-l {thr1} -u {thr2} -v'.format(thr1=val + 0.5, thr2=val + 1.5) for val in range(3)]
    wf.connect(faster, 'partial_volume_map', fslstatser, 'in_file')

    # Collect tissue and subcortical volumes into a single JSON file
    jsonfiler = Node(Function(input_names=['stats', 'seg_file', 'structure_map', 'struct_file'],
                              output_names=['out_file'],
                              function=toJSON), name='save_json')
    structure_map = [('Background', 0),
                     ('Left-Thalamus-Proper', 10),
                     ('Left-Caudate', 11),
                     ('Left-Putamen', 12),
                     ('Left-Pallidum', 13),
                     ('Left-Hippocampus', 17),
                     ('Left-Amygdala', 18),
                     ('Left-Accumbens-area', 26),
                     ('Right-Thalamus-Proper', 49),
                     ('Right-Caudate', 50),
                     ('Right-Putamen', 51),
                     ('Right-Pallidum', 52),
                     ('Right-Hippocampus', 53),
                     ('Right-Amygdala', 54),
                     ('Right-Accumbens-area', 58)]
    jsonfiler.inputs.structure_map = structure_map
    wf.connect(fslstatser, 'out_stat', jsonfiler, 'stats')
    wf.connect(firster, 'segmentation_file', jsonfiler, 'seg_file')

    # Store all derived outputs under <outdir>/<subject_id>
    sinker = Node(DataSink(), name='store_results')
    sinker.inputs.base_directory = sink_directory
    wf.connect(better, 'out_file', sinker, 'brain')
    wf.connect(faster, 'bias_field', sinker, 'segs.@bias_field')
    wf.connect(faster, 'partial_volume_files', sinker, 'segs.@partial_files')
    wf.connect(faster, 'partial_volume_map', sinker, 'segs.@partial_map')
    wf.connect(faster, 'probability_maps', sinker, 'segs.@prob_maps')
    wf.connect(faster, 'restored_image', sinker, 'segs.@restored')
    wf.connect(faster, 'tissue_class_files', sinker, 'segs.@tissue_files')
    wf.connect(faster, 'tissue_class_map', sinker, 'segs.@tissue_map')
    wf.connect(firster, 'bvars', sinker, 'parcels.@bvars')
    wf.connect(firster, 'original_segmentations', sinker, 'parcels.@origsegs')
    wf.connect(firster, 'segmentation_file', sinker, 'parcels.@segfile')
    wf.connect(firster, 'vtk_surfaces', sinker, 'parcels.@vtk')
    wf.connect(jsonfiler, 'out_file', sinker, '@stats')
    return wf

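
# Command-line driver: reads a Google Sheet (exported as CSV) whose rows carry
# a 'Subject' identifier and a 'File Path' URL, builds one sub-workflow per
# row, and runs them all under a single meta-workflow.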
if __name__ == '__main__':
    from argparse import ArgumentParser, RawTextHelpFormatter
    defstr = ' (default %(default)s)'
    parser = ArgumentParser(description=__doc__,
                            formatter_class=RawTextHelpFormatter)
    parser.add_argument("--key", dest="key",
                        help="google docs key")
    parser.add_argument("-o", "--output_dir", dest="sink_dir", default='output',
                        help="Sink directory base")
    parser.add_argument("-w", "--work_dir", dest="work_dir",
                        help="Working directory base")
    parser.add_argument("-p", "--plugin", dest="plugin",
                        default='MultiProc',
                        help="Plugin to use")
    parser.add_argument("--plugin_args", dest="plugin_args",
                        help="Plugin arguments")
    parser.add_argument("-n", dest="num_subjects", type=int,
                        help="Number of subjects")
    args = parser.parse_args()

    if args.work_dir:
        work_dir = os.path.abspath(args.work_dir)
    else:
        work_dir = os.getcwd()
    sink_dir = os.path.abspath(args.sink_dir)

    try:
        from StringIO import StringIO  # Python 2
    except ImportError:
        from io import StringIO  # StringIO moved to io in Python 3
    import requests
    import pandas as pd

    # key = '11an55u9t2TAf0EV2pHN0vOd8Ww2Gie-tHp9xGULh_dA'
    r = requests.get('https://docs.google.com/spreadsheets/d/{key}/export?format=csv&id={key}'.format(key=args.key))
    # r.content is bytes under Python 3; decode before handing it to pandas
    data = r.content.decode() if isinstance(r.content, bytes) else r.content
    df = pd.read_csv(StringIO(data))

    max_subjects = df.shape[0]
    if args.num_subjects:
        max_subjects = args.num_subjects
    elif ('CIRCLECI' in os.environ and os.environ['CIRCLECI'] == 'true'):
        max_subjects = 1

    meta_wf = Workflow('metaflow')
    count = 0
    for row in df.iterrows():
        wf = create_workflow(row[1].Subject, sink_dir, row[1]['File Path'])
        meta_wf.add_nodes([wf])
        print('Added workflow for: {}'.format(row[1].Subject))
        count = count + 1
        # run this for only one person on CircleCI
        if count >= max_subjects:
            break

    meta_wf.base_dir = work_dir
    meta_wf.config['execution']['remove_unnecessary_files'] = False
    meta_wf.config['execution']['poll_sleep_duration'] = 2
    if args.plugin_args:
        meta_wf.run(args.plugin, plugin_args=eval(args.plugin_args))
    else:
        meta_wf.run(args.plugin)
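
# Example invocation (illustrative; assumes FSL, nipype, pandas, and requests
# are available, and that the sheet key points to a spreadsheet with 'Subject'
# and 'File Path' columns):
#
#     python run_demo_workflow.py --key <google-sheet-key> -n 1 -o output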