This repository has been archived by the owner on Feb 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
gff_to_genepred_converter.py
702 lines (527 loc) · 23.6 KB
/
gff_to_genepred_converter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
"""
Converts a gff3 file to genePred
"""
import operator
import shutil
import stat
import subprocess
import sys
import argparse
import os
import tempfile
from shutil import copyfile
from six.moves.urllib.request import urlopen
from six.moves.urllib.error import URLError, HTTPError
import zipfile
from collections import OrderedDict
"""
sorting order for non integer chromosomes
"""
sorting_order = {'X': 100, 'Y': 101, 'MT': 102}
"""
download URL to the gff3ToGenePred tool
"""
gff_converter_url = "http://hgdownload.cse.ucsc.edu/admin/exe/linux.x86_64/gff3ToGenePred"
def parse_args():
"""
parses the arguments
:return: argparse object containing all provided arguments
"""
print("parsing args...")
parser = argparse.ArgumentParser(description="Converts gff3 files into genePred format")
parser.add_argument("gff_file", help="file path to the gff3 input file (with ensembl annotation)")
parser.add_argument("hgnc_file",
help="file path to the the HGNC table file (containing HGNC id <-> gene name mapping")
parser.add_argument("genome_file", help="file path to a IGV .genome file which is then used to generate own .genome"
+ " file with the generated annotation file")
parser.add_argument("output", help="file path for the generated output IGV .genome file")
return parser.parse_args()
def validate_args(args):
"""
validates all given parameters
:param args: argparse object containing all given arguments
:return: True if parameter are correct, else False
"""
print("validating args...")
# check gff file
if not os.path.isfile(args.gff_file):
sys.stderr.write("gff file not found in %s" % args.gff)
return False
# check HGNC file
if not os.path.isfile(args.hgnc_file):
sys.stderr.write("hgnc mapping file not found in %s" % args.gff)
return False
# check genome file
if not os.path.isfile(args.genome_file):
sys.stderr.write("genome file not found in %s" % args.gff)
return False
return True
def read_gff_file(gff_file_path):
"""
reads the gff file and returns it divided in header and content
:param gff_file_path: file path to the gff file
:return:
header list of all header lines
content content of the gff file as list of lists
"""
print("reading gff file...")
with open(gff_file_path, 'r') as gff_file:
data = gff_file.readlines()
header = []
content = []
replaced_genes = 0
replaced_transcripts = 0
for line in data:
if line.startswith('#'):
if not line.startswith('###'):
header.append(line.strip())
else:
# ignore GL000xxx entries:
if line.startswith('GL000'):
continue
# ignore KI270xxx entries:
if line.startswith('KI270'):
continue
# split line by tab
split_line = line.strip().split('\t')
# treat all entries with ENST id as transcript and entries with ENSG id as genes
# generate temp dict with all values:
meta_data_dict = {entry.split('=')[0]: entry.split('=')[1] for entry in split_line[8].split(';')}
if "ID" in meta_data_dict.keys():
if meta_data_dict["ID"].startswith("transcript:ENST") and split_line[2] != "transcript":
split_line[2] = "transcript"
replaced_transcripts += 1
elif meta_data_dict["ID"].startswith("gene:ENSG") and split_line[2] != "gene" and \
not split_line[2].endswith("_gene_segment"):
split_line[2] = "gene"
replaced_genes += 1
content.append(split_line)
# report replaced genes and transcripts
print("\t {} entries with ENSG ids are treated as genes".format(replaced_genes))
print("\t {} entries with ENST ids are treated as transcripts".format(replaced_transcripts))
return header, content
def sort_gff(content):
"""
sorts the content gff file based on the sorting order defined above
:param content: content of the gff file (without headers)
:return: sorted list of lists
"""
print("sorting gff data...")
# preprocess chromosome column and positioning column
for line in content:
if line[0] in sorting_order:
line[0] = sorting_order[line[0]]
else:
line[0] = int(line[0])
line[3] = int(line[3])
line[4] = int(line[4])
# sort content
content.sort(key=operator.itemgetter(0, 3))
# create reverse sorting_order:
reverse_sorting_order = {}
for k, v in sorting_order.items():
reverse_sorting_order[v] = k
# replace placeholder with original chromosome names:
for line in content:
if line[0] in reverse_sorting_order:
line[0] = reverse_sorting_order[line[0]]
return content
def write_gff(gff_file_handle, header, content):
"""
writes the header and content to a gff file
:param gff_file_handle: file handle for the gff file
:param header: header lines (starting with #)
:param content: content of the gff file (as list of lists)
:return:
"""
print("writing gff file...")
# write header:
for line in header:
gff_file_handle.write(line)
gff_file_handle.write("\n")
# write content:
for line in content:
gff_file_handle.write("\t".join(str(e) for e in line))
gff_file_handle.write("\n")
return
def setup_gff_converter(file_handle):
"""
downloads the gff converter tool from the ucsc website
:param file_handle: file handle to save the downloaded binary
:return:
"""
print("initializing gff3ToGenePred converter...")
# download gff converter
try:
print("downloading converter from: " + gff_converter_url)
f = urlopen(gff_converter_url)
# write downloaded file to disk
file_handle.write(f.read())
# handle errors
except HTTPError as e:
print("HTTP Error:", e.code, gff_converter_url)
except URLError as e:
print("URL Error:", e.reason, gff_converter_url)
# make tool executable:
os.chmod(file_handle.name, os.stat(file_handle.name).st_mode | stat.S_IEXEC)
def run_gff_converter(converter_file_handle, gff_file_handle, gene_pred_file_handle):
"""
converts a gff3 file into the genePred format using the gff3ToGenePred tool from the ucsc website
:param converter_file_handle: file handle for the gff3ToGenePred binary
:param gff_file_handle: file handle for the input gff3 file
:param gene_pred_file_handle: file handle for the genePred output file
:return: gene_pred_file_handle: containing the modified and reopened tempfile
"""
print("converting gff file...")
# close temporary files before converting
converter_file_handle.close()
gff_file_handle.close()
gene_pred_file_handle.close()
# concat the command
cmd = [converter_file_handle.name, gff_file_handle.name, gene_pred_file_handle.name]
subprocess.call(cmd)
# delete sorted gff file and converter (not required anymore)
os.remove(gff_file_handle.name)
os.remove(converter_file_handle.name)
return open(gene_pred_file_handle.name, "r")
def read_hgnc_file(file_path):
"""
reads a hgnc file and returns two dictionaries gene->id and id->gene
:param file_path: file path to the HGNC file
:return:
gene_to_hgnc: dict mapping genes to HGNC ids
hgnc_to_gene: dict mapping HGNC ids to gene names
"""
print("reading HGNC file...")
# read hgnc file
with open(file_path, 'r') as hgnc_file:
hgnc_data = hgnc_file.readlines()
# process data
gene_to_hgnc = {}
hgnc_to_gene = {}
alt_gene_names = {}
valid_gene_names = []
# start at 1 to skip header
for line in hgnc_data[1:]:
split_line = line.split('\t')
hgnc_id = int(split_line[0].split(':')[1])
gene_name = split_line[1]
# store in dicts
gene_to_hgnc[gene_name] = hgnc_id
hgnc_to_gene[hgnc_id] = gene_name
# create list of alternative gene names which can uniquely be mapped to a valid HGNC name:
alt_names = []
if split_line[8].strip() != "":
if split_line[8].strip().startswith("\""):
alt_names += split_line[8].strip()[1:-1].upper().split('|')
else:
alt_names.append(split_line[8].strip().upper())
if split_line[10].strip() != "":
if split_line[10].strip().startswith("\""):
alt_names += split_line[10].strip()[1:-1].upper().split('|')
else:
alt_names.append(split_line[10].strip().upper())
alt_names = set(alt_names)
for alt_name in alt_names:
if alt_name in alt_gene_names.keys():
# alt name is not unique -> remove from list
del alt_gene_names[alt_name]
else:
# new alt name <-> hgnc name relation
alt_gene_names[alt_name] = gene_name
print("\t %i HGNC ids read" % len(hgnc_to_gene))
return gene_to_hgnc, hgnc_to_gene, alt_gene_names
def generate_ensg_hgnc_mapping(gff_file_path, valid_hgnc_ids, alt_gene_names):
"""
extracts a ENSG<->HGNC mapping from the gff3 file
:param gff_file_path: file path to the gff3 file
:param valid_hgnc_ids: set with valid HGNC ids
:param alt_gene_names: dict containing mapping from older/alternative gene names to valid HGNC genes
:return:
ensg_to_hgnc: dict mapping ensembl gene id to HGNC ids
hgnc_to_ensg: dict mapping HGNC ids to ensembl gene id
"""
print("generating ensembl gene id <-> HGNC mapping...")
ensg_to_hgnc = {}
hgnc_to_ensg = {}
ensg_to_non_hgnc_gene = {}
non_hgnc_gene_to_ensg = {}
ignored_genes = []
ignored_genes_dots = []
updated_gene_names = 0
genes_without_names = 0
genes_without_description = 0
# read file content without header
with open(gff_file_path, 'r') as gff_file:
gff_data = [line for line in gff_file.readlines() if not line.startswith('#')]
# remove all non gene lines and split by tab
gff_gene_lines = [line.split('\t') for line in gff_data if
line.split('\t')[2] in ["gene", "pseudogene", "processed_transcript", "RNA"]
or line.split('\t')[2].endswith('_gene_segment') or line.split('\t')[2].endswith('_gene')]
# extract ENSG<->HGNC mapping:
for line in gff_gene_lines:
meta_data = line[8].split(';')
# generate temp dict with all values:
meta_data_dict = {entry.split('=')[0]: entry.split('=')[1] for entry in meta_data}
# skip entries which do not have a ensembl gene id:
try:
ensg = meta_data_dict["gene_id"]
except KeyError:
continue
# tries to extract HGNC id or saves gene name instead
try:
# skip all non HGNC ids:
if "Source:HGNC Symbol%3BAcc:" not in meta_data_dict["description"]:
raise ValueError
hgnc = int(meta_data_dict["description"].split(':')[-1][:-1])
# skip all invalid HGNC ids
if hgnc not in valid_hgnc_ids:
raise ValueError
except (ValueError, KeyError):
# use gene name instead of HGNC id:
if "Name" in meta_data_dict.keys():
if '.' in meta_data_dict["Name"] or '-' in meta_data_dict["Name"]:
ignored_genes_dots.append(meta_data_dict["Name"])
else:
ignored_genes.append(meta_data_dict["Name"])
gene_name = meta_data_dict["Name"].upper()
else:
genes_without_names += 1
# use description as fallback
if "description" in meta_data_dict.keys():
description = meta_data_dict["description"] \
.replace("%2C", ",") \
.replace("%3B", ";") \
.replace("%26", "&")
gene_name = ensg + " (" + description + ")"
else:
# skip gene
genes_without_description += 1
continue
# replace alternative or old gene names with current valid HGNC gene names
if gene_name in alt_gene_names.keys():
gene_name = alt_gene_names[gene_name]
updated_gene_names += 1
ensg_to_non_hgnc_gene[ensg] = gene_name
non_hgnc_gene_to_ensg[gene_name] = ensg
continue
ensg_to_hgnc[ensg] = hgnc
hgnc_to_ensg[hgnc] = ensg
print("\t %i genes with HGNC ids mapped" % len(ensg_to_hgnc))
print("\t %i genes without HGNC ids mapped" % len(ensg_to_non_hgnc_gene))
print("\t %i genes without names" % genes_without_names)
print("\t %i genes without description" % genes_without_description)
print("\t %i gene names were updated to current HGNC symbols" % updated_gene_names)
return ensg_to_hgnc, hgnc_to_ensg, ensg_to_non_hgnc_gene, non_hgnc_gene_to_ensg
def read_gene_pred_file(gene_pred_file_handle):
"""
reads a genePred file as list of lists
:param gene_pred_file_handle: file handle for the genePred input file
:return: list of lists of all entries in the file
"""
print("reading genePred file...")
raw_data = gene_pred_file_handle.readlines()
gene_pred_table = [line.split('\t') for line in raw_data]
# close and delete file
print("\t" + gene_pred_file_handle.name)
gene_pred_file_handle.close()
os.remove(gene_pred_file_handle.name)
return gene_pred_table
def modify_gene_pred_data(gene_pred_data, ensg_to_hgnc, hgnc_to_gene, ensg_to_non_hgnc_gene):
"""
modifies the genePred data to match the IGV requirements using HGNC ids
:param gene_pred_data: content of the genePred file as list of lists
:param ensg_to_hgnc: dict with mapping ensembl id -> HGNC id
:param hgnc_to_gene: dict with mapping HGNC id -> gene name
:param ensg_to_non_hgnc_gene: dict with mapping ensembl id -> gene name (which do not have a HGNC id)
:return: list of lists with all entries of the modified genePred data
"""
print("modifying genePred file...")
hgnc_genes = 0
non_hgnc_genes = 0
genes_without_name = 0
for line in gene_pred_data:
# remove "transcript:" in front of the ENST id:
line[0] = line[0].split(':')[1]
# get ENSG id
ensg = line[11].split(':')[1].strip()
# replace ENSG id with gene name
try:
gene_name = hgnc_to_gene[ensg_to_hgnc[ensg]]
hgnc_genes += 1
except KeyError:
try:
gene_name = ensg_to_non_hgnc_gene[ensg]
non_hgnc_genes += 1
except KeyError:
gene_name = ensg
genes_without_name += 1
line[11] = gene_name
# add ENSG number as gene id in the first column
ensg_int = int(ensg[4:])
line.insert(0, str(ensg_int))
print("\t gene names from %i hgnc genes and %i non hgnc genes were replaced" % (hgnc_genes, non_hgnc_genes))
print("\t %i genes do not have a name (only id)" % (genes_without_name))
return gene_pred_data
def write_gene_pred_file(gene_pred_data, gene_pred_file_handle):
"""
writes a genePred file using the data (list of lists) given in gene_pred_data
:param gene_pred_data: list of lists containing all entries of the genePred file
:param gene_pred_file_handle: file handle to the genePred file
:return:
"""
print("writing genePred file...")
for line in gene_pred_data:
gene_pred_file_handle.write("\t".join(line))
# close genePred file
gene_pred_file_handle.close()
return
def extract_genome_file(genome_file_path, working_directory):
"""
extracts a IGV .genome file in the working directory
:param genome_file_path: file path to the .genome file
:param working_directory: path to the working directory
:return:
"""
print("extracting genome file...")
# generate temp folder to extract the files
temp_folder = os.path.join(working_directory, ".".join(os.path.split(genome_file_path)[1].split('.')[:-1]))
if not os.path.exists(temp_folder):
os.mkdir(temp_folder)
# extract .genome file
genome_file = zipfile.ZipFile(genome_file_path, 'r')
genome_file.extractall(temp_folder)
genome_file.close()
return temp_folder
def modify_genome_property_file(property_file_path):
"""
modifies the property.txt in the extracted genome file by changing the id and name and returning the
gene file name
:param property_file_path: path to the property.txt
:return: file name of the gene file,
"""
print("modifying genome property file (property.txt)...")
# read property.txt
with open(property_file_path, 'r') as property_file:
property_file_content = property_file.readlines()
# parse content:
properties = OrderedDict(
[(line.split('=')[0], "=".join(line.split('=')[1:]).strip()) for line in property_file_content if
not line.split('=')[0].strip() is ''])
# modify id and name
properties["id"] += "_ensembl"
properties["name"] += " ensembl"
# write property.txt back to disk
with open(property_file_path, 'w') as property_file:
for entry in properties:
property_file.write(entry + "=" + properties[entry] + "\n")
return properties["geneFile"], properties["chrAliasFile"]
def modify_chr_alias_file(alias_file_path):
"""
modifies the chr alias file in the genome file to map chrMT to chrM
:param alias_file_path: path to chr alias file
:return:
"""
# read alias file
with open(alias_file_path, 'r') as alias_file:
alias_file_content = alias_file.readlines()
# write alias file
with open(alias_file_path, 'w') as alias_file:
for line in alias_file_content:
if line.startswith("chrM"):
line = line.strip() + "\tchrMT\tM\n"
alias_file.write(line)
return
def replace_gene_file(original_gene_file, generated_gene_file):
"""
replaces the gene file from the .genome file with the previously generated one
:param original_gene_file: file path to the original genePred file (extracted from the .genome file)
:param generated_gene_file: file path to the previously generated and modified genePred file
:return:
"""
print("replacing gene file...")
copyfile(generated_gene_file, original_gene_file)
def generate_genome_file(src_folder, output_genome_file_name):
"""
generates a IGV .genome file from all files in src_folder
:param src_folder: path to directory containing all required files
:param output_genome_file_name: file path for the output genome
:return:
"""
print("generating genome file...")
# get all files from src_folder
files = [f for f in os.listdir(src_folder) if os.path.isfile(os.path.join(src_folder, f))]
# generate .genome (zip) file
genome_file = zipfile.ZipFile(output_genome_file_name, "w", zipfile.ZIP_DEFLATED)
for f in files:
genome_file.write(os.path.join(src_folder, f), f)
genome_file.close()
return
def generate_temp_file(named=False, mode='w+t', delete=True):
"""
generates a (named) temporary file
:param named: if true a NamedTemporaryFile instead of a TemporaryFile is created
(can be necessary for file system operations)
:param mode: defines the mode of the created file (default: 'w+t')
:return:
"""
if named:
temp_file = tempfile.NamedTemporaryFile(mode=mode, delete=delete)
else:
temp_file = tempfile.TemporaryFile(mode=mode, delete=delete)
return temp_file
def main():
args = parse_args()
# exit script if input file is missing
if not validate_args(args):
return
### convert gff file to genePred
temp_files = {}
# read input gff
header, content = read_gff_file(args.gff_file)
# sort input gff
sorted_content = sort_gff(content)
# write sorted gff to file
temp_files["sorted gff file"] = generate_temp_file(True, 'w+t', False)
write_gff(temp_files["sorted gff file"], header, sorted_content)
# setup converter
temp_files["gff3ToGenePred binary"] = generate_temp_file(True, 'w+b', False)
setup_gff_converter(temp_files["gff3ToGenePred binary"])
# run converter
temp_files["genePred file"] = generate_temp_file(True, 'w+t', False)
temp_files["genePred file"] = run_gff_converter(temp_files["gff3ToGenePred binary"], temp_files["sorted gff file"],
temp_files["genePred file"])
### modify genePred file
# parse HGNC file:
gene_to_hgnc, hgnc_to_gene, alt_gene_names = read_hgnc_file(args.hgnc_file)
# generate ENSG-HGNC mapping
ensg_to_hgnc, hgnc_to_ensg, ensg_to_non_hgnc_gene, non_hgnc_gene_to_ensg = \
generate_ensg_hgnc_mapping(args.gff_file, hgnc_to_gene.keys(), alt_gene_names)
# read genePred
gene_pred_data = read_gene_pred_file(temp_files["genePred file"])
# modify genePred to fit IGV requirements
modified_gene_pred_data = modify_gene_pred_data(gene_pred_data, ensg_to_hgnc, hgnc_to_gene, ensg_to_non_hgnc_gene)
# write modified genePred data back to disk
temp_files["genePred file modified"] = generate_temp_file(True, 'w+t', False)
write_gene_pred_file(modified_gene_pred_data, temp_files["genePred file modified"])
### replace gene file in genome file
# generate temporary directory
temp_files["zip extraction folder"] = tempfile.mkdtemp()
# extract given .genome file
temp_folder = extract_genome_file(args.genome_file, temp_files["zip extraction folder"])
# modify property.txt
gene_file_name, chr_alias_file_name = modify_genome_property_file(os.path.join(temp_folder, "property.txt"))
# modify chr alias file
modify_chr_alias_file(os.path.join(temp_folder, chr_alias_file_name))
# replace gene file
replace_gene_file(os.path.join(temp_folder, gene_file_name), temp_files["genePred file modified"].name)
# repack the files into a .genome file
generate_genome_file(temp_folder, args.output)
### cleanup temp files
# remove generated genePred file
os.remove(temp_files["genePred file modified"].name)
# remove zip extraction folder
shutil.rmtree(temp_files["zip extraction folder"])
return
if __name__ == '__main__':
main()