diff --git a/raftos/replicator.py b/raftos/replicator.py index f9ae478..f280e0d 100644 --- a/raftos/replicator.py +++ b/raftos/replicator.py @@ -1,6 +1,21 @@ +import asyncio +import functools + from .state import State +def atomic_method(func): + + @functools.wraps(func) + async def wrapped(self, *args, **kwargs): + with await self.lock: + result = await func(self, *args, **kwargs) + + return result + + return wrapped + + class Replicated: """ Replication class makes sure data changes are all applied to State Machine @@ -10,6 +25,7 @@ class Replicated: DEFAULT_VALUE = None def __init__(self, name, default='REPLICATED_DEFAULT'): + self.lock = asyncio.Lock() self.name = name # For subclasses like ReplicatedDict @@ -50,6 +66,7 @@ class ReplicatedDict(ReplicatedContainer): DEFAULT_VALUE = {} + @atomic_method async def update(self, kwargs): data = await self.get() data.update(kwargs) @@ -67,12 +84,14 @@ async def items(self): data = await self.get() return data.items() + @atomic_method async def pop(self, key, default): data = await self.get() item = data.pop(key, default) await self.set(data) return item + @atomic_method async def delete(self, key): data = await self.get() del data[key] @@ -84,11 +103,13 @@ class ReplicatedList(ReplicatedContainer): DEFAULT_VALUE = [] + @atomic_method async def append(self, kwargs): data = await self.get() data.append(kwargs) await self.set(data) + @atomic_method async def extend(self, lst): data = await self.get() data.extend(lst) diff --git a/setup.py b/setup.py index 54c5d2f..6a9618a 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup -__version__ = '0.2' +__version__ = '0.2.1' short_description = 'Raft replication in Python' requirements = [req.strip() for req in open('requirements.txt').readlines()]