forked from Tribler/dispersy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
requestcache.py
233 lines (174 loc) · 8.2 KB
/
requestcache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from random import random
import logging
from twisted.internet import reactor
from twisted.python.threadable import isInIOThread
from .taskmanager import TaskManager
class NumberCache(object):
    """
    Base class for a request that is tracked under a (PREFIX, NUMBER) pair.

    Subclasses are expected to implement on_timeout() and may override timeout_delay.
    """

    def __init__(self, request_cache, prefix, number):
        """
        Creates a new NumberCache identified by PREFIX and NUMBER.

        REQUEST_CACHE is only queried through its has() method; this constructor does not add the new instance to
        it.

        Raises RuntimeError when REQUEST_CACHE already holds an entry for PREFIX and NUMBER.
        """
        assert isinstance(number, (int, long)), type(number)
        assert isinstance(prefix, unicode), type(prefix)

        super(NumberCache, self).__init__()

        # the logger is named after the concrete (sub)class
        logger_name = self.__class__.__name__
        self._logger = logging.getLogger(logger_name)

        already_claimed = request_cache.has(prefix, number)
        if already_claimed:
            raise RuntimeError("This number is already in use '%s'" % number)

        self._prefix, self._number = prefix, number

    @property
    def prefix(self):
        """
        The unicode PREFIX given at construction.
        """
        return self._prefix

    @property
    def number(self):
        """
        The integer NUMBER given at construction.
        """
        return self._number

    @property
    def timeout_delay(self):
        """
        The number of seconds after which this cache times out, 10.0 unless a subclass overrides it.
        """
        return 10.0

    def on_timeout(self):
        """
        Hook for the timeout of this cache.  The base class has no implementation and raises NotImplementedError.
        """
        raise NotImplementedError()

    def __str__(self):
        class_name = self.__class__.__name__
        return "<%s %s-%d>" % (class_name, self.prefix, self.number)
class RandomNumberCache(NumberCache):
    """
    A NumberCache whose number is picked at random among the numbers that are not yet in use for its prefix.
    """

    def __init__(self, request_cache, prefix):
        """
        Creates a new RandomNumberCache for PREFIX with a randomly chosen number that REQUEST_CACHE does not hold.

        Raises RuntimeError when no unused number could be found.
        """
        assert isinstance(prefix, unicode), type(prefix)

        # pick the number first, the base class verifies and stores it
        unclaimed = RandomNumberCache.find_unclaimed_identifier(request_cache, prefix)
        super(RandomNumberCache, self).__init__(request_cache, prefix, unclaimed)

    @classmethod
    def find_unclaimed_identifier(cls, request_cache, prefix):
        """
        Returns a random integer in [0, 2 ** 16) for which REQUEST_CACHE.has(PREFIX, integer) is False.

        At most 1000 random draws are made, a RuntimeError is raised when all of them are in use.
        """
        for _attempt in xrange(1000):
            candidate = int(random() * 2 ** 16)
            if request_cache.has(prefix, candidate):
                # taken, draw again
                continue
            return candidate

        raise RuntimeError("Could not find a number that isn't in use")
class SignatureRequestCache(RandomNumberCache):
    """
    Cache for an outstanding signature request, registered under the u"signature-request" prefix.
    """

    def __init__(self, request_cache, members, response_func, response_args, timeout):
        """
        Creates a new SignatureRequestCache with a random number that is unused in REQUEST_CACHE.

        MEMBERS is a list containing all the members that should add their signature.  Currently only double signed
        messages are supported, hence MEMBERS contains only a single Member instance.

        RESPONSE_FUNC is called with RESPONSE_ARGS appended to its arguments when the request times out.

        TIMEOUT is the value that timeout_delay reports for this cache.
        """
        super(SignatureRequestCache, self).__init__(request_cache, u"signature-request")

        self._timeout_delay = timeout
        self.members = members
        self.response_func = response_func
        self.response_args = response_args

        # no request is associated yet, the attribute is only initialised here
        self.request = None

    @property
    def timeout_delay(self):
        """
        The TIMEOUT value given at construction.
        """
        return self._timeout_delay

    def on_timeout(self):
        """
        Reports the timeout by calling RESPONSE_FUNC(self, None, True, *RESPONSE_ARGS).
        """
        self._logger.debug("signature timeout")

        callback, extra_args = self.response_func, self.response_args
        callback(self, None, True, *extra_args)
class IntroductionRequestCache(RandomNumberCache):
    """
    Cache for an outstanding introduction request, registered under the u"introduction-request" prefix.

    The cache removes itself from the community's request cache once both the introduction response and the
    puncture have been reported to it.
    """

    def __init__(self, community, helper_candidate):
        """
        Creates a new IntroductionRequestCache in COMMUNITY.request_cache for the request sent to HELPER_CANDIDATE.
        """
        super(IntroductionRequestCache, self).__init__(community.request_cache, u"introduction-request")

        self.community = community
        self.helper_candidate = helper_candidate

        # candidates are not known yet, the attributes are only initialised here
        self.response_candidate = None
        self.puncture_candidate = None

        # flags set by on_introduction_response() and on_puncture()
        self._introduction_response_received = False
        self._puncture_received = False

    @property
    def timeout_delay(self):
        """
        We will accept the response at most 10.5 seconds after our request.
        """
        return 10.5

    def on_timeout(self):
        """
        Records a walk failure for HELPER_CANDIDATE unless an introduction response was received.
        """
        if self._introduction_response_received:
            # the helper answered in time, nothing to report
            return

        # helper_candidate did not respond to a request message in this community.  The obsolete candidates will be
        # removed by the dispersy_get_walk_candidate() in community.
        self._logger.debug("walker timeout for %s", self.helper_candidate)

        statistics = self.community.dispersy.statistics
        statistics.walk_failure_count += 1
        statistics.dict_inc(u"walk_failure_dict", self.helper_candidate.sock_addr)

        # mark the walk response as invalid
        self.helper_candidate.walk_response(-1.0)

    def _check_if_both_received(self):
        """
        Pops this cache from the community's request cache once both the response and the puncture have arrived.
        """
        if not self._introduction_response_received or not self._puncture_received:
            return

        self.community.request_cache.pop(self.prefix, self.number)

    def on_introduction_response(self):
        """
        Registers that the introduction response has been received.
        """
        self._introduction_response_received = True
        self._check_if_both_received()

    def on_puncture(self):
        """
        Registers that the puncture has been received.
        """
        self._puncture_received = True
        self._check_if_both_received()
class RequestCache(TaskManager):
    """
    Keeps track of outstanding NumberCache instances, indexed by their prefix and number, and schedules a timeout
    for each of them on the reactor.

    All methods assert that they are called on the reactor's thread.
    """

    def __init__(self):
        """
        Creates a new RequestCache instance.
        """
        super(RequestCache, self).__init__()

        assert isInIOThread(), "RequestCache must be used on the reactor's thread"

        self._logger = logging.getLogger(self.__class__.__name__)

        # maps the identifier built by _create_identifier() to the NumberCache registered under it
        self._identifiers = dict()

    def add(self, cache):
        """
        Add CACHE into this RequestCache instance.

        A timeout is scheduled CACHE.timeout_delay seconds from now and registered as a task under CACHE.

        Returns CACHE when its prefix and number were not yet added, otherwise logs an error and returns None.
        """
        assert isInIOThread(), "RequestCache must be used on the reactor's thread"
        assert isinstance(cache, NumberCache), type(cache)
        assert isinstance(cache.number, (int, long)), type(cache.number)
        assert isinstance(cache.prefix, unicode), type(cache.prefix)
        assert isinstance(cache.timeout_delay, float), type(cache.timeout_delay)
        assert cache.timeout_delay > 0.0, cache.timeout_delay

        identifier = self._create_identifier(cache.number, cache.prefix)
        if identifier in self._identifiers:
            self._logger.error("add with duplicate identifier \"%s\"", identifier)
            return None

        self._logger.debug("add %s", cache)
        self._identifiers[identifier] = cache
        self.register_task(cache, reactor.callLater(cache.timeout_delay, self._on_timeout, cache))
        return cache

    def has(self, prefix, number):
        """
        Returns True when a Cache with PREFIX and NUMBER is part of this RequestCache, otherwise returns False.
        """
        assert isInIOThread(), "RequestCache must be used on the reactor's thread"
        assert isinstance(number, (int, long)), type(number)
        assert isinstance(prefix, unicode), type(prefix)
        return self._create_identifier(number, prefix) in self._identifiers

    def get(self, prefix, number):
        """
        Returns the Cache associated with PREFIX and NUMBER when it exists, otherwise returns None.
        """
        assert isInIOThread(), "RequestCache must be used on the reactor's thread"
        assert isinstance(number, (int, long)), type(number)
        assert isinstance(prefix, unicode), type(prefix)
        return self._identifiers.get(self._create_identifier(number, prefix))

    def pop(self, prefix, number):
        """
        Returns the Cache associated with PREFIX and NUMBER, and removes it from this RequestCache, when it exists,
        otherwise raises a KeyError exception.

        The pending timeout task of the removed Cache is cancelled.
        """
        assert isInIOThread(), "RequestCache must be used on the reactor's thread"
        assert isinstance(number, (int, long)), type(number)
        assert isinstance(prefix, unicode), type(prefix)

        identifier = self._create_identifier(number, prefix)
        cache = self._identifiers.pop(identifier)
        self.cancel_pending_task(cache)
        return cache

    def _on_timeout(self, cache):
        """
        Called CACHE.timeout_delay seconds after CACHE was added to this RequestCache.

        _on_timeout is called for every Cache, except when it has been popped before the timeout expires.  When
        called _on_timeout will call CACHE.on_timeout() and afterwards remove CACHE from this RequestCache.

        The removal also happens when CACHE.on_timeout() raises, in which case the exception is propagated after
        the cleanup.
        """
        assert isInIOThread(), "RequestCache must be used on the reactor's thread"
        assert isinstance(cache, NumberCache), type(cache)

        self._logger.debug("timeout on %s", cache)
        try:
            cache.on_timeout()
        finally:
            # without the finally a raising on_timeout would leave CACHE registered forever, keeping its number
            # claimed
            identifier = self._create_identifier(cache.number, cache.prefix)

            # the on_timeout call could have already removed the identifier from the cache using pop, and could
            # even have added another Cache under the same identifier.  Only remove the entry when it still is
            # CACHE itself, a popped CACHE had its task cancelled by pop already.
            if self._identifiers.get(identifier) is cache:
                del self._identifiers[identifier]
                self.cancel_pending_task(cache)

    def _create_identifier(self, number, prefix):
        """
        Returns the unicode key u"PREFIX:NUMBER" under which a Cache is stored.
        """
        return u"%s:%d" % (prefix, number)

    def clear(self):
        """
        Clear the cache, canceling all pending tasks.
        """
        assert isInIOThread(), "RequestCache must be used on the reactor's thread"
        self._logger.debug("Clearing %s [%s]", self, len(self._identifiers))

        self.cancel_all_pending_tasks()
        self._identifiers.clear()