# full_pipeline_stream.py
# Forked from TurkuNLP/Turku-neural-parser-pipeline (141 lines / 6.43 KB at time of fork).
from pipeline import Pipeline
import sys
import select
import os
import yaml
import time
import gzip
import re
# def non_blocking_batch(inp,timeout=0.2,batch_lines=10000,wait_for_empty_line=False):
# line_buffer=[]
# #Feeds input
# while True:
# ready_to_read=select.select([inp], [], [], timeout)[0] #check whether f is ready to be read, wait at least timeout (otherwise we run a crazy fast loop)
# if not ready_to_read:
# # Stdin is not ready, yield what we've got, if anything
# if line_buffer:
# print("Yielding on timeout",len(line_buffer),"lines",file=sys.stderr,flush=True)
# yield "".join(line_buffer)
# line_buffer=[]
# continue #next try
# # inp is ready to read!
# # we should always try to get stuff until the next empty line - we might be parsing text paragraphs
# while True:
# line=inp.readline()
# if not line: #End of file detected
# if line_buffer:
# print("Yielding on end-of-input",len(line_buffer),"lines",file=sys.stderr,flush=True)
# yield "".join(line_buffer)
# print("End-of-file detected",file=sys.stderr,flush=True)
# return
# line_buffer.append(line)
# if not line.strip(): #empty line
# break #our chance to yield
# if not wait_for_empty_line and len(line_buffer)>batch_lines: #okay, we just have to yield
# break
# # Now we got the next sentence --- do we have enough to yield?
# if len(line_buffer)>batch_lines:
# print("Yielding on full batch",len(line_buffer),"lines",file=sys.stderr,flush=True)
# yield "".join(line_buffer) #got enough
# line_buffer=[]
def read_pipelines(fname):
    """Load the pipeline definitions from a YAML config file.

    The file maps pipeline names to lists of component strings. Any
    ``{thisdir}`` placeholder inside a component string is expanded to
    the absolute directory containing the YAML file, so configs can
    refer to files shipped next to them.

    Returns the dict of ``{pipeline_name: [component, ...]}``.
    """
    conf_dir = os.path.dirname(os.path.abspath(fname))
    with open(fname) as conf_file:
        # BaseLoader keeps every scalar as a plain string (no implicit
        # int/bool conversion), which is what the component strings need.
        pipelines = yaml.load(conf_file, Loader=yaml.BaseLoader)
    for name in pipelines:
        pipelines[name] = [component.format(thisdir=conf_dir) for component in pipelines[name]]
    return pipelines
def batch_endswith_text(lines):
    """Return True if the last contentful line of *lines* is real text.

    Scans from the end, skipping whitespace-only lines. The first
    non-empty line found decides: a ``##C:``-style comment (per the
    module-level ``comment_regex``) means it is not safe to cut the
    batch here, real text means it is. All-empty input yields False.
    """
    for candidate in reversed(lines):
        if not candidate.strip():
            continue  # ignore blank lines at the tail
        # Last contentful line: comment -> False, text -> True
        return not comment_regex.match(candidate)
    return False
def batch_has_text(lines):
    """Return True if *lines* holds at least one real text line.

    A line counts as real text when it is neither whitespace-only nor a
    ``##C:``-style comment (as matched by the module-level
    ``comment_regex``).
    """
    return any(line.strip() and not comment_regex.match(line) for line in lines)
if __name__=="__main__":
    import argparse
    THISDIR=os.path.dirname(os.path.abspath(__file__))
    # Command-line interface: general batching options plus a group of
    # options forwarded to the lemmatizer module via dotted dest names.
    argparser = argparse.ArgumentParser(description='Parser pipeline')
    general_group = argparser.add_argument_group(title='General', description='General pipeline arguments')
    general_group.add_argument('--conf-yaml', default=os.path.join(THISDIR,"pipelines.yaml"), help='YAML with pipeline configs. Default: parser_dir/pipelines.yaml')
    general_group.add_argument('--pipeline', default="parse_plaintext", help='[DEPRECATED] Name of the pipeline to run, one of those given in the YAML file. Default: %(default)s')
    general_group.add_argument('--empty-line-batching', default=False, action="store_true", help='Only ever batch on newlines (useful with pipelines that input conllu)')
    general_group.add_argument('--batch-lines', default=1000, type=int, help='Number of lines in a job batch. Default %(default)d, consider setting a higher value if using conllu input instead of raw text (maybe 5000 lines), and try smaller values in case of running out of memory with raw text.')
    general_group.add_argument('action', default=None, nargs='?', help="What to do. Either 'list' to lists pipelines or a pipeline name to parse, or nothing in which case the default parse_plaintext is used.")
    lemmatizer_group = argparser.add_argument_group(title='lemmatizer_mod', description='Lemmatizer arguments')
    lemmatizer_group.add_argument('--gpu', dest='lemmatizer_mod.gpu', type=int, default=0, help='GPU device id for the lemmatizer, if -1 use CPU')
    lemmatizer_group.add_argument('--batch_size', dest='lemmatizer_mod.batch_size', type=int, default=100, help='Lemmatizer batch size')
    args = argparser.parse_args()

    pipelines=read_pipelines(args.conf_yaml)

    # Resolve which pipeline to run: 'list' just prints the available
    # names; any other positional is treated as a pipeline name (legacy
    # calling convention); no positional (or 'parse') falls back to the
    # deprecated --pipeline option.
    if args.action=="list":
        print(sorted(pipelines.keys()),file=sys.stderr,flush=True)
        sys.exit(0)
    elif args.action is not None and args.action!="parse": #deprecated legacy stuff, allowing calls like --pipeline pipelinename parse
        pipeline=pipelines[args.action]
    elif args.action is None or args.action=="parse":
        pipeline=pipelines[args.pipeline]

    # A pipeline may start with an "extraoptions ..." pseudo-component:
    # its tokens are prepended to argv and the arguments are re-parsed,
    # letting the config override/extend the command line.
    if pipeline[0].startswith("extraoptions"):
        extraoptions=pipeline[0].split()[1:]
        pipeline.pop(0)
        newoptions=extraoptions+sys.argv[1:]
        print("Got extra arguments from the pipeline, now running with", newoptions, file=sys.stderr, flush=True)
        args=argparser.parse_args(newoptions)

    # Always terminate the pipeline with the output module.
    pipeline.append("output_mod")

    p=Pipeline(steps=pipeline, extra_args=args)

    print("Waiting for input",file=sys.stderr,flush=True)
    # Matches "##C:"/"###C:" comment lines, which must never end a batch.
    comment_regex=re.compile("^####?C:")
    line_buffer=[]
    for line in sys.stdin:
        line_buffer.append(line)
        # Flush a batch only when: the current line is not a comment,
        # batching is allowed here (empty line, or empty-line batching is
        # off), the buffer is over the size limit, and the buffer ends in
        # real text (so comments stay attached to their sentence).
        if not comment_regex.match(line) and (line.strip()=="" or not args.empty_line_batching) and len(line_buffer)>args.batch_lines and batch_endswith_text(line_buffer):
            if not p.is_alive(): #gotta end if something dies
                print("Something crashed. Exiting.",file=sys.stderr,flush=True)
                sys.exit(-1)
            print("Feeding a batch",file=sys.stderr,flush=True)
            p.put("".join(line_buffer))
            line_buffer=[]
    else:
        # for-else: stdin is exhausted — feed whatever remains, unless it
        # is only comments/empty lines (which would produce invalid conll-u).
        if line_buffer:
            if not batch_has_text(line_buffer):
                print("WARNING: Comments and empty lines at the end of the input will be removed in order to produce valid conll-u. The input must not end with comments",file=sys.stderr,flush=True)
            else:
                if not p.is_alive(): #gotta end if something dies
                    print("Something crashed. Exiting.",file=sys.stderr,flush=True)
                    sys.exit(-1)
                print("Feeding final batch",file=sys.stderr,flush=True)
                p.put("".join(line_buffer))

    # Signal end-of-input and wait for all pipeline stages to finish.
    p.send_final()
    p.join()