'''
Pipeline phases 1-2.
'''
import stanza
import time
import json
from pprint import pprint
from utils import replace_name, alter_token, alter_text
from pipeline_config import filter_entity

filter_entity_dict = filter_entity()
delim, replacement, exclude_types, include_types = (
    filter_entity_dict['delim'],
    filter_entity_dict['replacement'],
    filter_entity_dict['exclude_types'],
    filter_entity_dict['include_types'],
)
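# For reference, filter_entity() is expected to return a dict with the four keys
# unpacked above; the values below are hypothetical, purely to illustrate the shape:
#   {'delim': ' ||| ',
#    'replacement': {'AU': 'Australia'},
#    'exclude_types': ['CARDINAL', 'ORDINAL'],
#    'include_types': ['PERSON', 'GPE', 'ORG']}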
def texts2NER(texts, report=False, exclude=False, include=False, tweets_per_round=20000):
    '''
    MAIN entry point: NER tagging over tweet texts.
    texts: [text], where each text is a tweet's full_text.
    return: [[NER]] whose index corresponds to that of [text], hence traceable.
    TESTED
    '''
    i, page_number, NER_list, i2add = 0, 1, [], 0  # i2add: shift start_char/end_char by this much
    texts_str = delim.join(texts)
    exclude_types = filter_entity_dict["exclude_types"] if exclude else []
    include_types = filter_entity_dict["include_types"] if include else []
    while i < len(texts):  # Process texts in rounds: Stanza is fastest on batches of fewer than ~20000 tweets.
        print("\tRound %i" % page_number)
        text = alter_text(delim.join(texts[i:i + tweets_per_round]))
        start = time.time()
        NERs = get_NERs(text)
        end = time.time()
        print("\nOutput length for round %i: %i" % (page_number, len(NERs)))
        print("\t...took %i hours %f seconds." % ((end - start) // 3600, (end - start) % 3600))
        print("Start post-processing... incl. type filtering, start_char & end_char shifting, to_dict...")
        NERs = list(replace_all(NERs))
        count = 0
        for e in NERs:
            e['end_char'], e['start_char'] = e['end_char'] + i2add, e['start_char'] + i2add
            e_list_i = get_NER_list_index(e.get("end_char"), texts_str)
            if ((not include_types and e.get("type") not in exclude_types)
                    or (not exclude_types and e.get("type") in include_types)):
                while len(NER_list) < e_list_i + 1:
                    NER_list.append([])
                NER_list[e_list_i].append(e)
                count += 1
                if report:
                    print(e['text'], e['type'])
        # Shift the next round's offsets by this round's length plus the delim that
        # separates rounds inside texts_str (assumes alter_text preserves length).
        i2add += len(text) + len(delim)
        i += tweets_per_round
        page_number += 1
        del NERs
        print("\tDone with %i NER entities." % count)
    return NER_list
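# Illustrative output shape (hypothetical values): for texts = [t0, t1] where only
# t0 mentions a person, texts2NER returns something like
#   [[{'text': 'Donald Trump', 'start_char': 0, 'end_char': 12, 'type': 'PERSON'}], []]
# i.e. NER_list[k] holds the entity dicts found in texts[k].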
def get_NER_list_index(NER_end_index, text):
    '''
    Return the index of the full_text that the NER belongs to.
    NER_end_index: int.
    text: the delim-joined string of all tweets (texts_str).
    output: int.
    '''
    return len(text[:NER_end_index].split(delim)) - 1
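# Worked example with a hypothetical delim of '|': for texts = ['a b', 'c d'],
# texts_str == 'a b|c d'. An entity ending at end_char == 3 gives texts_str[:3]
# == 'a b', one delim-piece, so index 0; one ending at end_char == 7 gives
# 'a b|c d', two pieces, so index 1: it belongs to texts[1].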
def replace_all(NERs):
    '''
    Post-processing of NERs.
    replace_name: e.g.
        realDonaldTrump > Donald Trump,
        realdonaldtrump > realdonaldtrump
    replace_by_dict: e.g. AU > Australia, to compress the entity space.
    alter_token: e.g. Australia!! > Australia
    '''
    replacement_dict = filter_entity_dict['replacement']

    def replace_by_dict(token):
        return replacement_dict.get(token, token)

    for NER in NERs:
        text = replace_by_dict(alter_token(replace_name(NER)))
        if text:  # drop entities whose token is emptied by the cleanup
            yield {
                "text": text,
                "start_char": NER.start_char,
                "end_char": NER.end_char,
                "type": NER.type,
            }
def get_NERs(text):
    # Build the Stanza pipeline once and reuse it across rounds; constructing it on
    # every call would reload the models each time.
    if not hasattr(get_NERs, "pipe"):
        get_NERs.pipe = stanza.Pipeline(lang='en', processors='tokenize,ner', use_gpu=True)
    return get_NERs.pipe(text).entities
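# Note: depending on the Stanza version, the English models may need to be fetched
# once up front with stanza.download('en') before the Pipeline can load them.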
if __name__ == '__main__':
    with open("4.Get Tweets/2020-03-30.json") as f:
        sample_texts = json.load(f)
    sample_texts = [item[1] for item in sample_texts]
    # text = delim.join(sample_texts)
    # text = "@GladysB Stable door closed by @ScottMorrisonMP. \nHorse has bolted. \nAustralia has to contend with community transmission and recession due to fatal dithering and delay. \nhttps://t.co/maHJlRSTgy \n#covid19australia #coronavirus"
    # out = get_NERs(text)
    ner_list = texts2NER(sample_texts, include=True, tweets_per_round=50000)
    pprint(ner_list)
    # out = NER2texts(e, sample_texts)