-
Notifications
You must be signed in to change notification settings - Fork 0
/
kvs.py
330 lines (302 loc) · 8.73 KB
/
kvs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# Author: Hansheng Zhao <[email protected]> (https://www.zhs.me)
try:
import anydbm as dbm
except ImportError:
import dbm
# import directive
__all__ = (
'__author__', '__license__', '__version__', 'KVS'
)
# package metadata
__author__ = 'Hansheng Zhao'
__license__ = 'BSD-2-Clause + MIT'
__version__ = '1.0.0'
class KVS(object):
""" KVS class for K-V pair storage """
# statically defined attributes
__slots__ = ('_database', '_serialize')
def __init__(
self, database = ':memory:',
serialize = None, **kwargs
):
"""
KVStore class constructor
:param database: object, database path or instance
:param serialize: object|None, the serializer
:param kwargs: other arguments
"""
# import
from seco import SeCo
# check if using on-disk|in-memory|external store
if isinstance(database, (str, bytes, bytearray)):
# convert bytes and byte-array into string
database = database.decode(encoding = 'UTF8') \
if isinstance(database, (bytes, bytearray)) \
else database
# check if using in-memory storage
if database.lower() == ':memory:':
self._database = {}
else:
flag = kwargs['flag'] \
if 'flag' in kwargs else 'c'
self._database = \
dbm.open(database, flag, **kwargs)
# any object is now considered database
elif isinstance(database, dict) or True:
# using external storage
self._database = database
# initialize serializer
if serialize is not None:
self._serialize = serialize
else:
self._serialize = SeCo(
serialize = 'json', compress = 'zlib'
)
def __call__(self, key, value = None, *args, **kwargs):
"""
Call method for setting & getting item
:param key: mixed, any hashable key
:param value: mixed|None, any object
:param args: mixed, the arguments
:param kwargs: dict, the kw arguments
:return: None|mixed
"""
if value is None:
# read from kvstore if value not provided
return self.__getitem__(key, *args, **kwargs)
else:
# set item into kvstore instead
self.__setitem__(key, value, *args, **kwargs)
def __del__(self):
"""
Sync and close database connection if not in-memory
:return: None
"""
self.sync()
self.close()
@staticmethod
def _convert(key):
"""
Convert key to supported format
:param key: mixed, any hashable object
:return: bytes, bytes representation of key
"""
# check if key is supported or hashable
# supports only bytes by default
if isinstance(key, bytes):
return key
# encode string type
elif isinstance(key, str):
return key.encode(encoding = 'UTF8')
# change byte-array into bytes
elif isinstance(key, bytearray):
return bytes(key)
# convert numeric into string
elif isinstance(key, (int, float, complex)):
return str(key).encode(encoding = 'UTF8')
# convert hashable collections into string
elif isinstance(key, (range, tuple, frozenset)):
return str(key).encode(encoding = 'UTF8')
# other types not supported
else:
raise TypeError('Unsupported type.')
def __contains__(self, key):
"""
Dict-like object contains method
:param key: mixed, any hashable key
:return: bool, whether item exists
"""
database = self._database
key = self._convert(key)
return key in database \
if hasattr(database, '__contains__') \
else self.__getitem__(key) is not None
def __setitem__(self, key, value, *args, **kwargs):
"""
Dict-like object setitem method
:param key: mixed, any hashable key
:param value: mixed, any object
:param args: mixed, the arguments
:param kwargs: dict, the kw arguments
:return: None
"""
# acquire database and lint key
database = self._database
key = self._convert(key)
# check if value is not None
if value is not None:
# serialize and compress the value
value = self._serialize.dumps(value)
# set item into database
if hasattr(database, 'set'):
database.set(key, value, *args, **kwargs)
elif hasattr(database, 'put'):
database.put(key, value, *args, **kwargs)
else:
database.__setitem__(key, value)
else:
# delete item from database instead
self.__delitem__(key, *args, **kwargs)
# aliases for setitem method
set = put = __setitem__
def __getitem__(self, key, *args, **kwargs):
"""
Dict-like object getitem method
:param key: mixed, any hashable key
:param args: mixed, the arguments
:param kwargs: dict, the kw arguments
:return: mixed
"""
# acquire database and lint key
database = self._database
key = self._convert(key)
# acquire item from database
value = database.get(key, *args, **kwargs) \
if hasattr(database, 'get') \
else database.__getitem__(key)
return self._serialize.loads(value) \
if value else None
# aliases for getitem method
get = __getitem__
def __delitem__(self, key, *args, **kwargs):
"""
Dict-like object delitem method
:param key: mixed, any hashable key
:param args: mixed, the arguments
:param kwargs: dict, the kw arguments
:return: None
"""
# acquire database and lint key
database = self._database
key = self._convert(key)
# attempt to remove item from database
try:
return database.delete(
key, *args, **kwargs
) if hasattr(database, 'delete') \
else database.__delitem__(key)
except KeyError:
return None
# aliases for delitem method
delete = __delitem__
def __setattr__(self, key, value):
"""
Proxy for saving item to database
:param key: str, the kv-pair key
:param value: mixed, the value
:return: None
"""
try:
# attempts to modify the object first
super(KVS, self).__setattr__(key, value)
except AttributeError:
# set item into database instead
self.__setitem__(key, value)
def __getattr__(self, key):
"""
Proxy for getting item from database
:param key: str, the attribute name
:return: mixed, function or value
"""
try:
# attempt to acquire attributes first
return self._database.__getattribute__(key)
except AttributeError:
# return database item instead
return self.__getitem__(key)
def __delattr__(self, key):
"""
Proxy for deleting item from database
:param key: str, the kv-pair key
:return: None
"""
try:
# attempt to delete attribute first
super(KVS, self).__delattr__(key)
except AttributeError:
# delete from database
self.__delitem__(key)
def pop(self, key):
"""
Pop a value from the kv-store
:param key: mixed, any hashable key
:return: mixed
"""
value = self.__getitem__(key)
self.__delitem__(key)
return value
def keys(self):
"""
Acquire all the keys in database
:return: collection|generator, keys
"""
yield from self._database.keys() \
if hasattr(self._database, 'keys') \
else ()
def values(self):
"""
Acquire all the values in database
:return: collection|generator, values
"""
# check if method already exists
if hasattr(self._database, 'values'):
yield from map(
lambda _: self._serialize.loads(_),
self._database.values()
)
else:
# create generator for values
for key in self.keys():
yield self.__getitem__(key)
def items(self):
"""
Acquire k-v pairs in database
:return: collection| generator, tuples
"""
# check if method already exists
if hasattr(self._database, 'items'):
yield from map(lambda _: (
_[0], self._serialize.loads(_[1])
), self._database.items())
else:
# create generator for items
for key in self.keys():
yield (key, self.__getitem__(key))
def sync(self):
"""
Commit changes to disk
:return: None
"""
# check if is in-memory mode
if hasattr(self._database, 'sync'):
self._database.sync()
def optimize(self):
"""
Optimize GDBM database
:return: None
"""
if hasattr(self._database, 'reorganize'):
self._database.reorganize()
def close(self):
"""
Close database connection
:return: None
"""
self.sync()
if hasattr(self._database, 'close'):
self._database.close()
def clear(self):
"""
Clear database content
:return: None
"""
if hasattr(self._database, 'clear'):
self._database.clear()
elif hasattr(self._database, 'flushall'):
self._database.flushall()
elif hasattr(self._database, 'flush_all'):
self._database.flush_all()
else:
for key in self.keys():
self.__delitem__(key)
self.optimize()