-
Notifications
You must be signed in to change notification settings - Fork 0
/
datastore.py
363 lines (313 loc) · 12.1 KB
/
datastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
import sys
import time
import threading
import re
import codecs
try:
## use included json in > Python 2.6
import json
except ImportError:
try:
import simplejson as json
except ImportError:
print >>sys.stderr, "apt-get install python-simplejson"
sys.exit(1)
try:
from apscheduler.scheduler import Scheduler as apscheduler
except ImportError:
print >> sys.stderr, "apt-get install python-apscheduler"
sys.exit(1)
class datastore(object):
"""
Datastore Instance
"""
def __init__(self,parent):
####################################################
## Function: __init__
## Parameter:
## WireGateInstance
## Description:
## Contructor for the DATASTORE instance
##
####################################################
self._parent = parent
if parent:
self.WG = parent.WG
self.log("DATASTORE starting up")
self.DBLOADED = False
self.dataobjects = {}
self.CYCLER = apscheduler()
self.CYCLER.start()
self.locked = threading.RLock()
self.locked.acquire()
## Load JSON Database
self.load()
def update(self,id,val,connector=False):
## Update the communication Object with value
####################################################
## Function: update
## Parameter:
## id: Connector specific id
## val: Value that should be set in the Datastoreobject
## Description:
## update or create a Datastoreobject
## schould be used by all connectors to set their Values
####################################################
##
## get the Datastore object
obj = self.get(id,connector=connector)
self.debug("Updating %s (%s): %r" % (obj.name,id,val))
## Set the value of the object
obj.setValue(val,source=connector)
##TODO: central subscriber function for other connectore or servers
obj.sendConnected()
## return the object for additional updates
return obj
def get(self,id,connector=False):
####################################################
## Function: get
## Parameter:
## id: the id to look for in the Datastore
## Description:
## returns or create and returns the Dataobejct with ID id
##
####################################################
self.locked.acquire()
try:
## check for existence
type(self.dataobjects[id])
except KeyError:
## create a new one if it don't exist
self.dataobjects[id] = dataObject(self,id)
if hasattr(connector,'get_ds_defaults'):
self.dataobjects[id].config = connector.get_ds_defaults(id)
## return it
self.locked.release()
return self.dataobjects[id]
def namespaceRead(self,namespace):
return filter(lambda x,y=namespace: x.namespace == y,self.dataobjects.values())
def load(self):
self.debug("load DATASTORE")
try:
db = codecs.open(self.WG.config['WireGate']['datastore'],"r",encoding='UTF-8')
data = db.read()
loaddict = json.loads(data)
db.close()
for name, obj in loaddict.items():
self.dataobjects[name] = dataObject(self,obj['id'],obj['name'])
self.dataobjects[name].lastupdate = obj['lastupdate']
self.dataobjects[name].config = obj['config']
self.dataobjects[name].connected = obj['connected']
self.dataobjects[name].value = obj['value']
self.debug("%d entries loaded in DATASTORE" % len(self.dataobjects))
self.DBLOADED = True
except IOError:
## no DB File
pass
except ValueError,e:
## empty DB File
self.DBLOADED = True
result = re.findall(r"line\s(\d+)\s", e.message)
if result:
## only on json errors not when db is empty
lnum = int(result[0])
print "Can't load Datastore"
data = data.split("\n")
lines = data[lnum -10:lnum +10]
for line in range(lnum-10,lnum+10):
if line == lnum-1:
print "--" + data[line]
else:
print "##" + data[line]
sys.exit(1)
except:
self.WG.errorlog()
## error
pass
self.locked.release()
def save(self):
self.debug("save DATASTORE")
if not self.DBLOADED:
self.debug("No valid config, not saving")
return False
self.locked.acquire()
savedict = {}
## FIXME: user create a __reduce__ method for the Datastoreitem object
for name,obj in self.dataobjects.items():
savedict[name] = {
'name' : obj.name,
'id' : obj.id,
'value' : obj.value,
'lastupdate' : obj.lastupdate,
'config' : obj.config,
'connected' : obj.connected
}
dbfile = codecs.open(self.WG.config['WireGate']['datastore'],"w",encoding='UTF-8')
utfdb = json.dumps(savedict,dbfile,ensure_ascii=False,sort_keys=True,indent=3)
dbfile.write(utfdb)
dbfile.close()
def shutdown(self):
self.CYCLER.shutdown()
self.save()
def debug(self,msg):
####################################################
## Function: debug
## Parameter:
## msg: a message object, could be str/float/dict whateverk
## Description:
## Debugging either to logobject or can be changed to log to stdout
####################################################
self.log(msg,'debug')
## Central logging
def log(self,msg,severity='info',instance=False):
self.WG.log(msg,severity,"datastore")
class dataObject(object):
def __init__(self,parent,id,name=False):
self._parent = parent
if parent:
self.WG = parent.WG
## Threadlocking
self.write_mutex = threading.RLock()
self.read_mutex = threading.RLock()
## check for namespace
namespace = id.split(":",1)
if len(namespace)>1:
namespace = namespace[0]
else:
## Fixme: maybe default Namespace
namespace = "UNKNOWN"
self.namespace = namespace
if not name:
## Initial Name
self.name = u"%s:unbekannt-%s" % (namespace, time.strftime("%Y-%m-%d_%H:%M:%S"))
else:
self.name = name
## set Name to function for Scheduler
if type(self.name) <> unicode:
## guess that non unicode is default encoded
self.name = name.decode(self.WG.config['WireGate']['defaultencoding'])
## some defaults
self.value = None
self.lastupdate = 0
self.lastsource = None
self.id = id
## connector specific vars
self.config = {}
self.cyclestore = []
self._cyclejob = False
## connected Logics, communication objects ... goes here
self.connected = []
def __repr__(self):
return json.dumps({
'name' : self.name,
'id' : self.id,
'value' : self.value,
'lastupdate' : self.lastupdate,
'config' : self.config,
'connected' : self.connected
})
def __str__(self):
return "%s - %s" % (self.id,self.name)
def _setValue(self,refered_self):
## self override
## override with connector send function
if self.namespace:
try:
self.write_mutex.acquire()
#self._setValue = self.WG.connectors[self.namespace].setValue
self.WG.connectors[self.namespace].setValue(refered_self)
finally:
self.write_mutex.release()
def setValue(self,val,send=False,source=None):
if 'sendcycle' in self.config and self.value <> None:
if not self._cyclejob:
self._parent.debug("start Cycle ID: %s" % self.id)
cycletime = float(self.config['sendcycle']) + self.lastupdate - time.time()
if cycletime < 0.0:
cycletime = 0
## add value to List
self.cyclestore.append(val)
##
repeat = 1
if 'always' in self.config['sendcycleoption'].split(","):
repeat = 0
self.write_mutex.acquire()
self._cyclejob = self.WG.DATASTORE.CYCLER.add_interval_job(self._cycle,seconds=cycletime,repeat=repeat)
self.write_mutex.release()
else:
self._parent.debug("ignore Cycle ID: %s" % self.id)
self.cyclestore.append(val)
else:
self._real_setValue(val,send,source=source)
def _real_setValue(self,val,send,source=None):
try:
## get read lock
self.read_mutex.acquire()
## get write lock
self.write_mutex.acquire()
## save the modified time
self.lastupdate = time.time()
self.lastsource = source
if type(val) == str:
self.WG.log("Non Unicode Value received for %s" % self.id,'warning')
val = unicode(val,encoding=self.WG.config['WireGate']['defaultencoding'],errors='ignore')
self.value = val
if send:
self._setValue(self)
finally:
## release locks
self.write_mutex.release()
self.read_mutex.release()
def sendConnected(self):
self.read_mutex.acquire()
val = self.getValue()
self.read_mutex.release()
for attached in self.connected:
try:
self.WG.DATASTORE.get(attached).setValue(val,True,source=self)
except:
self.WG.log("sendconnected failed for %s" % attached,'error')
__import__('traceback').print_exc(file=__import__('sys').stdout)
def getValue(self,other=''):
try:
## get read lock
self.read_mutex.acquire()
ret = self.value
if other == 'lastupdate':
ret = [ret,self.lastupdate]
return ret
finally:
## release lock
self.read_mutex.release()
def _cycle(self):
self._parent.debug("----------------------execute Cycle ID: %s" % self.id)
val = None
cycleopts = []
if 'sendcycleoption' in self.config:
cycleopts = self.config['sendcycleoption'].split(",")
## average only possible with data in cyclesotre else DivisionByZero
if len(self.cyclestore) > 0:
## Average/Min/Max only possible with int and float
if type(self.value) in (int,float):
val = type(self.value)(0)
if 'average' in cycleopts:
for i in self.cyclestore:
val += i
val = val / len(self.cyclestore)
self._parent.debug("Cycle ID: %s average: %f (%r)" % (self.id, val, self.cyclestore ))
## default use last
else:
val = self.cyclestore.pop()
else:
val = self.cyclestore.pop()
if 'always' in cycleopts:
if val == None:
val = self.getValue()
else:
self.write_mutex.acquire()
self._cyclejob = False
self.write_mutex.release()
## reset cyclestore
self.cyclestore = []
if val <> None:
self._real_setValue(val,False)