Skip to content

Commit

Permalink
descriptors removed
Browse files Browse the repository at this point in the history
  • Loading branch information
zhebrak committed Nov 21, 2016
1 parent 0b61094 commit 74ed141
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 81 deletions.
44 changes: 29 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,57 @@ pip install raftos
import raftos


raftos.register(
# node running on this machine
'127.0.0.1:8000',

# other servers
cluster=[
'127.0.0.1:8001',
'127.0.0.1:8002'
]
loop.create_task(
raftos.register(
# node running on this machine
'127.0.0.1:8000',

# other servers
cluster=[
'127.0.0.1:8001',
'127.0.0.1:8002'
]
)
)
loop.run_forever()
```

#### Data replication

```python
class Class:
data = raftos.Replicated(name='data')
counter = raftos.Replicated(name='counter')
data = raftos.ReplicatedDict(name='data')


obj = Class()

# value on a leader gets replicated to all followers
obj.data = {
await counter.set(42)
await data.update({
'id': 337,
'data': {
'amount': 20000,
'created_at': '7/11/16 18:45'
}
}
})
```

#### In case you only need consensus algorithm with leader election

```python
await raftos.wait_until_leader(current_node)
```
or
```python
if raftos.get_leader() == current_node:
# make request or respond to a client
```
or
```python
raftos.configure({
'on_leader': start,
'on_follower': stop
})
```

Whenever the leader falls, someone takes its place.


Expand Down
40 changes: 24 additions & 16 deletions examples/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@
import raftos


class Class:
data = raftos.Replicated(name='data')
async def run(node_id):
# Any replicated object with get/set functions
data_id = raftos.Replicated(name='data_id')

# Dict-like object: data.update(), data['key'] etc
data = raftos.ReplicatedDict(name='data')

def run(node_id):
obj = Class()
loop = asyncio.get_event_loop()
# List-like object: data_list.append(), data_list[-1]
data_list = raftos.ReplicatedList(name='data_list')

while True:
loop.run_until_complete(asyncio.sleep(5))

if raftos.get_leader() == node_id:
obj.data = {
'id': random.randint(1, 1000),
'data': {
'amount': random.randint(1, 1000) * 1000,
'created_at': datetime.now().strftime('%d/%m/%y %H:%M')
}
# We can also check if raftos.get_leader() == node_id
await raftos.wait_until_leader(node_id)

await asyncio.sleep(2)

current_id = random.randint(1, 1000)
data_map = {
str(current_id): {
'created_at': datetime.now().strftime('%d/%m/%y %H:%M')
}
}

await data_id.set(current_id)
await data.update(data_map)
await data_list.append(data_map)


if __name__ == '__main__':
Expand All @@ -41,5 +48,6 @@ def run(node_id):
'serializer': raftos.serializers.JSONSerializer
})

raftos.register(node, cluster=cluster)
run(node)
loop = asyncio.get_event_loop()
loop.create_task(raftos.register(node, cluster=cluster))
loop.run_until_complete(run(node))
17 changes: 8 additions & 9 deletions raftos/__init__.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
from .conf import configure, config
from .replicator import Replicated
from .replicator import Replicated, ReplicatedDict, ReplicatedList
from .server import register, stop
from .state import State, Leader
from .state import State


__all__ = [
'Replicated',
'ReplicatedDict',
'ReplicatedList',

'config',
'configure',
'register',
'stop',

'get_leader'
'get_leader',
'wait_until_leader'
]


def get_leader():
leader = State.leader
if isinstance(leader, Leader):
return leader.id

return leader
get_leader = State.get_leader
wait_until_leader = State.wait_until_leader
93 changes: 69 additions & 24 deletions raftos/replicator.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,86 @@
from .state import State


class ReplicatedObjectWrapper:
def __init__(self, value):
self.value = value

def get(self):
return self.value

def set(self, new_value):
self.value = new_value


class Replicated:
"""Replication descriptor makes sure data changes are all applied to State Machine"""

def __init__(self, name, default=None):
DEFAULT_VALUE = None
def __init__(self, name, default='REPLICATED_DEFAULT'):
self.name = name
self.replicated_object = ReplicatedObjectWrapper(default)

# For subclasses like ReplicatedDict
if default == 'REPLICATED_DEFAULT':
self.value = self.DEFAULT_VALUE
else:
self.value = default

self.in_memory = False

def __get__(self, obj, obj_type):
async def get(self):
# If we didn't set a value in this life cycle try to get it from State Machine
if not self.in_memory:
try:
self.replicated_object.set(
State.get_value(self.get_name(obj))
)
self.value = await State.get_value(self.name)
except KeyError:
pass

return self.replicated_object.get()
return self.value

def __set__(self, obj, value):
State.set_value(self.get_name(obj), value)
self.replicated_object.set(value)
async def set(self, value):
await State.set_value(self.name, value)
self.value = value
self.in_memory = True

def get_name(self, obj):
return '{}.{}'.format(obj.__class__.__name__, self.name)

class ReplicatedContainer(Replicated):
async def __getitem__(self, key):
return (await self.get()).__getitem__(key)

async def length(self):
data = await self.get()
return len(data)


class ReplicatedDict(ReplicatedContainer):
DEFAULT_VALUE = {}

async def update(self, kwargs):
data = await self.get()
data.update(kwargs)
await self.set(data)

async def keys(self):
data = await self.get()
return data.keys()

async def values(self):
data = await self.get()
return data.values()

async def items(self):
data = await self.get()
return data.items()

async def pop(self, key, defaul):
data = await self.get()
item = data.pop(key, default)
await self.set(data)
return item

async def delete(self, key):
data = await self.get()
del data[key]
await self.set(data)


class ReplicatedList(ReplicatedContainer):
DEFAULT_VALUE = []

async def append(self, kwargs):
data = await self.get()
data.append(kwargs)
await self.set(data)

async def extend(self, lst):
data = await self.get()
data.extend(lst)
await self.set(data)
9 changes: 4 additions & 5 deletions raftos/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .state import State


def register(*address_list, cluster=None, loop=None):
async def register(*address_list, cluster=None, loop=None):
"""Start Raft node (server)
Args:
address_list — 127.0.0.1:8000 [, 127.0.0.1:8001 ...]
Expand All @@ -16,7 +16,7 @@ def register(*address_list, cluster=None, loop=None):
for address in address_list:
host, port = address.split(':')
node = Node(address=(host, int(port)), loop=loop)
node.start()
await node.start()

for address in cluster:
host, port = address.split(':')
Expand Down Expand Up @@ -45,18 +45,17 @@ def __init__(self, address, loop):
self.requests = asyncio.Queue(loop=self.loop)
self.__class__.nodes.append(self)

def start(self):
async def start(self):
protocol = UDPProtocol(
queue=self.requests,
request_handler=self.request_handler,
loop=self.loop
)
address = self.host, self.port
task = asyncio.Task(
self.transport, _ = await asyncio.Task(
self.loop.create_datagram_endpoint(protocol, local_addr=address),
loop=self.loop
)
self.transport, _ = self.loop.run_until_complete(task)
self.state.start()

def stop(self):
Expand Down
Loading

0 comments on commit 74ed141

Please sign in to comment.