This repository has been archived by the owner on Apr 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.nf
230 lines (162 loc) · 4.59 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/usr/bin/env nextflow
/*
========================================================================================
SETUP PARAMS
========================================================================================
*/
// Ensure DSL1
nextflow.enable.dsl = 1
// Default values
params.s3_prefix = false
params.parent_id = false
params.synapse_config = false
params.filename_string = false
if ( !params.s3_prefix ) {
exit 1, "Parameter 'params.s3_prefix' is required!\n"
}
if ( !params.parent_id ) {
exit 1, "Parameter 'params.parent_id' is required!\n"
}
matches = ( params.s3_prefix =~ '^s3://([^/]+)(?:/+([^/]+(?:/+[^/]+)*)/*)?$' ).findAll()
if ( matches.size() == 0 ) {
exit 1, "Parameter 'params.s3_prefix' must be an S3 URI (e.g., 's3://bucket-name/some/prefix/')!\n"
} else {
bucket_name = matches[0][1]
base_key = matches[0][2]
base_key = base_key ?: '/'
s3_prefix = "s3://${bucket_name}/${base_key}" // Ensuring common format
}
if ( !params.parent_id ==~ 'syn[0-9]+' ) {
exit 1, "Parameter 'params.parent_id' must be the Synapse ID of a folder (e.g., 'syn98765432')!\n"
}
ch_synapse_config = params.synapse_config ? Channel.value( file(params.synapse_config) ) : "null"
publish_dir = "${s3_prefix}/synindex/under-${params.parent_id}/"
/*
========================================================================================
SETUP PROCESSES
========================================================================================
*/
process get_user_id {
label 'synapse'
cache false
secret 'SYNAPSE_AUTH_TOKEN'
afterScript "rm -f ${syn_config}"
input:
file syn_config from ch_synapse_config
output:
stdout ch_user_id
script:
config_cli_arg = params.synapse_config ? "--config ${syn_config}" : ""
"""
get_user_id.py \
${config_cli_arg}
"""
}
process update_owner {
label 'aws'
input:
val user_id from ch_user_id
val s3_prefix from s3_prefix
output:
val true into ch_update_owner_done
script:
"""
( \
( aws s3 cp ${s3_prefix}/owner.txt - 2>/dev/null || true ); \
echo $user_id \
) \
| sort -u \
| aws s3 cp - ${s3_prefix}/owner.txt
"""
}
process register_bucket {
label 'synapse'
secret 'SYNAPSE_AUTH_TOKEN'
afterScript "rm -f ${syn_config}"
input:
val bucket from bucket_name
val base_key from base_key
file syn_config from ch_synapse_config
val flag from ch_update_owner_done
output:
stdout ch_storage_id
script:
config_cli_arg = params.synapse_config ? "--config ${syn_config}" : ""
"""
register_bucket.py \
--bucket ${bucket} \
--base_key ${base_key} \
${config_cli_arg}
"""
}
process list_objects {
label 'aws'
input:
val s3_prefix from s3_prefix
val bucket from bucket_name
val filename_string from params.filename_string
output:
path 'objects.txt' into ch_objects
script:
"""
aws s3 ls ${s3_prefix}/ --recursive \
| grep -v -e '/\$' -e 'synindex/under-' -e 'owner.txt\$' \
-e 'synapseConfig' -e 'synapse_config' \
| awk '{\$1=\$2=\$3=""; print \$0}' \
| sed 's|^ |s3://${bucket}/|' \
${filename_string ? "| grep '${filename_string}'" : ""} \
> objects.txt
"""
}
process synapse_mirror {
label 'synapse'
secret 'SYNAPSE_AUTH_TOKEN'
afterScript "rm -f ${syn_config}"
publishDir publish_dir, mode: 'copy'
input:
path objects from ch_objects
val s3_prefix from s3_prefix
val parent_id from params.parent_id
file syn_config from ch_synapse_config
output:
path 'parent_ids.csv' into ch_parent_ids_csv
script:
config_cli_arg = params.synapse_config ? "--config ${syn_config}" : ""
"""
synmirror.py \
--objects ${objects} \
--s3_prefix ${s3_prefix} \
--parent_id ${parent_id} \
${config_cli_arg} \
> parent_ids.csv
"""
}
// Parse list of object URIs and their Synapse parents
ch_parent_ids_csv
.text
.splitCsv()
.map { row -> [ row[0], file(row[0]), row[1] ] }
.set { ch_parent_ids }
process synapse_index {
label 'synapse'
secret 'SYNAPSE_AUTH_TOKEN'
afterScript "rm -f ${syn_config}"
input:
tuple val(uri), file(object), val(parent_id) from ch_parent_ids
val storage_id from ch_storage_id
file syn_config from ch_synapse_config
output:
stdout ch_file_ids
script:
config_cli_arg = params.synapse_config ? "--config ${syn_config}" : ""
"""
synindex.py \
--storage_id ${storage_id} \
--file ${object} \
--uri '${uri}' \
--parent_id ${parent_id} \
${config_cli_arg}
"""
}
ch_file_ids
.collectFile(name: "file_ids.csv", storeDir: publish_dir, newLine: true)