-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
159 lines (133 loc) · 5.44 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/python
"""Client code modified from phase 2."""
import argparse
import socket
import common
import common2
import time
def set_val(lockid, key, val):
"""set value on a server."""
svr_addr, svr_port = lockid.split(':')
msg = {'cmd': 'setr', 'key': key, 'val': val}
print 'set %s to %s' % (key, val)
return common.send_receive(svr_addr, svr_port, msg)
def find_svrs(key, hashes):
"""find the servers to set in."""
key_hash = common.hash_number(key)
len_ = len(hashes)
if len_ > 3:
for i in xrange(len_):
if key_hash < hashes[i][0]:
pass
else:
return [hashes[i], hashes[(i + 1) % len_],
hashes[(i + 2) % len_]]
return hashes[:3]
else:
return hashes
def setr_request(lockid, key):
"""Request votes from servers."""
svr_addr, svr_port = map(str, lockid.split(':'))
msg = {'cmd': 'setr_request', 'key': key}
return common.send_receive(svr_addr, svr_port, msg)
def setr_deny(svrs, key):
"""setr not successful."""
deny = {'cmd': 'setr_deny', 'key': key}
for svr in svrs:
svr_addr, svr_port = svr.split(':')
common.send_receive(svr_addr, svr_port, deny)
return {'status': 'setr not successful'}
def setr_accept(svrs, key, val):
"""setr successful."""
for svr in svrs:
set_val(svr, key, val)
return {'status': 'setr successful'}
def main():
"""Client entry point."""
parser = argparse.ArgumentParser()
parser.add_argument('--server', default='localhost')
parser.add_argument('--viewleader', default='localhost')
subparsers = parser.add_subparsers(dest='cmd')
parser_set = subparsers.add_parser('set')
parser_set.add_argument('key', type=str)
parser_set.add_argument('val', type=str)
parser_get = subparsers.add_parser('get')
parser_get.add_argument('key', type=str)
parser_print = subparsers.add_parser('print')
parser_print.add_argument('text', nargs="*")
parser_query = subparsers.add_parser('query_all_keys')
parser_server_query = subparsers.add_parser('query_servers')
parser_lock_get = subparsers.add_parser('lock_get')
parser_lock_get.add_argument('lockid', type=str)
parser_lock_get.add_argument('requestor', type=str)
parser_lock_get = subparsers.add_parser('lock_release')
parser_lock_get.add_argument('lockid', type=str)
parser_lock_get.add_argument('requestor', type=str)
parser_setr = subparsers.add_parser('setr')
parser_setr.add_argument('key', type=str)
parser_setr.add_argument('val', type=str)
parser_getr = subparsers.add_parser('getr')
parser_getr.add_argument('key', type=str)
args = parser.parse_args()
if args.cmd in ['query_servers', 'lock_get', 'lock_release']:
while True:
response = common.send_receive_range(args.viewleader,
common2.VIEWLEADER_LOW,
common2.VIEWLEADER_HIGH,
vars(args))
if response.get("status") == "retry":
print "Waiting on lock %s..." % args.lockid
time.sleep(5)
continue
else:
break
print response
elif args.cmd == 'setr':
key, val = args.key, args.val
print 'Trying to setr %s to %s' % (key, val)
msg = common.send_receive_range(args.viewleader,
common2.VIEWLEADER_LOW,
common2.VIEWLEADER_HIGH,
{'cmd': 'query_servers'})
# results are lockid, hash
hashes = [(entry[1], entry[0])
for entry in msg['result']]
# list just the lockids
svrs = [svr[1] for svr in find_svrs(key, hashes)]
ress = [setr_request(svr, key) for svr in svrs]
# we iterate through ress twice, which is a bit inefficient, but ress
# should only be around 3 items, so it should be ok.
if 'no' in [res['reply'] for res in ress]:
res = setr_deny(svrs, key)
print 'key already being set'
elif any([res['epoch'] != msg['epoch'] for res in ress]):
res = setr_deny(svrs, key)
print 'server with wrong epoch'
else:
res = setr_accept(svrs, key, val)
return res
elif args.cmd == 'getr':
key = args.key
msg = common.send_receive_range(args.viewleader,
common2.VIEWLEADER_LOW,
common2.VIEWLEADER_HIGH,
{'cmd': 'query_servers'})
# results are lockid, hash
hashes = [(entry[1], entry[0]) for entry in msg['result']]
# list the lockids
svrs = [svr[1] for svr in find_svrs(key, hashes)]
for svr in svrs:
host, port = svr.split(':')
new_msg = {'cmd': 'get', 'key': key}
try:
print common.send_receive(host, port, new_msg)
break
except socket.Timeouterror:
pass
print {'status': 'unable to retrieve %s from any server' % key}
else:
response = common.send_receive_range(args.server, common2.SERVER_LOW,
common2.SERVER_HIGH, vars(args))
print response
if __name__ == "__main__":
main()