forked from R-O-C-K-E-T/Factorio-SAT
-
Notifications
You must be signed in to change notification settings - Fork 0
/
calculate_optimal.py
248 lines (202 loc) · 9.17 KB
/
calculate_optimal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import json, math, os, asyncio, argparse
import concurrent.futures
import numpy as np
from network import open_network, get_input_output_colours, deduplicate_network
import belt_balancer, blueprint, optimisations
from template import EdgeMode
# Maximum number of tiles an underground belt may span, per belt tier.
# Used to pick the slowest belt tier that still supports a requested
# underground length (see get_belt_level).
MAXIMUM_UNDERGROUND_LENGTHS = {
'normal' : 4,
'fast' : 6,
'express' : 8,
}
def factors(value):
    """Yield every ordered factor pair (a, b) with a * b == value.

    Pairs are generated starting from the divisor pair nearest the square
    root and moving outwards; each pair is emitted smaller-factor-first,
    immediately followed by the swapped ordering when a != b.
    """
    divisor = math.floor(math.sqrt(value))
    while divisor >= 1:
        if value % divisor == 0:
            quotient = value // divisor
            yield divisor, quotient
            if divisor != quotient:
                # Non-square pair: also emit the mirrored orientation.
                yield quotient, divisor
        divisor -= 1
def get_offsets(height, input_size, output_size):
    """Yield (input_offset, output_offset) placements for two end blocks of
    the given sizes within a strip of the given height.

    The output block slides over every valid position; the input block takes
    every position that keeps it inside the output block's span.
    """
    if input_size > output_size:
        # Symmetric case: solve with the roles swapped, then mirror each pair.
        mirrored = get_offsets(height, output_size, input_size)
        yield from ((out_off, in_off) for in_off, out_off in mirrored)
        return
    slack = output_size - input_size
    for out_off in range(height - output_size + 1):
        for shift in range(slack + 1):
            yield out_off + shift, out_off
def solve_balancer(network, size, solver):
    """Try to lay out a belt balancer for `network` inside a fixed grid.

    Builds a SAT model of the balancer grid, adds the structural and
    optimisation constraints, and hands it to the given backend solver.

    Parameters:
        network: the balancer network to realise (as loaded by open_network).
        size: (maximum_underground_length, width, height) tuple.
        solver: backend SAT solver identifier passed through to grid.solve.

    Returns:
        The solved grid as nested lists, or None if no layout exists.
    """
    maximum_underground_length, width, height = size
    # Merge duplicate nodes before building the model.
    network = deduplicate_network(network)
    grid = belt_balancer.create_balancer(network, width, height, maximum_underground_length)
    grid.block_belts_through_edges((False, True))
    grid.prevent_intersection(EdgeMode.NO_WRAP)
    belt_balancer.setup_balancer_ends(grid, network, True, False)
    # Extra constraints that prune the search space; the interior-only x-range
    # presumably keeps the grid's end columns free for the balancer ends
    # -- TODO confirm against optimisations.expand_underground.
    optimisations.expand_underground(grid, min_x=1, max_x=grid.width-2)
    optimisations.apply_generic_optimisations(grid)
    belt_balancer.enforce_edge_splitters(grid, network)
    grid.enforce_maximum_underground_length(EdgeMode.NO_WRAP)
    solution = grid.solve(solver)
    if solution is None:
        return None
    return solution.tolist()
class NetworkSolutionStore:
    """Per-network bookkeeping of which balancer sizes are solvable.

    A size is the tuple (underground_length, width, height). Existence
    results propagate monotonically: a balancer that fits some size also
    fits every size that is at least as large in all dimensions, and a
    proven-impossible size rules out every size at most as large.
    """

    def __init__(self, network_path):
        self.network = open_network(network_path)
        self.network_name = os.path.split(network_path)[1]
        # size -> bool: recorded existence results.
        self.exist = dict()
        # size -> nested-list grid: solutions actually found.
        self.solutions = dict()

    def does_balancer_exist(self, size):
        """Return True/False when implied by a recorded result, else None."""
        for known_size, known_exists in self.exist.items():
            if known_exists:
                # A solution at a size dominated by `size` implies one here.
                if all(a >= b for a, b in zip(size, known_size)):
                    return True
            elif all(a <= b for a, b in zip(size, known_size)):
                # `size` is dominated by a size proven impossible.
                return False
        return None

    def clean(self):
        """Drop negative results already implied by other negative results."""
        for size, known_exists in list(self.exist.items()):
            if known_exists is not False:
                continue
            # Temporarily remove the entry; keep it only if the remaining
            # entries do not already imply impossibility at this size.
            del self.exist[size]
            if self.does_balancer_exist(size) is not False:
                self.exist[size] = known_exists

    def from_json(self, data):
        """Rebuild both maps from their JSON form (keys are 'l,w,h' strings)."""
        self.exist = {}
        for key, val in data.get('exist', {}).items():
            length, width, height = map(int, key.split(','))
            self.exist[length, width, height] = val
        self.solutions = {}
        for key, val in data.get('solutions', {}).items():
            length, width, height = map(int, key.split(','))
            self.solutions[length, width, height] = val

    def to_json(self):
        """Inverse of from_json: tuple keys become comma-joined strings."""
        return {
            'exist': {','.join(map(str, size)): val for size, val in self.exist.items()},
            'solutions': {','.join(map(str, size)): val for size, val in self.solutions.items()},
        }

    def add_solution(self, size, solution):
        """Record the outcome of a solve attempt at `size`."""
        found = solution is not None
        self.exist[size] = found
        if found:
            self.solutions[size] = solution

    def best_current_solution(self, loss, underground_length):
        """Return the stored solution minimising `loss` over (width, height),
        restricted to the given underground length; None if no candidate."""
        candidates = (
            (size, solution)
            for size, solution in self.solutions.items()
            if size[0] <= underground_length and loss(size[1:]) != float('inf')
        )
        best = min(candidates, key=lambda item: loss(item[0][1:]), default=(None, None))
        return best[1]

    def next_length_size(self, underground_length):
        """Next untested size at minimal height, widening one column at a
        time; None once a solution at (or below) the current width exists."""
        (_, input_count), (_, output_count) = get_input_output_colours(self.network)
        height = max(input_count, output_count)
        width = 3
        while True:
            candidate = underground_length, width, height
            known = self.does_balancer_exist(candidate)
            if known:
                return None
            if known is None:
                return candidate
            width += 1

    def next_area_size(self, underground_length):
        """Next untested size by increasing area, trying factor pairs whose
        height stays within [min_height, 2*min_height]; None once solved."""
        (_, input_count), (_, output_count) = get_input_output_colours(self.network)
        min_height = max(input_count, output_count)
        area = min_height
        while True:
            for width, height in factors(area):
                if not (min_height <= height <= 2 * min_height):
                    continue
                # Width gains two columns for the input/output end columns.
                candidate = underground_length, width + 2, height
                known = self.does_balancer_exist(candidate)
                if known:
                    return None
                if known is None:
                    return candidate
            area += 1
def get_belt_level(underground_length: int):
    """Return the slowest belt tier whose underground reach covers
    `underground_length`; fall back to the longest tier if none does."""
    tiers_by_length = sorted(MAXIMUM_UNDERGROUND_LENGTHS.items(), key=lambda item: item[1])
    for level, max_length in tiers_by_length:
        if underground_length <= max_length:
            return level
    # No tier is long enough -- use the biggest available.
    return level
if __name__ == '__main__':
    base_path = 'networks'
    result_file = 'optimal_balancers.json'

    parser = argparse.ArgumentParser(description='Calculates optimal balancers')
    parser.add_argument('mode', choices=['query', 'compute'])
    parser.add_argument('underground_length', type=int, help='Maximum underground length')
    parser.add_argument('objective', choices=['area', 'length'], help='Optimisation objective')
    parser.add_argument('--export-blueprints', action='store_true', help='Return query results as blueprints')
    parser.add_argument('--threads', type=int, help='Number of compute threads')
    parser.add_argument('--solver', type=str, default='g4', help='Backend SAT solver to use')
    args = parser.parse_args()

    # One store per network description file.
    stores = []
    for file in os.listdir(base_path):
        stores.append(NetworkSolutionStore(os.path.join(base_path, file)))

    # Resume from any previous run; absence of the result file is fine.
    try:
        with open(result_file) as f:
            data = json.load(f)
        for store in stores:
            item = data.get(store.network_name)
            if item is None:
                continue
            store.from_json(item)
    except FileNotFoundError:
        pass

    def save_progress():
        # Persist every store after each solve so progress survives interruption.
        data = {}
        for store in stores:
            data[store.network_name] = store.to_json()
        with open(result_file, 'w') as f:
            json.dump(data, f)

    def get_next_size(store):
        # Next untested size for *this* store under the chosen objective,
        # or None when its optimum is already determined.
        if args.objective == 'length':
            return store.next_length_size(args.underground_length)
        elif args.objective == 'area':
            return store.next_area_size(args.underground_length)
        else:
            assert False

    if args.mode == 'query':
        if args.export_blueprints:
            def encode_solution(solution, name):
                # Network files are named like '4x4'; turn that into a label.
                label = ' to '.join(name.split('x'))
                tiles = np.array(solution)
                for i, row in enumerate(tiles):
                    for j, entry in enumerate(row):
                        tiles[i, j] = blueprint.read_tile(entry)
                return blueprint.encode_blueprint(blueprint.make_blueprint(tiles, label, get_belt_level(args.underground_length)))
        else:
            def encode_solution(solution, _):
                return json.dumps(solution)

        for store in sorted(stores, key=lambda store: store.network_name):
            # BUG FIX: previously `get_next_size` was bound (before this loop)
            # to a method of whichever store the JSON-loading loop left behind,
            # so every network was gated on the wrong store's progress.
            # Evaluate the next size per-store instead.
            if get_next_size(store) is not None:
                # Optimum not yet proven for this network; skip it.
                continue
            if args.objective == 'length':
                (_, input_count), (_, output_count) = get_input_output_colours(store.network)
                min_height = max(input_count, output_count)
                # Bind min_height as a default to make the capture explicit.
                loss = lambda size, min_height=min_height: size[0] if size[1] == min_height else float('inf')
            elif args.objective == 'area':
                loss = lambda size: (size[0]-2) * size[1]
            else:
                assert False
            solution = store.best_current_solution(loss, args.underground_length)
            if solution is None:
                continue
            print(encode_solution(solution, store.network_name))
    elif args.mode == 'compute':
        async def optimise(executor, store):
            # Keep solving the next untested size until the optimum is pinned down.
            while True:
                next_size = get_next_size(store)
                if next_size is None:
                    break
                print(f'{store.network_name}: Start {next_size}')
                solution = await loop.run_in_executor(executor, solve_balancer, store.network, next_size, args.solver)
                store.add_solution(next_size, solution)
                store.clean()
                save_progress()
                print(f'{store.network_name}: Solution found')

        async def main():
            # SAT solving is CPU-bound: fan out across processes, one task per network.
            with concurrent.futures.ProcessPoolExecutor(max_workers=args.threads) as executor:
                tasks = [optimise(executor, store) for store in stores]
                await asyncio.gather(*tasks)

        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())
    else:
        assert False