-
Notifications
You must be signed in to change notification settings - Fork 0
/
crisprdetectparser.py
167 lines (144 loc) · 5.36 KB
/
crisprdetectparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Parser for CRISPRDetect output files.
Run with --help for an how to run.
Minimum Python version is 3.6.
Hielke Walinga ([email protected])
"""
import argparse
import fileinput
import os
import re
import sys
from collections import defaultdict
from itertools import takewhile
from os import path
from typing import Dict
header = (
"genome\tcontig\tarray_id\tbegin\tend\torientation\tnumber_of_spacers\t"
"array_family\tquality_score\trepeat_sequence"
)
ap = argparse.ArgumentParser(
prog="CRISPRDetect parser",
description="Run this progrom on CRISPRDetect output files to extract"
" the spacers and the metadate about the CRISPR arrays."
" The program can process the CRISPR array files with sufficient quality score,"
" but also the array files with"
" a score below the required quality score (with the .fp extension).",
)
ap.add_argument(
"files", nargs="*", help="All the crisprdetect(with or without .fp) output files."
)
ap.add_argument(
"--spacers-directory",
nargs="?",
const="spacers_dir",
default=None,
help="Create spacers folder. Creates a folder with fasta files that contain the"
" spacer sequences for each genome."
" Leave out if you don't want to extract the spacers.",
)
ap.add_argument(
"--out",
type=argparse.FileType("a+"),
default=sys.stdout,
help="Output file, if left out, this will be stdout. If file already exist"
" it will append.",
)
ap.add_argument(
"--sep", default="\t", help="Seperator in the metadata file. (Default: tab)"
)
ap.add_argument(
"--crisprdetect-extension",
default="crisprdetect",
help="Extension of crisperdetect files that is cut off for genome name."
" (Default: crisprdetect)",
)
ap.add_argument(
"--spacers-extension",
default="spacers.fna",
help="Extension string of the spacers fasta file. (Default: spacers.fna)",
)
ap.add_argument(
"--header", action="store_true", help=f"Output file with a header: {header}"
)
ap.add_argument("--force", action="store_true", help="Overwrite existing files.")
args = ap.parse_args()
if args.spacers_directory:
if not path.isdir(args.spacers_directory):
os.mkdir(args.spacers_directory)
spacers_file = None
if args.header:
print(header, file=args.out)
contigs_counter: Dict[str, int] = defaultdict(int)
with fileinput.input(args.files) as f:
for line in f:
if f.isfirstline(): # Initialize for new file, new genome.
genome = path.basename(f.filename()).replace(
f".{args.crisprdetect_extension}", ""
)
contigs_counter.clear()
if args.spacers_directory:
if spacers_file:
spacers_file.close()
spacers_file_name = f"{genome}.{args.spacers_extension}"
spacers_path_name = path.join(args.spacers_directory, spacers_file_name)
if not args.force and path.exists(spacers_path_name):
raise Exception(
f"{spacers_path_name} already exists."
" Set --force to overwrite this file,"
" and other already existing files."
)
spacers_file = open(spacers_path_name, "w")
# Array definition starts now.
if line.startswith(">"):
F = line.split()
orientation = F[-1].strip()
contig = F[0].lstrip(">")
contigs_counter[contig] += 1
number = contigs_counter[contig]
array_id = f"{contig}_{number}"
# Loop untill spacers list.
all(takewhile(lambda l: not str(l).startswith("==="), f))
# Loop over all spacers.
for idx, line in enumerate(f):
F = line.split()
spacer_sequence = F[5]
if spacer_sequence == "|": # End of spacers list.
end = int(F[0]) + (1 if orientation == "Forward" else -1) * int(
F[1]
)
num_spacers = idx
next(f)
line = next(f)
repeat = line.split()[4]
break
if idx == 0:
begin = int(F[0])
if args.spacers_directory:
print(f">{array_id}_{idx + 1}", file=spacers_file)
print(spacer_sequence, file=spacers_file)
# Continue till Array family, and output array, and continue main loop.
for line in f:
if line.startswith("# Questionable array :"):
score = line.split(": ")[-1].strip()
if line.startswith("# Array family :"):
family = re.search(r"# Array family :\s(.*?)(?:\s|$)", line)[1]
if orientation != "Forward":
begin, end = end, begin
print(
genome,
contig,
array_id,
begin,
end,
orientation,
num_spacers,
family,
score,
repeat,
sep=args.sep,
file=args.out,
)
break