Skip to content

Commit

Permalink
Rename client to pool for backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
rossnomann committed Jul 4, 2022
1 parent a654fc0 commit 35109a5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 33 deletions.
2 changes: 1 addition & 1 deletion aioworkers_redis/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def set_config(self, config):
super().set_config(cfg)

@property
def client(self) -> Redis:
def pool(self) -> Redis:
connector = self._connector or self._get_connector()
assert connector._client is not None, 'Client is not ready'
return connector._client
Expand Down
24 changes: 12 additions & 12 deletions aioworkers_redis/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@ def factory(self, item, config=None):

async def put(self, value):
value = self.encode(value)
return await self.client.rpush(self.key, value)
return await self.pool.rpush(self.key, value)

async def get(self, *, timeout=0):
async with self._lock:
result = await self.client.blpop(self.key, timeout)
result = await self.pool.blpop(self.key, timeout)
if timeout and result is None:
raise TimeoutError
value = self.decode(result[-1])
return value

async def length(self):
return await self.client.llen(self.key)
return await self.pool.llen(self.key)

async def list(self):
return [self.decode(i) for i in await self.client.lrange(self.key, 0, -1)]
return [self.decode(i) for i in await self.pool.lrange(self.key, 0, -1)]

async def remove(self, value):
value = self.encode(value)
await self.client.lrem(self.key, 0, value)
await self.pool.lrem(self.key, 0, value)

async def clear(self):
return await self.client.delete(self.key)
return await self.pool.delete(self.key)


class BaseZQueue(Queue):
Expand All @@ -48,27 +48,27 @@ class BaseZQueue(Queue):
async def put(self, value):
score, val = value
val = self.encode(val)
return await self.client.zadd(self.key, {val: score})
return await self.pool.zadd(self.key, {val: score})

async def get(self):
async with self._lock:
while True:
lv = await self.client.eval(self.script, 1, self.key)
lv = await self.pool.eval(self.script, 1, self.key)
if lv:
break
await asyncio.sleep(self.config.timeout)
value, score = lv
return float(score), self.decode(value)

async def length(self):
return await self.client.zcard(self.key)
return await self.pool.zcard(self.key)

async def list(self):
return [self.decode(i) for i in await self.client.zrange(self.key, 0, -1)]
return [self.decode(i) for i in await self.pool.zrange(self.key, 0, -1)]

async def remove(self, value):
value = self.encode(value)
await self.client.zrem(self.key, value)
await self.pool.zrem(self.key, value)


@score_queue('time.time')
Expand Down Expand Up @@ -97,7 +97,7 @@ class TimestampZQueue(BaseZQueue):
async def get(self):
async with self._lock:
while True:
lv = await self.client.eval(self.script, 1, self.key, time.time())
lv = await self.pool.eval(self.script, 1, self.key, time.time())
if lv:
break
await asyncio.sleep(self.config.timeout)
Expand Down
40 changes: 20 additions & 20 deletions aioworkers_redis/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ def set_config(self, config):
)

async def list(self):
keys = await self.client.keys(self.raw_key('*'))
keys = await self.pool.keys(self.raw_key('*'))
return [self.clean_key(i) for i in keys]

async def length(self):
keys = await self.client.keys(self.raw_key('*'))
keys = await self.pool.keys(self.raw_key('*'))
return len(keys)

async def set(self, key, value):
Expand All @@ -31,21 +31,21 @@ async def set(self, key, value):
if not is_null:
value = self.encode(value)
if is_null:
return await self.client.delete(raw_key)
return await self.pool.delete(raw_key)
elif self._expiry:
return await self.client.setex(raw_key, self._expiry, value)
return await self.pool.setex(raw_key, self._expiry, value)
else:
return await self.client.set(raw_key, value)
return await self.pool.set(raw_key, value)

async def get(self, key):
raw_key = self.raw_key(key)
value = await self.client.get(raw_key)
value = await self.pool.get(raw_key)
if value is not None:
return self.decode(value)

async def expiry(self, key, expiry):
raw_key = self.raw_key(key)
await self.client.expire(raw_key, expiry)
await self.pool.expire(raw_key, expiry)


class HashStorage(FieldStorageMixin, Storage):
Expand All @@ -57,9 +57,9 @@ async def set(self, key, value, *, field=None, fields=None):
if value is None:
to_del.append(field)
else:
return await self.client.hset(raw_key, field, self.encode(value))
return await self.pool.hset(raw_key, field, self.encode(value))
elif value is None:
return await self.client.delete(raw_key)
return await self.pool.delete(raw_key)
else:
pairs = {}
for f in fields or value:
Expand All @@ -69,23 +69,23 @@ async def set(self, key, value, *, field=None, fields=None):
else:
pairs[f] = self.encode(v)
if pairs:
await self.client.hset(raw_key, mapping=pairs)
await self.pool.hset(raw_key, mapping=pairs)
if to_del:
await self.client.hdel(raw_key, *to_del)
await self.pool.hdel(raw_key, *to_del)
if self._expiry:
await self.client.expire(raw_key, self._expiry)
await self.pool.expire(raw_key, self._expiry)

async def get(self, key, *, field=None, fields=None):
raw_key = self.raw_key(key)
if field:
return self.decode(await self.client.hget(raw_key, field))
return self.decode(await self.pool.hget(raw_key, field))
elif fields:
v = await self.client.hmget(raw_key, *fields)
v = await self.pool.hmget(raw_key, *fields)
m = self.model()
for f, v in zip(fields, v):
m[f] = self.decode(v)
else:
a = await self.client.hgetall(raw_key)
a = await self.pool.hgetall(raw_key)
m = self.model()
for f, v in a.items():
m[f.decode()] = self.decode(v)
Expand All @@ -95,15 +95,15 @@ async def get(self, key, *, field=None, fields=None):
class HyperLogLogStorage(KeyEntity, AbstractBaseStorage):
async def set(self, key, value=True):
assert value is True
await self.client.pfadd(self.key, key)
await self.pool.pfadd(self.key, key)

async def get(self, key):
tmp_key = self.raw_key('tmp:hhl:' + key)
await self.client.pfmerge(tmp_key, self.key)
result = await self.client.pfadd(tmp_key, key)
await self.client.delete(tmp_key)
await self.pool.pfmerge(tmp_key, self.key)
result = await self.pool.pfadd(tmp_key, key)
await self.pool.delete(tmp_key)
return result == 0

async def length(self):
c = await self.client.pfcount(self.key)
c = await self.pool.pfcount(self.key)
return c

0 comments on commit 35109a5

Please sign in to comment.