-
Notifications
You must be signed in to change notification settings - Fork 1
/
parse.py
125 lines (105 loc) · 4.9 KB
/
parse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import copy
import json
import os
import os.path
from tqdm import tqdm
import zstd
import hlt
import config
# Placeholder passed as the second constructor argument of every hlt.Shipyard
# and hlt.Dropoff built by the parser (presumably the entity id -- the parser
# never reads one for these structures from the replay; confirm against hlt).
ARBITRARY_ID = -1
def parse_replay_file(file_name):
    """Parse one replay file into per-frame data seen from the winner's side.

    The replay is a zstd-compressed JSON document.  The player whose name
    matches the one returned by ``get_winner_name`` is treated as "me";
    every other player is treated as an opponent.

    Args:
        file_name: Path of the replay file to load.

    Returns:
        A list with one tuple per entry of ``data['full_frames']``::

            (game_map, moves, ships, other_ships, my_dropoffs, them_dropoffs)

        * ``game_map``      -- ``hlt.GameMap`` snapshot of the cells for the frame.
        * ``moves``         -- ``{ship id: direction}`` for the winner's moves of
          type ``"m"`` in that frame.
        * ``ships``         -- ``{ship id: hlt.Ship}`` for the winner.
        * ``other_ships``   -- ``{ship id: hlt.Ship}`` for all other players.
        * ``my_dropoffs``   -- the winner's shipyard plus every dropoff the
          winner has constructed up to and including that frame.
        * ``them_dropoffs`` -- the same for all other players.

    Raises:
        IndexError: If no entry of ``data['players']`` matches the winner's name.
    """
    # Names are compared on their first space-separated token only.
    winner_tag = get_winner_name(file_name).split(" ")[0]
    with open(file_name, 'rb') as replay:
        data = json.loads(zstd.loads(replay.read()))

    # --- Basic information -------------------------------------------------
    player = [p for p in data['players'] if p['name'].split(" ")[0] == winner_tag][0]
    player_id = int(player['player_id'])
    player_key = str(player_id)  # per-frame 'moves' / 'entities' are keyed by string ids
    full_frames = data['full_frames']

    def shipyard_of(entry):
        # Owner ids are normalised to int so both sides use the same type.
        spot = entry['factory_location']
        return hlt.Shipyard(int(entry['player_id']), ARBITRARY_ID,
                            hlt.Position(spot['x'], spot['y']))

    my_shipyard = shipyard_of(player)
    other_shipyards = [shipyard_of(p) for p in data['players']
                       if int(p['player_id']) != player_id]
    width = data['production_map']['width']
    height = data['production_map']['height']

    # --- Cell information --------------------------------------------------
    # The per-frame updates below index the grid as cells[y][x], so the outer
    # index of the production grid is y and the inner index is x.  Each cell
    # is therefore positioned at (column, row).
    first_cells = [
        [hlt.MapCell(hlt.Position(x, y), cell['energy']) for x, cell in enumerate(row)]
        for y, row in enumerate(data['production_map']['grid'])
    ]

    frames = []
    for frame in full_frames:
        # Every frame gets its own deep copy so earlier snapshots stay intact.
        prev_cells = frames[-1]._cells if frames else first_cells
        new_cells = copy.deepcopy(prev_cells)
        for changed in frame['cells']:
            new_cells[changed['y']][changed['x']].halite_amount = changed['production']
        frames.append(hlt.GameMap(new_cells, width, height))

    # --- Winner's moves and ships ------------------------------------------
    moves = [
        {m['id']: m['direction']
         for m in frame['moves'].get(player_key, ()) if m['type'] == "m"}
        for frame in full_frames
    ]
    ships = [
        {int(sid): hlt.Ship(player_id, int(sid),
                            hlt.Position(ship['x'], ship['y']), ship['energy'])
         for sid, ship in frame['entities'].get(player_key, {}).items()}
        for frame in full_frames
    ]

    # --- Other players' ships ----------------------------------------------
    other_ships = [
        {int(sid): hlt.Ship(int(pid), int(sid),
                            hlt.Position(ship['x'], ship['y']), ship['energy'])
         for pid, fleet in frame['entities'].items() if int(pid) != player_id
         for sid, ship in fleet.items()}
        for frame in full_frames
    ]

    # --- Dropoff information -----------------------------------------------
    my_dropoffs = []
    them_dropoffs = []
    mine_so_far = [my_shipyard]
    theirs_so_far = other_shipyards
    for frame in full_frames:
        # Copy the previous frame's lists so each frame owns its own objects.
        mine_so_far = copy.deepcopy(mine_so_far)
        theirs_so_far = copy.deepcopy(theirs_so_far)
        for event in frame['events']:
            if event['type'] != 'construct':
                continue
            owner_id = int(event['owner_id'])
            location = event['location']
            dropoff = hlt.Dropoff(owner_id, ARBITRARY_ID,
                                  hlt.Position(location['x'], location['y']))
            if owner_id == player_id:
                mine_so_far.append(dropoff)
            else:
                theirs_so_far.append(dropoff)
        my_dropoffs.append(mine_so_far)
        them_dropoffs.append(theirs_so_far)

    return list(zip(frames, moves, ships, other_ships, my_dropoffs, them_dropoffs))
def process_f(folder_name, file_name):
    """Parse the replay ``file_name`` located inside ``folder_name``.

    Kept as a module-level function so it can be handed to a worker pool.
    """
    replay_path = os.path.join(folder_name, file_name)
    return parse_replay_file(replay_path)
from multiprocessing import Pool
def parse_replay_folder(folder_name, max_files=None):
    """Parse the ``.hlt`` replays of a folder in parallel worker processes.

    Args:
        folder_name: Directory whose entries are scanned (non-recursively).
        max_files: Optional upper bound on the number of replay files that
            are parsed.  ``None`` (the default) parses every replay; files
            are taken in sorted name order.  Values ``<= 0`` parse nothing.

    Returns:
        A list holding the non-empty result of ``process_f`` for each
        selected replay, in sorted file-name order.
    """
    print("Parsing folder")
    replay_names = [name for name in sorted(os.listdir(folder_name))
                    if name.endswith(".hlt")]
    if max_files is not None:
        # The limit has to be applied to the files that get submitted: results
        # are only collected after submission, so counting collected results
        # while submitting would never stop the loop.
        replay_names = replay_names[:max(max_files, 0)]

    replay_buffer = []
    # The context manager terminates the workers on exit, so every result is
    # fetched inside the block.
    with Pool(config.CORES) as pool:
        pending = [pool.apply_async(process_f, args=(folder_name, name))
                   for name in replay_names]
        for job in tqdm(pending):
            parsed = job.get()
            if parsed:
                replay_buffer.append(parsed)

    print("Replay Length: {}".format(len(replay_buffer)))
    print("Finished Parsing")
    return replay_buffer
def get_winner_name(file_path):
    """Return the name of the player ranked first in a replay file.

    The replay is a zstd-compressed JSON document.  The winner is the entry
    of ``data['game_statistics']['player_statistics']`` whose ``rank`` is 1;
    its ``player_id`` is then looked up in ``data['players']`` to get the name.

    Args:
        file_path: Path of the replay file to inspect.

    Returns:
        The ``name`` field of the winning player.

    Raises:
        ValueError: If no player statistics entry has rank 1.
        KeyError: If the rank-1 player id has no entry in ``data['players']``.
    """
    with open(file_path, "rb") as replay:
        data = json.loads(zstd.loads(replay.read()))

    names_by_id = {entry["player_id"]: entry["name"] for entry in data["players"]}

    # If several entries claim rank 1 the last one wins.
    winner_ids = [stats["player_id"]
                  for stats in data["game_statistics"]["player_statistics"]
                  if stats["rank"] == 1]
    if not winner_ids:
        raise ValueError("no rank-1 player found in replay: {}".format(file_path))
    return names_by_id[winner_ids[-1]]