-
Notifications
You must be signed in to change notification settings - Fork 1
/
STT.py
110 lines (87 loc) · 3.04 KB
/
STT.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python3
# Requires PyAudio and PySpeech.
# Speech To Text
import sys,os
import time
from threading import Thread
from threading import Event
import app_logger
import sound
import TTS
import VA
import config
from common.speech.SttVosk import SttVosk
# Seconds after TTS.last_say_time during which an utterance is accepted as a
# command without the keyword (used by wait_for_keyword).
# Author's tuning note (unverified): on an RPi4 with 8 GB about 3.5 worked
# well, with 4 GB about 6.
KEEP_SPEECH_TIMEOUT=6
# Directory part of the path this script was started with (sys.argv[0]).
pathname = os.path.dirname(sys.argv[0])
# Absolute form of that directory.
work_dir=os.path.abspath(pathname)
# Listener Thread created by init(use_thread=True); reset to None by deinit().
thread_listen = None
# SttVosk recognizer instance created by init().
recognition = None
# Set to False by deinit(); the loop in listen_thread runs while this is true.
Running = True
# Keyword list handed to SttVosk in init().
keywords = [config.TTS_KEYWORD]
def init(use_thread=False):
    """Create the recognizer and, on request, start the listener thread.

    Stores a new ``SttVosk`` instance in the module-level ``recognition``.

    Args:
        use_thread: When true, run ``listen_thread`` in a new ``Thread`` and
            keep it in the module-level ``thread_listen``.
    """
    global thread_listen, thread_notify, recognition

    recognition = SttVosk(
        app_logger.logger,
        config.vosk_model_path_small,
        sound.jabra_device_id,
        keywords,
    )
    if not use_thread:
        return

    # listen_thread does not use its argument; 10 is passed as a placeholder.
    listener = Thread(target=listen_thread, args=(10, ))
    thread_listen = listener
    # Flag is also set by deinit(); nothing in the visible code reads it.
    listener.kill_received = False
    listener.start()
def deinit():
    """Stop speech recognition and shut down the listener thread.

    Does nothing when the module is already stopped (``Running`` is false).
    Otherwise it clears ``Running`` so the loop in ``listen_thread`` ends,
    asks the recognizer to abort through ``force_stop()``, and waits for the
    listener thread to finish.

    Safe to call from the listener thread itself: in that case the thread is
    not joined, because a thread cannot join itself.
    """
    global thread_listen
    global Running

    if not Running:
        return

    # Local import: the module header only imports Thread and Event.
    from threading import current_thread

    Running = False
    app_logger.info("start stoping "+os.path.basename(__file__))
    if recognition:
        recognition.force_stop()
    if thread_listen is not None:
        thread_listen.kill_received = True
        # Thread.join() raises RuntimeError when called on the current
        # thread. The listener leaves its loop on its own because Running
        # is already False, so skipping the join is enough here.
        if thread_listen is not current_thread():
            thread_listen.join()
        thread_listen = None
    app_logger.info("end stoping "+os.path.basename(__file__))
def listen_thread(arg):
    """Thread body: run ``listen_cycle`` repeatedly while ``Running`` is true.

    Logs a message and returns immediately when ``sound.jabra_device_id`` is
    -1. Any exception escaping the loop is logged and ends the thread.

    Args:
        arg: Unused.
    """
    try:
        if sound.jabra_device_id != -1:
            # Short beep: the thread has started and is ready for commands.
            sound.beep(0.05)
            while Running:
                listen_cycle()
        else:
            app_logger.info(f"no jabra found, no listen: {sound.jabra_device_id}")
    except Exception:
        app_logger.exception("STT thread exception:")
def remove_prefix(s, prefix):
    """Return *s* without a leading *prefix*; return *s* unchanged otherwise."""
    if not s.startswith(prefix):
        return s
    cut = len(prefix)
    return s[cut:]
def wait_for_keyword():
    """Listen until an utterance that counts as a command is recognized.

    Every recognized utterance is stripped and lower-cased. It is accepted
    when it starts with one of ``recognition.keywords`` (the keyword is cut
    off the front), or when it arrives less than ``KEEP_SPEECH_TIMEOUT``
    seconds after ``TTS.last_say_time``.

    Returns:
        The command text. An empty string means either that nothing followed
        the keyword or that the recognizer was force-stopped.
    """
    while not recognition.is_force_stop():
        text = recognition.recognize()
        if not text:
            continue
        text = text.strip().lower()
        if not text:
            # Whitespace-only result: nothing was actually said, so do not
            # hand back "" (the caller reads that as "keyword only").
            continue
        app_logger.info(f"STT: {text}")

        # The keyword check must come before the timeout check so that a
        # keyword repeated during an ongoing dialog is still cut off.
        for keyword in recognition.keywords:
            # text was lower-cased above; compare against a lower-cased
            # keyword so a keyword configured with capitals still matches.
            wake_word = keyword.lower()
            if text.startswith(wake_word):
                return remove_prefix(text, wake_word).strip()

        # Still within the dialog window after the last TTS output:
        # no keyword is required.
        if time.time() - TTS.last_say_time < KEEP_SPEECH_TIMEOUT:
            return text
    return ""
def listen_cycle():
    """Run one listen-and-dispatch round.

    Waits for a command via ``wait_for_keyword``; when that yields nothing
    and the module is still running, asks for a phrase with
    ``listen_phrase``. A non-empty command is passed to
    ``VA.process_comand`` unless ``Running`` has been cleared meanwhile.
    Exceptions are logged and not propagated.
    """
    try:
        heard = wait_for_keyword()
        if not heard and Running:
            heard = listen_phrase()
        if heard and Running:
            VA.process_comand(heard)
    except Exception:
        app_logger.exception("voice recognition exception:")
def listen_phrase():
    """Prompt the user by voice and recognize one phrase.

    Speaks a prompt, then calls ``recognition.recognize(3, 0.5)``. When the
    result is empty, speaks a failure message.

    Returns:
        Whatever ``recognition.recognize`` returned (possibly empty).
    """
    TTS.say("Слушаю")
    heard = recognition.recognize(3, 0.5)
    if heard:
        return heard
    TTS.say("Ни шанса понять, давай все заново")
    return heard