-
Notifications
You must be signed in to change notification settings - Fork 0
/
contact_calculations.py
executable file
·248 lines (198 loc) · 7.18 KB
/
contact_calculations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
import os
import glob
import numpy as np
import pandas as pd
from collections import defaultdict
from scipy.spatial import distance
from Bio import pairwise2
import pdb
#Arguments for argparse module:
parser = argparse.ArgumentParser(description = '''Calculate contacts for both structural and sequence alignments.
A contact is defined as 2 C-betas (C-alphas for glycine) of side-chains separated
by more than five residues being less than 8 Å apart.''')
parser.add_argument('indir', nargs=1, type= str,
default=sys.stdin, help = '''path to input directory. include / in end''')
parser.add_argument('outdir', nargs=1, type= str,
default=sys.stdin, help = '''path to output directory. include / in end''')
parser.add_argument('fastadir', nargs=1, type= str,
default=sys.stdin, help = '''path to fasta directory. include / in end''')
parser.add_argument('df_path', nargs=1, type= str,
default=sys.stdin, help = '''path to df.''')
parser.add_argument('group', nargs=1, type= str,
default=sys.stdin, help = '''Group.''')
#FUNCTIONS
def read_fasta(filepath):
'''Read fasta sequence
'''
with open(filepath, 'r') as file:
sequence = ''
for line in file:
line = line.rstrip() #remove \n
if line[0] == '>':
uid = line[1:]
else:
sequence += line
return(sequence)
def match_contacts(df, indir, outdir, fastadir):
'''Match alignments to dssp surface acc and 2ndary str
'''
contact_dict = {}
fasta_dict = {}
#Create new columns to add in df
DIFFC_seqaln = []
DIFFC_straln = []
for index, row in df.iterrows():
group = row['group']
uid1 = row['uid1']
uid2 = row['uid2']
if uid1 not in contact_dict.keys():
contacts, sequence = read_cbs(indir+group+'/'+uid1+'.pdb')
contact_dict[uid1] = [contacts, sequence]
fasta_dict[uid1] = read_fasta(fastadir+group+'/'+uid1+'.fa')
if uid2 not in contact_dict.keys():
contacts, sequence = read_cbs(indir+group+'/'+uid2+'.pdb')
contact_dict[uid2] = [contacts, sequence]
fasta_dict[uid2] = read_fasta(fastadir+group+'/'+uid2+'.fa')
for suffix in ['_seqaln', '_straln']:
#Match sequences to alignments
aln1 = row['seq1'+suffix]
aln2 = row['seq2'+suffix]
#Save gapless alignments
gapless_aln1 = ''
gapless_aln2 = ''
for i in range(len(aln1)):
if aln1[i] == '-' or aln2[i] == '-':
continue
else:
gapless_aln1 += aln1[i]
gapless_aln2 += aln2[i]
(mc1, M) = match(gapless_aln1, contact_dict[uid1], fasta_dict[uid1])
(mc2, N) = match(gapless_aln2, contact_dict[uid2], fasta_dict[uid2])
C = 0 #Keep track of the number of conserved contacts in the alignment
for i in range(len(mc1)):
c1 = mc1[i]
c2 = mc2[i]
for j in c1:
if j in c2:#If the contacts at the same position is shared.
C+=1
try:
diff = 1-(C/(M+N-C))
except: #division by 0
diff = 1 # No conserved
if suffix == '_seqaln':
DIFFC_seqaln.append(diff)
else:
DIFFC_straln.append(diff)
#Set new columns in df
df['DIFFC_seqaln'] = DIFFC_seqaln
df['DIFFC_straln'] = DIFFC_straln
#Write new df to outdir
df.to_csv(outdir+group+'_df.csv')
return None
def read_cbs(pdbfile):
'''Get the C-betas from a pdb file.
'''
three_to_one = {'ARG':'R', 'HIS':'H', 'LYS':'K', 'ASP':'D', 'GLU':'E', 'SER':'S', 'THR':'T', 'ASN':'N', 'GLN':'Q', 'CYS':'C', 'GLY':'G', 'PRO':'P', 'ALA':'A', 'ILE':'I', 'LEU':'L', 'MET':'M', 'PHE':'F', 'TRP':'W', 'TYR':'Y', 'VAL':'V', 'UNK': 'X'}
sequence = ''
pos = [] #Save positions in space
prev_res = -1 #Keep track of potential alternative residues
with open(pdbfile, 'r') as file:
for line in file:
record = parse_atm_record(line)
if record['atm_name'] == 'CB':
if record['res_no'] == prev_res:
continue
else:
prev_res = record['res_no']
pos.append(np.array([record['x'], record['y'], record['z']]))
sequence += three_to_one[record['res_name']]
if record['atm_name'] == 'CA' and record['res_name'] == 'GLY':
prev_res = record['res_no']
pos.append(np.array([record['x'], record['y'], record['z']]))
sequence += three_to_one[record['res_name']]
contacts = get_contacts(pos)
return contacts, sequence
def parse_atm_record(line):
record = defaultdict()
record['name'] = line[0:6].strip()
record['atm_no'] = int(line[6:11])
record['atm_name'] = line[12:16].strip()
record['res_name'] = line[17:20].strip()
record['chain'] = line[21]
record['res_no'] = int(line[22:26])
record['insert'] = line[26].strip()
record['resid'] = line[22:29]
record['x'] = float(line[30:38])
record['y'] = float(line[38:46])
record['z'] = float(line[46:54])
record['occ'] = float(line[54:60])
record['B'] = float(line[60:66])
return record
def get_contacts(pos):
contacts = [] #Save each residue's contacts
for i in range(len(pos)):
contacts.append([])
for j in range(i+5, len(pos)):
dist = distance.euclidean(pos[i], pos[j])
if dist < 8:
contacts[i].append(j)
return contacts
def match(gapless_aln, contact_info, org_seq):
'''Get contacts matching alignment'''
contacts = contact_info[0]
c_seq = contact_info[1]
#align to original sequence
aln1 = pairwise2.align.globalxx(org_seq, gapless_aln) #org_seq is the fasta sequence
aln2 = pairwise2.align.globalxx(org_seq, c_seq) #c_seq is the sequence extracted from the pdb file
seq1 = aln1[0][1] #gapless aligned sequence to org
seq2 = aln2[0][1] #c_seq to org
index = np.zeros([max(len(seq1), len(seq2)),3], dtype=int) #Create an index of the conversion btw positions in the alignments
i1=1
i2=1
for i in range(len(seq2)): #seq2 is longer than seq 1, potentially creating mismatched lengths
index[i,0] = i+1 #Can't have 0 index - since non-matches are zeros
if seq2[i] != '-':
index[i,2] = i2 #gapless alignment
i2+=1
if i > len(seq1)-1:#If the end of seq1 has been reached
continue
if seq1[i] != '-':
index[i,1] = i1 #extracted sequence from pdb file
i1+=1
#Save matched contacts
matched_contacts = []
#Save number of contacts
T = 0
for i in range(len(seq1)):
if seq1[i] != '-': #If there are no gaps in the alignment
matched_contacts.append([])
if seq2[i] != '-': #If there are no gaps in the alignment
try:
match = index[np.where(index[:,0]==i+1)[0][0]] #Get index matches
except:
continue
cis = contacts[match[2]-1] #One minus in contacts compared to contact index match
if not cis: #if no contacts here
continue
for c in cis: #Match contacts to gapless alignment index
alni = index[np.where(index[:,2]==c+1)[0][0]][1] #One plus in index compared to contact index
if alni != 0: #If not zero; zero = not present
matched_contacts[-1].append(alni-1) #One minus in aln compared to contact index match
T+=1 #Total number of contacts in gapless alignment
return matched_contacts, T
#MAIN
args = parser.parse_args()
indir = args.indir[0]
outdir = args.outdir[0]
fastadir = args.fastadir[0]
group = args.group[0]
df_path = args.df_path[0]
df = pd.read_csv(df_path)
if 'group' not in df.columns: #If the group name is not group
df = df.rename(columns={'H_group':'group'})
df = df[df['group']==group]
match_contacts(df, indir, outdir, fastadir)