confidence_proxy.py
forked from TurkuNLP/Turku-neural-parser-pipeline
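# Confidence proxy for the Turku neural parser cluster: a small Flask service
# that forwards the POSTed text to every parser node, parses each node's
# CoNLL-U response, and reports per-token (lemma, POS) agreement across the
# models as a confidence score.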
import multiprocessing
import socket

import flask
import requests
from flask import request, jsonify

hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)

app = flask.Flask(__name__)

# Port on which every parser node listens.
clusters_port = "7689"

# Infer the number of parser replicas from the last octet of this container's
# IP address (assumes sequential address assignment on the Docker network,
# with the first few addresses taken by other containers).
cluster_size = int(ip_address.split('.')[-1]) - 3

cluster_nodes = [
    'turku-neural-parser-pipeline_turku_parser_' + str(_)
    for _ in range(1, cluster_size + 1)
]
cluster_nodes.append('turku-neural-parser-pipeline_original_parser_1')
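

# Convert the CoNLL-U text returned by one parser node into a list of sentence
# dicts: {'sent_id', 'text', 'size', 'tokens': [{'id', 'surface', 'base', 'pos'}]}.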
def to_dict(analyzed_sents: str):
    sents = []
    for token in analyzed_sents.split('\n'):
        if not len(sents) or not len(token):
            # blank lines (and the very first line) start a new sentence record
            sents.append({'tokens': [], 'size': 0})
            continue
        if token.startswith('#'):
            # comment lines carry sentence-level metadata
            if token.startswith('# sent_id'):
                sents[-1]['sent_id'] = int(token.split('=', 1)[1].strip())
            elif token.startswith('# text'):
                sents[-1]['text'] = token.split('=', 1)[1].strip()
            continue
        temp = token.split('\t')
        idx, surface, base, pos = temp[:4]
        sents[-1]['tokens'].append({
            'id': int(idx),
            'surface': surface,
            'base': base,
            'pos': pos
        })
        sents[-1]['size'] += 1
    return [_ for _ in sents if _['size']]
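

# POST the raw text to a single parser node and return its parsed output,
# or an empty list if the request fails.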
def dispatch(input_tup):
    try:
        payload, url, port = input_tup
        headers = {'Content-Type': 'text/plain; charset=utf-8'}
        response = requests.post('http://{}:{}'.format(url, port),
                                 headers=headers, data=payload.encode('utf-8'),
                                 timeout=30)
        return to_dict(response.text)
    except Exception as e:
        print(e)
        return []
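

# Merge the per-node analyses sentence by sentence. For every token whose
# surface form matches across all models, the confidence of each distinct
# (base, pos) pair is the fraction of models that predicted it; sentences
# whose tokenizations disagree are flagged with a 'tokenizing error'.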
def join_result(results: list, model_num: int):
    joined_sents = []
    for sent_list in zip(*results):
        sent = {'text': sent_list[0]['text'], 'sent_id': sent_list[0]['sent_id'], 'tokens': []}
        if any(_['size'] != sent_list[0]['size'] for _ in sent_list):
            # the models disagree on the number of tokens in this sentence
            sent['error'] = 'tokenizing error'
            joined_sents.append(sent)
        else:
            token_lists = [_['tokens'] for _ in sent_list]
            is_valid = True
            for token_list in zip(*token_lists):
                if any(_['surface'] != token_list[0]['surface'] for _ in token_list):
                    is_valid = False
                if is_valid:
                    # confidence of each (lemma, POS) pair is the fraction of models that produced it
                    base_pos_list = [(_['base'], _['pos']) for _ in token_list]
                    base_pos = []
                    for base, pos in set(base_pos_list):
                        conf = sum(_ == (base, pos) for _ in base_pos_list) / len(base_pos_list)
                        base_pos.append({
                            'base': base,
                            'pos': pos,
                            'conf': conf
                        })
                    token = {
                        'id': token_list[0]['id'],
                        'surface': token_list[0]['surface'],
                        'base_pos': base_pos
                    }
                    sent['tokens'].append(token)
            if not is_valid:
                # the models disagree on token surfaces; discard the partial merge
                sent['tokens'] = []
                sent['error'] = 'tokenizing error'
            sent['model_num'] = model_num
            joined_sents.append(sent)
    return joined_sents
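

# Send the input text to all parser nodes in parallel and merge their results.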
def parallel_run(text):
    # one worker per parser node; every node receives the full input text
    p = multiprocessing.Pool(len(cluster_nodes))
    input_tup = [(text, cluster_node, clusters_port) for cluster_node in cluster_nodes]
    result = p.map(dispatch, input_tup, chunksize=1)
    p.close()
    p.join()
    # drop nodes that failed or returned nothing; model_num is the number of nodes that responded
    return join_result([_ for _ in result if len(_)], sum(len(_) > 0 for _ in result))
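

# Flask endpoint: accepts raw UTF-8 text via POST at "/" and returns the
# merged analysis as JSON.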
@app.route("/", methods=["POST"])
def parse_get():
    txt = request.get_data(as_text=True)
    result = parallel_run(txt)
    return jsonify(result)


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)
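
A minimal client sketch (not part of the file), assuming the proxy is running locally on port 5000 as configured above; the example sentence is hypothetical:

import requests

# hypothetical example call against the proxy started by this module
resp = requests.post('http://localhost:5000/',
                     data='Tämä on esimerkki.'.encode('utf-8'),
                     headers={'Content-Type': 'text/plain; charset=utf-8'},
                     timeout=60)
for sent in resp.json():
    for token in sent.get('tokens', []):
        print(token['id'], token['surface'], token['base_pos'])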