-
Notifications
You must be signed in to change notification settings - Fork 0
/
Monitor.py
162 lines (138 loc) · 5.7 KB
/
Monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import os
import pandas as pd
import pickle
import SG_Primitives as P
import SG_Utils as utils
import Property
# from properties import phi1, phi2, phi3, phi4, phi5, phi6, phi7, phi8, phi9
from properties import all_properties
from time import time
from PIL import Image
from functools import partial
from pathlib import Path
class Monitor:
    """Singleton that checks a list of properties on graphs and logs the results.

    ``Monitor(...)`` with arguments (re)builds the shared instance through
    :meth:`initialize`; ``Monitor()`` without arguments returns the instance
    built earlier. Each call to :meth:`check` evaluates every registered
    property, appends one row per property to ``<route_path>/<name>.csv`` and
    advances ``timestep``.
    """

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, rebuilding it when arguments are passed.

        The new instance is only published on the class once ``initialize``
        has succeeded, so a failing construction (for example ``Monitor()``
        before any instance exists, which raises ``TypeError``) never leaves
        a half-initialised singleton behind.
        """
        if args or kwargs or not hasattr(cls, 'monitor_instance'):
            instance = super().__new__(cls)
            instance.initialize(*args, **kwargs)
            cls.monitor_instance = instance
        return cls.monitor_instance

    def initialize(self, log_path, route_path, save_sg: bool = False):
        """Reset the monitor state and create the output directories.

        Args:
            log_path: Root directory for all logs; created if missing.
            route_path: Sub-directory of ``log_path`` that receives the
                files written by this monitor; created if missing.
            save_sg: When true, :meth:`check` pickles every graph it is
                given through :meth:`save_rsv`.
        """
        # Shallow copy: add_property() must not mutate the list imported from
        # `properties`, otherwise additions leak into later Monitor instances.
        self.properties = list(all_properties)
        self.timestep = 0
        self.save_sg = save_sg

        # Create log directory
        self.log_path = Path(log_path)
        self.log_path.mkdir(parents=True, exist_ok=True)

        # Create route directory
        self.route_path = self.log_path / route_path
        self.route_path.mkdir(parents=True, exist_ok=True)

        # save_rsv() / save_images() read these attributes. The directories
        # themselves are created lazily on first save so that runs which
        # never save anything do not produce empty folders.
        self.rsv_path = self.route_path / "rsv"
        self.img_path = self.route_path / "images"

    def add_property(self, property):
        """Register one more property to be evaluated by :meth:`check`."""
        self.properties.append(property)

    def get_property(self, name: str):
        """Return the first registered property called ``name``, or ``None``."""
        return next(
            (prop for prop in self.properties if prop.name == name), None)

    def save_all_relevant_subgraphs(self, sg, sg_name):
        """Write each property's relevant subgraph of ``sg`` as a PNG.

        Files are stored as ``<route_path>/<sg_name>/<property name>.png``;
        the directory is created if needed.
        """
        save_dir = self.route_path / sg_name
        save_dir.mkdir(parents=True, exist_ok=True)
        for prop in self.properties:
            prop.save_relevant_subgraph(sg, str(save_dir / f'{prop.name}.png'))

    def get_all_relevant_subgraphs(self, sg):
        """Map each property name to ``prop.save_relevant_subgraph(sg, None)``."""
        return {
            prop.name: prop.save_relevant_subgraph(sg, None)
            for prop in self.properties
        }

    def check(self, sg, save_usage_information: bool = False):
        """Evaluate every property on ``sg``, log the outcome, advance time.

        Args:
            sg: Graph for the current timestep. It is forwarded to each
                property's ``update_data``.
            save_usage_information: Forwarded to ``update_data``; when true
                the property state and verdict are additionally stored in
                ``sg.graph`` under ``state_<name>`` and ``acc_<name>``.
        """
        # Save sg
        if self.save_sg:
            self.save_rsv(sg)

        for prop in self.properties:
            # Update property data
            prop.update_data(
                sg, save_usage_information=save_usage_information)
            # Check property
            prop_eval, state = prop.check_step(return_state=True)
            if save_usage_information:
                sg.graph[f'state_{prop.name}'] = state
                sg.graph[f'acc_{prop.name}'] = prop_eval
            pred_eval = prop.get_last_predicates()
            self.log(prop_name=prop.name,
                     pred_eval=pred_eval, prop_eval=prop_eval)

        self.timestep += 1

    def save_rsv(self, rsv):
        """Pickle ``rsv`` to ``<rsv_path>/<timestep>.pkl``."""
        self.rsv_path.mkdir(parents=True, exist_ok=True)
        # Save pkl file
        with open(self.rsv_path / f"{self.timestep}.pkl", 'wb') as f:
            pickle.dump(rsv, f)

    def save_images(self, images):
        """Save each array in ``images`` as a JPEG for the current timestep.

        Files are named ``ts_<timestep>_img_<index>.jpg`` inside ``img_path``.
        """
        self.img_path.mkdir(parents=True, exist_ok=True)
        for i, image in enumerate(images):
            pil_img = Image.fromarray(image)
            pil_img.save(self.img_path / f"ts_{self.timestep}_img_{i}.jpg")

    def log(self, prop_name, pred_eval, prop_eval):
        """Append the current evaluation of one property to its CSV file.

        The row holds ``ts`` (the current timestep), one column per key of
        ``pred_eval`` and a final ``prop_eval`` column.

        Args:
            prop_name: Property name; the file is ``<route_path>/<name>.csv``.
            pred_eval: Mapping from predicate name to its value.
            prop_eval: Verdict of the property for this timestep.
        """
        csv_path = self.route_path / f"{prop_name}.csv"
        keys = ["ts", *pred_eval.keys(), "prop_eval"]
        values = [self.timestep, *pred_eval.values(), prop_eval]
        row = pd.DataFrame([values], columns=keys)

        if self.timestep > 0 and csv_path.exists():
            # Append a single line instead of re-reading and rewriting the
            # whole file at every timestep.
            row.to_csv(csv_path, mode="a", header=False, index=False)
        else:
            # Timestep 0 starts a fresh file. A property registered later
            # has no file yet, so it also gets one with a header here.
            row.to_csv(csv_path, index=False)

    def save_final_output(self):
        """Write ``violations_<name>.csv`` with start/end pairs per property.

        An ``end`` that is not greater than 0 is replaced by ``prop.time``.
        """
        for prop in self.properties:
            save_path = self.route_path / f"violations_{prop.name}.csv"
            violation_data = [
                (start, end if end > 0 else prop.time)
                for start, end in prop.violations
            ]
            df = pd.DataFrame(data=violation_data, columns=['start', 'end'])
            df.to_csv(save_path, index=True)
def main(log_path="./monitor_log/", route_path="default_route"):
    """Build the shared :class:`Monitor` when the module is run as a script.

    ``Monitor()`` without arguments only works once an instance exists;
    on a fresh process it calls ``initialize()`` without its required
    ``log_path`` / ``route_path`` and raises ``TypeError``. The paths are
    therefore passed explicitly.

    Args:
        log_path: Root log directory handed to ``Monitor``.
        route_path: Sub-directory of ``log_path`` for this run. The default
            is only a placeholder name.
    """
    # NOTE(review): the sketch below is kept from the original file. It
    # appears to predate the ``route_path`` argument of Monitor.initialize
    # -- confirm before re-enabling any of it.
    #
    # # Property 1
    # a = partial(
    #     P.filterByAttr, partial(P.relSet, "Ego", "isIn"),
    #     "name", "Opposing Lane")
    # # a = partial(P.filterByAttr, partial(
    # #     P.relSet, "Ego", "isIn"), "name", "Ego Lane")
    # a = partial(P.gt, partial(P.size, a), 0)
    # phi1 = Property("phi1", "G(~a)", [("a", a)])
    # # # Property 2
    # # a = partial(P.relSet, partial(P.filterByAttr, "G",
    # #     "name", "Root Lane", edge_type="incoming"))
    # # a = partial(P.difference, a, partial(P.relSet, "Ego", "isIn"))
    # # a = partial(P.filterByAttr, a, "name", "Right*")
    # # a = partial(P.eq, partial(P.size, a), 0)
    # # s = partial(getattr, __name="Steering")
    # # s = partial(P.gt, s, 0)
    # # phi2 = Property("G(a -> s)", [("a", a), ("s", s)])
    # m = Monitor(log_path="./monitor_log/")
    # m.add_property(phi1)
    # # m.add_property(phi2)
    # base_path = Path("./rsv_sg_town04_max_car/")
    # sg_name_list = sorted(os.listdir(base_path))
    # for sg_name in sg_name_list[:10]:
    #     sg = utils.load_sg(base_path / sg_name)
    #     m.check(sg)
    Monitor(log_path=log_path, route_path=route_path)


if __name__ == '__main__':
    main()