From 93415e13a071d777ae78e55f32bc4d252136c994 Mon Sep 17 00:00:00 2001 From: Mosquito Date: Mon, 9 Jan 2017 14:30:48 +0300 Subject: [PATCH 1/2] ujson support and multiproccess example runner (#9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [fea] ujson support 😀 * [fea] remove sh script and make it debuggable * [fix] remove detached process * [fix] return the sh script --- examples/run_cluster.py | 89 +++++++++++++++++++++++++++++++++++++++++ raftos/serializers.py | 6 ++- 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100755 examples/run_cluster.py diff --git a/examples/run_cluster.py b/examples/run_cluster.py new file mode 100755 index 0000000..cd215b3 --- /dev/null +++ b/examples/run_cluster.py @@ -0,0 +1,89 @@ +#!/usr/bin/bin python3 +import os +import logging +import asyncio +import random +import raftos +import raftos.serializers +from argparse import ArgumentParser +from datetime import datetime +from multiprocessing import Process + + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger() + + +CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class Class: + data = raftos.Replicated(name='data') + + +def main(log_dir, node, cluster): + loop = asyncio.new_event_loop() + + raftos.configure({ + 'log_path': log_dir, + 'serializer': raftos.serializers.JSONSerializer, + 'loop': loop + }) + + loop.run_until_complete(run(loop, node, cluster)) + + +async def run(loop, node, cluster): + await raftos.register(node, cluster=cluster, loop=loop) + + obj = Class() + + while True: + await asyncio.sleep(5, loop=loop) + + if raftos.get_leader() == node: + obj.data = { + 'id': random.randint(1, 1000), + 'data': { + 'amount': random.randint(1, 1000) * 1000, + 'created_at': datetime.now().strftime('%d/%m/%y %H:%M') + } + } + + +if __name__ == '__main__': + parser = ArgumentParser() + + parser.add_argument('-p', '--start-port', help='Start port', type=int, default=8000) + parser.add_argument('-n', '--processes', help='Cluster size', type=int, default=3) + parser.add_argument('-d', '--log-dir', default=os.path.abspath('logs'), + dest='log_dir', help="Log dir") + + args = parser.parse_args() + + os.makedirs(args.log_dir, exist_ok=True) + + neighbours = set( + "127.0.0.1:{}".format(args.start_port + i) for i in range(args.processes) + ) + + processes = set([]) + + try: + for neighbour in neighbours: + node_args = (args.log_dir, neighbour, neighbours - {neighbour}) + p = Process(target=main, args=node_args) + log.info("%r", node_args) + + p.start() + processes.add(p) + + while processes: + for process in tuple(processes): + process.join() + processes.remove(process) + finally: + for process in processes: + if process.is_alive(): + log.warning('Terminating %r', process) + process.terminate() diff --git a/raftos/serializers.py b/raftos/serializers.py index 60954ff..5745375 100644 --- a/raftos/serializers.py +++ b/raftos/serializers.py @@ -1,6 +1,10 @@ -import json import msgpack +try: + import ujson as json +except ImportError: + import json + class JSONSerializer: @staticmethod From 5014fc9bff52d28617a547c14f0a5f6fe333f574 Mon Sep 17 00:00:00 2001 From: Alistair Lynn Date: Thu, 12 Jan 2017 22:20:30 +0000 Subject: [PATCH 2/2] Use rsplit to only split on a single `:` (#10) This is needed for IPv6 literals which have colons. --- raftos/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raftos/server.py b/raftos/server.py index dbbb8d5..148366f 100644 --- a/raftos/server.py +++ b/raftos/server.py @@ -14,12 +14,12 @@ async def register(*address_list, cluster=None, loop=None): loop = loop or asyncio.get_event_loop() for address in address_list: - host, port = address.split(':') + host, port = address.rsplit(':', 1) node = Node(address=(host, int(port)), loop=loop) await node.start() for address in cluster: - host, port = address.split(':') + host, port = address.rsplit(':', 1) port = int(port) if (host, port) != (node.host, node.port):