#!/usr/bin/env python
"""
Chunk splitter that tries to balance the number of objects per chunk rather
than their size.

The MongoDB balancer splits chunks based on the total size of each chunk.
Sometimes this is not what you need and you would rather balance your chunks
by the number of objects they contain, not their byte size.

This script goes through each shard of your cluster, counts the number of
items in each chunk (which is a fairly expensive operation when you have a
large number of chunks) and splits each oversized chunk. It repeats this
until every chunk holds fewer than a given number of objects.
"""
import logging
import time

import pymongo
logging.basicConfig(format="%(asctime)-15s %(message)s", level=logging.INFO)

class Shard:
    """ Represents a single shard of your cluster """

    def __init__(self, database, collections, mongodata):
        self.database = database
        self.collections = collections
        self.name = mongodata['_id']
        self.host = mongodata['host']
        if '/' in self.host:
            # 'setname/host1:port,host2:port' means a replica set
            hosts = self.host.split('/')[1]
            self.connection = pymongo.MongoClient(hosts.split(','))
        else:
            self.connection = pymongo.MongoClient(self.host)
        self.count = {}
        self.chunks = {}
        # count the number of objects this shard has in each collection
        for collection in self.collections:
            self.count[collection] = \
                self.connection[self.database][collection].count_documents({})
            # the chunks list will get populated by Cluster.update()
            # it's ugly, I know
            self.chunks[collection] = []
    def refresh_chunk_count(self, collection):
        """ Refresh the per-chunk object count. Fairly expensive operation. """
        shard_key = self.collections[collection]
        for chunk in self.chunks[collection]:
            chunk['count'] = \
                self.connection[self.database][collection].count_documents({
                    shard_key: {
                        "$gte": chunk['min'][shard_key],
                        "$lt": chunk['max'][shard_key],
                    }})

class Cluster:
    def __init__(self, host, database, collections):
        self.connection = pymongo.MongoClient(host)
        self.database = database
        self.collections = collections
        self.update()

    def update(self):
        """ Update info about the cluster - all shards and chunks it contains.
        """
        logging.info("Updating cluster info")
        self.shards = {}
        for shard in self.connection['config']['shards'].find():
            self.shards[shard['_id']] = Shard(self.database,
                                              self.collections,
                                              shard)
        for collection in self.collections:
            for chunk in self.connection['config']['chunks'].find(
                    {"ns": "{}.{}".format(self.database, collection)}):
                self.shards[chunk['shard']].chunks[collection].append(chunk)
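
    # For reference, a document in config.chunks looks roughly like this
    # (abridged; the exact layout varies between MongoDB versions):
    #
    #   {'_id': 'mydb.cities-country_id_42',
    #    'ns': 'mydb.cities',
    #    'min': {'country_id': 42},
    #    'max': {'country_id': 57},
    #    'shard': 'shard0001'}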
    def get_locks(self):
        """ Return all currently held distributed locks. """
        return list(self.connection['config']['locks'].find({"state": 2}))
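
    # A held lock in config.locks looks roughly like this (abridged; the
    # exact fields vary between MongoDB versions, state 2 meaning "locked"):
    #
    #   {'_id': 'balancer', 'state': 2,
    #    'who': 'host:27017:...:Balancer',
    #    'why': 'doing balance round'}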
    def balancer_stopped(self):
        doc = self.connection['config']['settings'].find_one(
            {"_id": "balancer"})
        # the settings document may be missing entirely on a fresh cluster
        return bool(doc and doc.get("stopped"))
    def stop_balancer(self):
        """ Stop the balancer and wait for locks to be released.
            Give up after 10 minutes. """
        logging.info("Stopping balancer")
        self.connection['config']['settings'].update_one(
            {"_id": "balancer"}, {"$set": {"stopped": True}},
            upsert=True)  # create the settings doc if it does not exist yet
        if not self.balancer_stopped():
            raise Exception("Could not stop balancer")
        logging.info("Waiting for locks")
        retries = 0
        while len(self.get_locks()) and retries < 120:
            logging.info("Waiting for locks to be released: %s", retries)
            time.sleep(5)
            retries += 1
        if len(self.get_locks()):
            self.start_balancer()
            raise Exception("Could not wait for locks, aborting")
    def start_balancer(self):
        logging.info("Starting balancer")
        retries = 0
        while self.balancer_stopped() and retries < 5:
            self.connection['config']['settings'].update_one(
                {"_id": "balancer"}, {"$set": {"stopped": False}})
            retries += 1
        if self.balancer_stopped():
            raise Exception("Could not start balancer")
    def split_chunks(self, collection, max_count):
        logging.info("Starting recursive split")
        shard_key = self.collections[collection]
        need_to_split = True
        while need_to_split:
            need_to_split = False
            for shard in self.shards.values():
                if len(shard.chunks[collection]):
                    perchunk = (shard.count[collection] //
                                len(shard.chunks[collection]))
                else:
                    perchunk = 0
                logging.info("Shard {} has {} chunks in collection {},"
                             " {} objects, {} objects/chunk".format(
                                 shard.name,
                                 len(shard.chunks[collection]),
                                 collection,
                                 shard.count[collection],
                                 perchunk))
                shard.refresh_chunk_count(collection)
                for chunk in shard.chunks[collection]:
                    if chunk['count'] > max_count:
                        logging.info("- Chunk {} has {} objects,"
                                     " need to split...".format(
                                         chunk['_id'], chunk['count']))
                        try:
                            split = self.connection['admin'].command(
                                'split',
                                '{}.{}'.format(self.database, collection),
                                find={shard_key: chunk['min'][shard_key]}
                            )
                            if split.get('ok'):
                                # If we succeed in splitting at least one chunk
                                # we set need_to_split to True again, forcing
                                # the whole process from scratch. A smarter
                                # way would be to keep recursively splitting a
                                # single chunk until it's broken down into
                                # smaller ones, but this works fine as well.
                                need_to_split = True
                                logging.info("Split OK")
                            else:
                                logging.error(split)
                        except pymongo.errors.OperationFailure as e:
                            # gotta keep splitting
                            logging.error(e)
            # chunk boundaries have changed, reload them before the next pass
            if need_to_split:
                self.update()
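
# Note: passing find=... to the split command makes the server pick the split
# point itself, roughly the middle of the chunk that contains the given key.
# The command also accepts an explicit split point via middle=..., e.g.
# (hypothetical values, using the client from the sketch near the top):
#
#   client['admin'].command('split', 'mydb.cities',
#                           middle={'country_id': 50})
#
# which gives finer control at the cost of computing split points yourself.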

if __name__ == '__main__':
    logging.info("Starting")
    cluster = Cluster('localhost:27017',  # your mongos connection
                      'database5',        # database name
                      {  # dictionary of 'collection': 'sharding key'
                          'cities': 'country_id',
                          'cars': 'city_id',
                      })
    cluster.stop_balancer()
    try:
        cluster.split_chunks('cities', 600)   # maximum number of objects
        cluster.split_chunks('cars', 15000)   # per chunk we'd like to get
    finally:
        cluster.start_balancer()
    logging.info("All done")