-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
244 lines (213 loc) · 11.2 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4
import rapidsms
from rapidsms.apps.base import AppBase
from rapidsms.models import Connection
from rapidsms.messages import OutgoingMessage
from models import *
from rapidsms.models import Contact
#TODO this is mad hackery to make sure porting is complete
#from i18n.utils import get_translation as _
#from i18n.utils import get_language_code
def _(self, text, lang=None):
if text is not None:
return text
#TODO this is mad hackery to make sure porting is complete
def get_language_code(self, lang):
pass
class App(AppBase):
registered_functions = {}
session_listeners = {}
def start(self):
self.configure()
def configure(self, last_message="You are done with this survey. Thanks for participating!", **kwargs):
self.last_message = last_message
def handle(self, msg):
# if this caller doesn't have a session attribute,
# they're not currently answering a question tree, so
# just search for triggers and return
sessions = Session.objects.all().filter(state__isnull=False)\
.filter(connection=msg.connection)
if not sessions:
try:
tree = Tree.objects.get(trigger=msg.text)
# start a new session for this person and save it
self.start_tree(tree, msg.connection, msg)
return True
# no trigger found? no big deal. the
# message is probably for another app
except Tree.DoesNotExist:
return False
# the caller is part-way though a question
# tree, so check their answer and respond
else:
session = sessions[0]
state = session.state
self.debug(state)
# loop through all transitions starting with
# this state and try each one depending on the type
# this will be a greedy algorithm and NOT safe if
# multiple transitions can match the same answer
transitions = Transition.objects.filter(current_state=state)
found_transition = None
for transition in transitions:
if self.matches(transition.answer, msg):
found_transition = transition
break
# the number of tries they have to get out of this state
# if empty there is no limit. When the num_retries is hit
# a user's session will be terminated.
# not a valid answer, so remind
# the user of the valid options.
if not found_transition:
transitions = Transition.objects.filter(current_state=state)
# there are no defined answers. therefore there are no more questions to ask
if len(transitions) == 0:
# send back some precanned response
msg.respond(self.last_message)
# end the connection so the caller can start a new session
self._end_session(session)
return
else:
# send them some hints about how to respond
if state.question.error_response:
response = (_(state.question.error_response, get_language_code(session.connection)))
if "%(answer)s" in response:
response = response % ({"answer" : msg.text})
else:
flat_answers = " or ".join([trans.answer.helper_text() for trans in transitions])
# Make translation happen all at the end. This is currently more practical
#translated_answers = _(flat_answers, get_language_code(session.connection))
#response = _('"%(answer)s" is not a valid answer. You must enter %(hint)s',
# get_language_code(session.connection))% ({"answer" : msg.text, "hint": translated_answers})
untranslated_response ='"%(answer)s" is not a valid answer. You must enter ' + flat_answers
response = _(untranslated_response,
get_language_code(session.connection))% ({"answer" : msg.text})
msg.respond(response)
# update the number of times the user has tried
# to answer this. If they have reached the
# maximum allowed then end their session and
# send them an error message.
session.num_tries = session.num_tries + 1
if state.num_retries and session.num_tries >= state.num_retries:
session.state = None
msg.respond(_("Sorry, invalid answer %(retries)s times. Your session will now end. Please try again later.",
get_language_code(session.connection)) % {"retries": session.num_tries })
session.save()
return True
# create an entry for this response
# first have to know what sequence number to insert
ids = Entry.objects.all().filter(session=session).order_by('sequence_id').values_list('sequence_id', flat=True)
if ids:
# not sure why pop() isn't allowed...
sequence = ids[len(ids) -1] + 1
else:
sequence = 1
entry = Entry(session=session,sequence_id=sequence,transition=found_transition,text=msg.text)
entry.save()
self.debug("entry %s saved" % entry)
# advance to the next question, or remove
# this caller's state if there are no more
# this might be "None" but that's ok, it will be the
# equivalent of ending the session
session.state = found_transition.next_state
session.num_tries = 0
session.save()
# if this was the last message, end the session,
# and also check if the tree has a defined
# completion text and if so send it
if not session.state:
self._end_session(session)
if session.tree.completion_text:
msg.respond(_(session.tree.completion_text, get_language_code(session.connection)))
# if there is a next question ready to ask
# send it along
self._send_question(session, msg)
# if we haven't returned long before now, we're
# long committed to dealing with this message
return True
def start_tree(self, tree, connection, msg=None):
'''Initiates a new tree sequence, terminating any active sessions'''
self.end_sessions(connection)
session = Session(connection=connection,
tree=tree, state=tree.root_state, num_tries=0)
session.save()
self.debug("new session %s saved" % session)
# also notify any session listeners of this
# so they can do their thing
if self.session_listeners.has_key(tree.trigger):
for func in self.session_listeners[tree.trigger]:
func(session, False)
self._send_question(session, msg)
def _send_question(self, session, msg=None):
'''Sends the next question in the session, if there is one'''
state = session.state
if state and state.question:
response = _(state.question.text, get_language_code(session.connection))
self.info("Sending: %s" % response)
if msg:
msg.respond(response)
else:
# we need to get the real backend from the router
# to properly send it
real_backend = self.router.get_backend(session.connection.backend.slug)
if real_backend:
connection = Connection(real_backend, session.connection.identity)
outgoing_msg = OutgoingMessage(connection, response)
outgoing_msg.send()
else:
# todo: do we want to fail more loudly than this?
error = "Can't find backend %s. Messages will not be sent" % connection.backend.slug
self.error(error)
def _end_session(self, session, canceled=False):
'''Ends a session, by setting its state to none,
and alerting any session listeners'''
session.state = None
session.canceled = canceled
session.save()
if self.session_listeners.has_key(session.tree.trigger):
for func in self.session_listeners[session.tree.trigger]:
func(session, True)
def end_sessions(self, connection):
''' Ends all open sessions with this connection.
does nothing if there are no open sessions '''
sessions = Session.objects.filter(connection=connection).exclude(state=None)
for session in sessions:
self._end_session(session, True)
def register_custom_transition(self, name, function):
''' Registers a handler for custom logic within a
state transition '''
self.info("Registering keyword: %s for function %s" %(name, function.func_name))
self.registered_functions[name] = function
def set_session_listener(self, tree_key, function):
'''Adds a session listener to this. These functions
get called at the beginning and end of every session.
The contract of the function is func(Session, is_ending)
where is_ending = false at the start and true at the
end of the session.
'''
self.info("Registering session listener %s for tree %s" %(function.func_name, tree_key))
# I can't figure out how to deal with duplicates, so only allowing
# a single registered function at a time.
#
# if self.session_listeners.has_key(tree_key):
# # have to check existence. This is mainly for the unit tests
# if function not in self.session_listeners[tree_key]:
# self.session_listeners[tree_key].append(function)
# else:
self.session_listeners[tree_key] = [function]
def matches(self, answer, message):
answer_value = message.text
'''returns True if the answer is a match for this.'''
if not answer_value:
return False
if answer.type == "A":
return answer_value.lower() == answer.answer.lower()
elif answer.type == "R":
return re.match(answer.answer, answer_value, re.IGNORECASE)
elif answer.type == "C":
if self.registered_functions.has_key(answer.answer):
return self.registered_functions[answer.answer](message)
else:
raise Exception("Can't find a function to match custom key: %s", answer)
raise Exception("Don't know how to process answer type: %s", answer.type)