-
Notifications
You must be signed in to change notification settings - Fork 7
/
lib_API_mapping.py
executable file
·348 lines (323 loc) · 11.7 KB
/
lib_API_mapping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
import ast
import os
import re
import sys
import json
from queue import Queue
from copy import deepcopy
from core import *
from core.source_visitor import SourceVisitor
from wheel_inspect import inspect_wheel
import tarfile
from zipfile import ZipFile
from pkg_resources import parse_version
import networkx as nx
class Tree:
    """Node of the in-memory file/directory tree built by ``build_dir_tree``.

    Every node starts detached (no parent, no children) and empty; the
    tree-building code fills in the remaining attributes afterwards.
    """

    def __init__(self, name):
        """Create a detached, empty node labelled *name*."""
        # Label of the node; the tree builder uses a file or directory name.
        self.name = name
        # Link to the enclosing node; None until a parent is assigned.
        self.parent = None
        # Child nodes, in the order they were appended.
        self.children = []
        # Mapping filled from extract_class_from_source() for '.py' nodes.
        self.cargo = {}
        # Parsed AST of the file, or None when nothing was parsed.
        self.ast = None
        # Decoded source text of the file ('' when not loaded).
        self.source = ''

    def __str__(self):
        """Return the node name as a string."""
        return str(self.name)
def parse_import(tree):
    """Collect the names imported by every ``from ... import ...`` statement.

    Plain ``import x`` statements are ignored; only ``ast.ImportFrom`` nodes
    are inspected.

    Args:
        tree: AST to scan; it is traversed with ``ast.walk``.

    Returns:
        dict mapping each import source to the list of imported names, in
        the order they are encountered. Imports that name a module
        (``from a.b import c`` or ``from .a import c``) are keyed by the
        module string; bare relative imports (``from . import c``) are keyed
        by the integer relative level. Returns ``None`` when walking *tree*
        raises ``AttributeError`` (for example when *tree* is ``None``).
    """
    module_item_dict = {}
    try:
        for node in ast.walk(tree):
            if not isinstance(node, ast.ImportFrom):
                continue
            # Pick the key once so a repeated bare relative import can no
            # longer fall through and register a stray ``None`` key.
            key = node.level if node.module is None else node.module
            module_item_dict.setdefault(key, []).extend(
                alias.name for alias in node.names
            )
    except AttributeError:
        return None
    return module_item_dict
def gen_AST(filename):
    """Parse the file at *filename* into a module-level AST.

    Args:
        filename: Path of the file to read (opened in text mode with the
            default encoding).

    Returns:
        The ``ast.Module`` produced by ``ast.parse(..., mode='exec')``, or
        ``None`` when the content cannot be decoded or is not valid Python.

    Raises:
        OSError: If the file cannot be opened (not caught here).
    """
    try:
        # The context manager closes the handle even if read() fails.
        with open(filename) as handle:
            source = handle.read()
        return ast.parse(source, mode='exec')
    except (SyntaxError, ValueError):
        # ValueError covers UnicodeDecodeError (its subclass) and the
        # "source code string cannot contain null bytes" error that older
        # interpreters raise instead of SyntaxError.
        return None
def parse_pyx(filename):
    """Scan a file line by line for ``def name(`` patterns.

    This is a purely textual scan: on every line the first match of
    ``def <anything>(`` contributes the text between ``def `` and ``(``.
    Because the pattern is not anchored, ``cdef``/``cpdef`` lines match too.

    Args:
        filename: Path of the file to scan.

    Returns:
        list of the captured names, one per matching line, in file order.
        (The list used to be built and then discarded; returning it is
        harmless for callers that ignore the result.)
    """
    # Raw string: '\s' inside a normal literal is an invalid escape sequence.
    pattern = re.compile(r'def ([\s\S]*?)\(')
    all_func_names = []
    # Undecodable bytes are dropped, matching how build_dir_tree decodes
    # sources, so a stray byte cannot abort the scan.
    with open(filename, encoding='utf-8', errors='ignore') as handle:
        for line in handle:
            names = pattern.findall(line)
            if names:
                all_func_names.append(names[0])
    return all_func_names
def extract_class(filename):
    """Parse a source file and run ``SourceVisitor`` over its AST.

    The file name and a progress marker are printed to stdout as before.

    Args:
        filename: Path of the file to analyse.

    Returns:
        ``(visitor.result, tree)`` on success, or ``({}, None)`` when
        anything in reading, parsing or visiting raises an ``Exception``.
    """
    try:
        print(filename)
        # Context manager so the handle is released even when parsing fails.
        with open(filename) as handle:
            source = handle.read()
        tree = ast.parse(source, mode='exec')
        visitor = SourceVisitor()
        visitor.visit(tree)
        print('testing')
        return visitor.result, tree
    except Exception:  # deliberately broad: non-Python input is expected
        # Files whose name ends in 'pyx' get a textual scan instead.
        # NOTE(review): the scan result is not used here, so such files
        # still yield an empty mapping — confirm whether it should be.
        if filename.endswith('pyx'):
            parse_pyx(filename)
        return {}, None
def extract_class_from_source(source):
    """Parse *source* text and run ``SourceVisitor`` over the resulting AST.

    Args:
        source: Source code to parse with ``ast.parse(..., mode='exec')``.

    Returns:
        ``(visitor.result, tree)`` on success. If parsing or visiting raises
        any ``Exception``, the exception is printed and ``({}, None)`` is
        returned instead.
    """
    try:
        parsed = ast.parse(source, mode='exec')
        collector = SourceVisitor()
        collector.visit(parsed)
        outcome = (collector.result, parsed)
    except Exception as err:  # non-Python input is expected here
        print(err)
        outcome = ({}, None)
    return outcome
def build_dir_tree(node):
    """Recursively populate *node* from the file system.

    ``node.name`` is resolved relative to the current working directory.
    Directories are entered with ``os.chdir`` and one child ``Tree`` is
    created per directory entry; files whose name ends in ``.py`` are read,
    decoded and analysed. Nodes named ``test``, ``tests`` or ``testing`` are
    left empty (they are still attached to their parent by the caller's
    loop).

    Args:
        node: ``Tree`` node to fill in. For directories ``children`` is
            extended; for ``.py`` files ``source``, ``cargo`` and ``ast``
            are set.
    """
    if node.name in ('test', 'tests', 'testing'):
        return
    if os.path.isdir(node.name):
        # Remember where we came from: chdir('..') would land in the wrong
        # place after entering a symlinked directory, and the finally
        # clause restores the directory even if a child raises.
        previous_dir = os.getcwd()
        os.chdir(node.name)
        try:
            for item in os.listdir('.'):
                child_node = Tree(item)
                child_node.parent = node
                build_dir_tree(child_node)
                node.children.append(child_node)
        finally:
            os.chdir(previous_dir)
    elif node.name.endswith('.py'):
        # Read raw bytes and drop undecodable ones so odd encodings do not
        # abort the walk; the handle is closed by the context manager.
        with open(node.name, 'rb') as handle:
            raw = handle.read()
        node.source = raw.decode("utf-8", errors="ignore")
        res, tree = extract_class_from_source(node.source)
        node.cargo = res
        node.ast = tree
def leaf2root(node):
    """Build the dotted path from the tree root down to *node*.

    The ancestors' names are joined root-first with dots. For a node named
    ``__init__.py`` that is the whole result; for any other node the part of
    its own name before the first dot is appended as a final component.
    A non-``__init__`` node without ancestors therefore yields a leading dot
    (e.g. ``'.solo'``).

    Args:
        node: Object with ``name`` and ``parent`` attributes; the ``parent``
            chain must end in ``None``.

    Returns:
        The dotted path as a string.
    """
    # Walk upwards collecting names: index 0 is the node itself.
    names = []
    cursor = node
    while cursor is not None:
        names.append(cursor.name)
        cursor = cursor.parent

    ancestors_root_first = names[1:][::-1]
    dotted = ".".join(ancestors_root_first)

    if node.name == '__init__.py':
        return dotted
    stem = node.name.split('.')[0]
    return "{}.{}".format(dotted, stem)
def find_child_by_name(node, name):
    """Return the first child of *node* whose ``name`` equals *name*.

    Args:
        node: Object exposing an iterable ``children`` attribute.
        name: Exact name to look for.

    Returns:
        The first matching child, or ``None`` when there is none.
    """
    matching = (child for child in node.children if child.name == name)
    return next(matching, None)
def find_node_by_name(nodes, name):
    """Return the first node matching *name* exactly or without ``.py``.

    A node matches when its ``name`` equals *name*, or when its ``name``
    ends in ``.py`` and equals *name* once that suffix is removed.

    Args:
        nodes: Iterable of objects with a ``name`` attribute.
        name: File/directory name or module name to look for.

    Returns:
        The first matching node, or ``None`` when nothing matches.
    """
    for node in nodes:
        # Remove the literal '.py' suffix only. str.rstrip('.py') strips any
        # trailing '.', 'p' and 'y' characters, which turned 'happy.py' into
        # 'ha' and made the directory 'numpy' answer to 'num'.
        stem = node.name[:-3] if node.name.endswith('.py') else node.name
        if node.name == name or stem == name:
            return node
    return None
def go_to_that_node(root, cur_node, visit_path):
    """Resolve a dotted import path to a node of the tree.

    The first path component is tried, in this order, against
      1. the children of ``cur_node.parent`` (siblings of the current file),
      2. the name of *root*,
      3. the name of ``cur_node.parent``.
    From the chosen starting node the remaining components are followed
    through ``children`` using ``find_node_by_name``.

    For cases 1 and 3, a result whose name does not end in ``.py`` is
    replaced by its ``__init__.py`` child (or ``None`` if it has none).

    NOTE(review): case 2 returns the node it reached directly, without the
    ``__init__.py`` step. This mirrors the original control flow as
    reconstructed from the flattened source — confirm it is intended.

    Args:
        root: Root node of the tree.
        cur_node: Node of the importing file; its ``parent`` must not be
            ``None``.
        visit_path: Dotted module path, e.g. ``"pkg.sub.mod"``.

    Returns:
        The resolved node, or ``None`` when the path cannot be followed.
    """
    segments = visit_path.split('.')
    head = segments[0]
    tail = segments[1:]

    def descend(start):
        # Follow the remaining components; stop at the first missing one.
        current = start
        for segment in tail:
            current = find_node_by_name(current.children, segment)
            if current is None:
                break
        return current

    sibling = find_node_by_name(cur_node.parent.children, head)
    if sibling is not None:
        target = descend(sibling)
    elif head == root.name:
        return descend(root)
    elif head == cur_node.parent.name:
        target = descend(cur_node.parent)
    else:
        target = None

    # Still on a non-'.py' node: use its __init__.py instead.
    if target is not None and not target.name.endswith('.py'):
        target = find_node_by_name(target.children, '__init__.py')
    return target
def tree_infer_levels(root_node):
    """Propagate re-exported names through the tree and list all API names.

    Steps:
      1. Breadth-first walk from *root_node* collecting every node whose
         name ends in ``.py``.
      2. Visit those nodes in reverse discovery order. For each
         ``from X import ...`` found in the node's AST whose source ``X``
         resolves to a node of the tree, copy the imported entries (or all
         entries for ``*``) from that node's ``cargo`` into this node's
         ``cargo``. Files whose name starts with ``_`` are skipped, except
         ``__init__.py``. Bare relative imports (integer keys) are skipped.
      3. Format every collected node's ``cargo`` with
         ``make_API_full_name`` under the prefix given by ``leaf2root``.

    Args:
        root_node: Root ``Tree`` node; ``cargo`` dicts below it are updated
            in place.

    Returns:
        list of API description strings for all collected ``.py`` nodes, in
        discovery order.
    """
    # Function-scope import keeps this block self-contained.
    from collections import deque

    # deque.popleft() is O(1); list.pop(0) made the walk quadratic.
    leaf_stack = []
    working_queue = deque([root_node])
    while working_queue:
        tmp_node = working_queue.popleft()
        if tmp_node.name.endswith('.py'):
            leaf_stack.append(tmp_node)
        working_queue.extend(tmp_node.children)

    for node in reversed(leaf_stack):
        # Private modules do not re-export anything of interest.
        if node.name != '__init__.py' and node.name[0] == '_':
            continue
        module_item_dict = parse_import(node.ast)
        if module_item_dict is None:
            continue
        for module_name, imported in module_item_dict.items():
            # Integer keys are bare relative imports; they carry no module
            # path to resolve.
            if module_name is None or isinstance(module_name, int):
                continue
            dst_node = go_to_that_node(root_node, node, module_name)
            if dst_node is None:
                continue
            if imported and imported[0] == '*':
                node.cargo.update(dst_node.cargo)
            else:
                for api in imported:
                    if api in dst_node.cargo:
                        node.cargo[api] = dst_node.cargo[api]

    API_name_lst = []
    for node in leaf_stack:
        API_prefix = leaf2root(node)
        API_name_lst.extend(make_API_full_name(node.cargo, API_prefix))
    return API_name_lst
def make_API_full_name(meta_data, API_prefix):
    """Format extracted definitions as comma-separated API descriptors.

    Entries whose name starts with ``_`` are skipped. For the rest:

    * a ``tuple`` value ``(args, extra)`` is a function and yields
      ``"<prefix>.<name>,<a1;a2;...>,<extra>,func"``;
    * a ``dict`` value is a class (method name -> ``(args, extra)``). It
      yields one constructor entry
      ``"<prefix>.<name>,<init args>,<init extra>,func"`` (empty fields when
      there is no ``__init__``), followed by
      ``"<prefix>.<name>.<method>,<args>,<extra>,cls"`` for every method
      whose name does not start with ``_``.

    The constructor entry previously lost its trailing ``func`` tag because
    the format string had one placeholder fewer than arguments; it now has
    the same number of fields as function entries.

    Args:
        meta_data: Mapping of top-level names to the values described above.
        API_prefix: Dotted module path placed in front of every name.

    Returns:
        list of descriptor strings in ``meta_data`` iteration order.
    """
    API_lst = []
    for k, v in meta_data.items():
        if k.startswith('_'):
            continue  # private functions or classes
        if isinstance(v, tuple):
            # Function definition.
            API_lst.append(
                "{}.{},{},{},{}".format(API_prefix, k, ";".join(v[0]), v[1], "func")
            )
        elif isinstance(v, dict):
            # Class: describe the constructor first (empty when absent).
            init_args = v.get('__init__', ([], ""))
            API_lst.append(
                "{}.{},{},{},{}".format(
                    API_prefix, k, ";".join(init_args[0]), init_args[1], "func"
                )
            )
            for f_name, f_args in v.items():
                if f_name.startswith('_'):
                    continue  # private methods, including __init__
                API_lst.append(
                    "{}.{}.{},{},{},{}".format(
                        API_prefix, k, f_name, ";".join(f_args[0]), f_args[1], "cls"
                    )
                )
    return API_lst
def search_targets(root_dir, targets):
    """Find the first directory under *root_dir* that holds every target.

    ``os.walk`` is used top-down. In each visited directory a target ``t``
    counts as found when a sub-directory ``t`` exists, or otherwise when a
    file ``t.py`` exists.

    Args:
        root_dir: Directory to search.
        targets: Names of the packages/modules to locate.

    Returns:
        list of paths (one per target, in *targets* order) from the first
        directory that contains all of them, or ``None`` when no single
        directory does.
    """
    for root, dirs, files in os.walk(root_dir):
        # Start from scratch in every directory: partial hits from other
        # directories must not leak into the returned list.
        entry_points = []
        for t in targets:
            if t in dirs:
                entry_points.append(os.path.join(root, t))
            elif t + '.py' in files:
                entry_points.append(os.path.join(root, t + '.py'))
        if len(entry_points) == len(targets):
            return entry_points
    return None
# Wheel handling: pick one wheel per version and unpack it.
# NOTE: egg-info support is planned but not implemented yet.
def _pyver_sort_key(tag):
    """Turn a Python tag such as 'cp310' or 'py3' into a (major, minor) tuple."""
    digits = re.sub(r'\D', '', tag)
    if not digits:
        return (0, 0)
    # The first digit is the major version, the remainder the minor one.
    return (int(digits[0]), int(digits[1:] or 0))


def process_wheel(path, l_name):
    """Pick the wheel for the newest Python tag in *path* and unpack it.

    Only files ending in ``.whl`` whose name contains ``linux`` or ``any``
    are considered. Each candidate is inspected with ``inspect_wheel``; the
    last entry of its ``pyver`` list decides which wheel wins (first one
    wins on ties). Wheels that fail inspection are reported on stdout and
    skipped. The winner is extracted to ``<path>/tmp`` unless that directory
    already exists.

    Args:
        path: Directory containing the wheel files of one release.
        l_name: Fallback top-level name, used when the wheel metadata has no
            ``top_level`` entry.

    Returns:
        The result of ``search_targets`` on the extracted tree for the
        wheel's top-level names, or ``None`` when no usable wheel is found.
    """
    best_name = ''
    best_key = None
    best_info = None
    for fn in os.listdir(path):
        if not (fn.endswith('.whl') and ('linux' in fn or 'any' in fn)):
            continue
        whl_path = os.path.join(path, fn)
        try:
            output = inspect_wheel(whl_path)
            # Compare tags numerically: as plain strings 'cp39' sorts above
            # 'cp310', which selected the older interpreter's wheel.
            key = _pyver_sort_key(output['pyver'][-1])
        except Exception as e:
            print("failed to handle {}".format(whl_path))
            print(e)
            continue
        if best_key is None or key > best_key:
            # Keep the inspection result so the winner is not parsed twice.
            best_key, best_name, best_info = key, fn, output

    if best_info is None:
        return None

    dist_info = best_info['dist_info']
    if 'top_level' in dist_info:
        top_levels = dist_info['top_level']
    else:
        top_levels = [l_name]

    source_dir = os.path.join(path, 'tmp')
    # An existing 'tmp' directory is treated as an earlier extraction.
    if not os.path.exists(source_dir):
        with ZipFile(os.path.join(path, best_name), 'r') as zipObj:
            zipObj.extractall(source_dir)
    return search_targets(source_dir, top_levels)
def process_single_module(module_path):
    """List the API descriptors of one top-level module or package.

    A regular file is analysed with ``extract_class`` and its entries are
    prefixed with the file name minus its extension. Anything else is
    treated as a package directory: the working directory is switched to its
    parent, a ``Tree`` is built with ``build_dir_tree`` and resolved with
    ``tree_infer_levels``.

    Args:
        module_path: Path to a module file or to a package directory.

    Returns:
        list of API descriptor strings.
    """
    if os.path.isfile(module_path):
        # splitext removes exactly one extension ('.py', '.pyx', ...).
        # rstrip('.py*') stripped a character set instead: 'numpy.py'
        # became 'num' and '.pyx' was never removed.
        module_name = os.path.splitext(os.path.basename(module_path))[0]
        res, _tree = extract_class(module_path)
        return list(make_API_full_name(res, module_name))

    first_name = os.path.basename(module_path)
    working_dir = os.path.dirname(module_path)
    cwd = os.getcwd()  # restored below, even when the analysis raises
    # dirname() is '' for a bare directory name; chdir('') would fail.
    os.chdir(working_dir or '.')
    try:
        root_node = Tree(first_name)
        build_dir_tree(root_node)
        return tree_infer_levels(root_node)
    finally:
        os.chdir(cwd)
def main():
    """Build ``<lib_name>.json`` describing the API of every library version.

    Command line: ``<script> <lib_dir> <lib_name>``. *lib_dir* must contain
    one sub-directory per version; the entries are ordered with
    ``parse_version``. For each version the wheel is processed with
    ``process_wheel`` and every resulting entry point with
    ``process_single_module``.

    The JSON document has three keys: ``version`` (sorted version names),
    ``API`` (descriptor -> list of versions it was seen in) and ``module``
    (entry points of the last version that produced any).
    """
    if len(sys.argv) < 3:
        # Fail with a usage line instead of a bare IndexError.
        sys.exit("usage: {} <lib_dir> <lib_name>".format(sys.argv[0]))
    lib_dir, lib_name = sys.argv[1], sys.argv[2]

    versions = sorted(os.listdir(lib_dir), key=parse_version)
    API_data = {"module": [], "API": {}, "version": versions}

    for v in versions:
        v_dir = os.path.join(lib_dir, v)
        print(v_dir)
        entry_points = process_wheel(v_dir, lib_name)
        if entry_points is None:
            continue
        API_data['module'] = entry_points
        for ep in entry_points:
            API_name_lst = process_single_module(ep)  # finish one version
            if API_name_lst is None:
                continue
            for name in API_name_lst:
                API_data['API'].setdefault(name, []).append(v)

    # The context manager closes the file even if serialisation fails.
    with open("{}.json".format(lib_name), 'w') as f:
        f.write(json.dumps(API_data))
    print(lib_name)


if __name__ == '__main__':
    main()