-
Notifications
You must be signed in to change notification settings - Fork 1
/
EnvWrapper.py
153 lines (126 loc) · 5.81 KB
/
EnvWrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import numpy as np
from highway.dqn.agent import Agent as HighwayAgent
from highway.model.model import Highway
from mod_gym import gym
from mod_stable_baselines3.stable_baselines3 import PPO
from mod_stable_baselines3.stable_baselines3.common.policies import ActorCriticPolicy
class Wrapper():
    """Uniform facade over the RL environments/agents used in this project.

    Supported environment identifiers (``env_identifier``):
      * "lunar"      -- gym LunarLander-v2 driven by a stable-baselines3 PPO model
      * "bipedal"    -- gym BipedalWalker-v3 driven by a PPO model
      * "bipedal-hc" -- gym BipedalWalkerHardcore-v3 driven by a PPO model
      * "highway"    -- project-local Highway environment driven by a DQN HighwayAgent

    Every public method dispatches on ``self.env_iden`` so callers can drive
    any of the environments through one interface.
    """

    # Identifiers that share the gym + stable-baselines3 (PPO) code path.
    _SB3_ENVS = ("lunar", "bipedal", "bipedal-hc")

    def __init__(self, env_identifier):
        self.env_iden = env_identifier
        self.env = None
        self.initial_state = None
        self.model = None
        self.seed_policy = None
        # Fix: previously never initialized; only some create_*_environment
        # paths assigned it, so reading it early raised AttributeError.
        self.action_space = None

    def create_seed_policy(self, load_path):
        """Load a pre-trained PPO policy from ``load_path`` into ``self.seed_policy``.

        The original had byte-identical branches for "lunar" and the two
        bipedal variants; they are merged here.  For any other identifier
        the seed policy is set to None (best-effort, as before).
        """
        model = None
        if self.env_iden in self._SB3_ENVS:
            model = PPO.load(load_path, env=self.env)
        self.seed_policy = model

    def create_bipedal_environment(self, seed, hardcore=False):
        """Create and seed the BipedalWalker env; ``hardcore`` selects the -Hardcore variant."""
        env_id = 'BipedalWalkerHardcore-v3' if hardcore else 'BipedalWalker-v3'
        env = gym.make(env_id)
        env.seed(seed)
        self.env = env

    def _load_ppo_model(self, load_path, r_seed):
        """Shared loader for the PPO-based environments (walker and lander).

        NOTE(review): ``PPO.load`` is a classmethod, so the instance built
        here is discarded -- but constructing it seeds the global RNGs with
        ``r_seed`` as a side effect, which may matter for reproducibility.
        Confirm before simplifying this to a bare ``PPO.load`` call.
        """
        ppo = PPO(env=self.env, seed=r_seed, policy=ActorCriticPolicy)
        return ppo.load(load_path, env=self.env)

    def create_bipedal_model(self, load_path, r_seed):
        """Load the walker PPO model from ``load_path``."""
        self.model = self._load_ppo_model(load_path, r_seed)

    def create_lunar_model(self, load_path, r_seed):
        """Load the lander PPO model from ``load_path``."""
        self.model = self._load_ppo_model(load_path, r_seed)

    def create_lunar_environment(self, seed):
        """Create and seed LunarLander-v2; exposes its discrete action ids."""
        env = gym.make('LunarLander-v2')
        env.seed(seed)
        self.env = env
        self.action_space = range(env.action_space.n)  # Discrete(4)

    def create_highway_model(self, load_path, r_seed):
        """Load the DQN highway agent from a checkpoint at ``load_path``."""
        ag = HighwayAgent(self.env, r_seed, n_episodes=10000, l_episodes=300,
                          checkpoint_name='Unnamed', eps_start=1.0, eps_end=0.0001,
                          eps_decay=0.999, learning_count=0)
        ag.load(load_path, None)  # second parameter is unused by load
        self.model = ag

    def create_highway_environment(self, rng):
        """Create the project-local Highway environment driven by ``rng``."""
        # environment parameters
        num_lines = 2
        length_lines = 100
        ratios = [0.02, 0.1]
        env = Highway(num_lines=num_lines, length_lines=length_lines, rng=rng,
                      mode='line_ratio', ratios=ratios, input_stripe=True)
        self.env = env
        self.action_space = env.action_space

    def create_environment(self, env_seed=None):
        """Dispatch to the matching create_*_environment for ``self.env_iden``.

        Raises:
            ValueError: for an unknown identifier (previously failed silently,
                leaving ``self.env`` as None).
        """
        if self.env_iden == "lunar":
            self.create_lunar_environment(env_seed)
        elif self.env_iden == "bipedal":
            self.create_bipedal_environment(env_seed)
        elif self.env_iden == "bipedal-hc":
            self.create_bipedal_environment(env_seed, hardcore=True)
        elif self.env_iden == "highway":
            rng = np.random.default_rng(env_seed)
            self.create_highway_environment(rng)
        else:
            raise ValueError(f"unknown environment identifier: {self.env_iden!r}")

    def create_model(self, load_path, r_seed=None):
        """Dispatch to the matching create_*_model for ``self.env_iden``.

        Raises:
            ValueError: for an unknown identifier (previously failed silently,
                leaving ``self.model`` as None).
        """
        if self.env_iden == "lunar":
            self.create_lunar_model(load_path, r_seed)
        elif self.env_iden in ("bipedal", "bipedal-hc"):
            self.create_bipedal_model(load_path, r_seed)
        elif self.env_iden == "highway":
            self.create_highway_model(load_path, r_seed)
        else:
            raise ValueError(f"unknown environment identifier: {self.env_iden!r}")

    def get_state(self):
        """Return ``(nn_state, hi_lvl_state, rand_state)`` for the current env.

        Raises:
            ValueError: for an unknown identifier (previously this surfaced
                as an UnboundLocalError at the return statement).
        """
        if self.env_iden in self._SB3_ENVS:
            nn_state, hi_lvl_state, rand_state = self.env.get_state()
        elif self.env_iden == "highway":
            # in highway, rand_state is a numpy default_rng
            nn_state, street, rand_state = self.env.get_state(
                one_hot=True, linearize=True, window=True, distance=True)
            hi_lvl_state = [street, nn_state[-1]]
        else:
            raise ValueError(f"unknown environment identifier: {self.env_iden!r}")
        return nn_state, hi_lvl_state, rand_state

    def set_state(self, hi_lvl_state, rand_state=None):
        """Restore the environment to ``hi_lvl_state`` (and RNG state if given)."""
        if self.env_iden in self._SB3_ENVS:
            self.env.reset(hi_lvl_state=hi_lvl_state, rand_state=rand_state)
        elif self.env_iden == "highway":
            # in highway, rand_state is a numpy default_rng
            self.env.set_state(hi_lvl_state, rand_state)

    def model_step(self, state, deterministic=True):
        """Ask the loaded model for an action in ``state``.

        ``deterministic`` is only honoured by the PPO-based models; the
        highway agent's ``act`` takes no such flag.  Returns None for an
        unknown identifier (preserved original behavior).
        """
        act = None
        if self.env_iden in self._SB3_ENVS:
            act, _ = self.model.predict(state, deterministic=deterministic)
        elif self.env_iden == "highway":
            act = self.model.act(state)
        return act

    def env_step(self, action):
        """Advance the environment by one ``action``; return (reward, next_state, done).

        Note the gym and highway envs return their tuples in different
        orders; this method normalizes to (reward, next_state, done).
        """
        reward, next_state, done = None, None, None
        if self.env_iden in self._SB3_ENVS:
            next_state, reward, done, info = self.env.step(action)
        elif self.env_iden == "highway":
            reward, next_state, done = self.env.step(action)
        return reward, next_state, done

    def play(self, init_state):
        """Roll out the loaded model from ``init_state`` until the episode ends.

        Returns:
            (final_rew, full_play, all_rews) where ``final_rew`` is a binary
            outcome -- 0 if any step yielded a -100 reward, else 100 -- and
            ``full_play``/``all_rews`` are the per-step actions and rewards.
        """
        next_state = init_state
        full_play = []
        all_rews = []
        while True:
            act = self.model_step(next_state)
            reward, next_state, done = self.env_step(act)
            all_rews.append(reward)
            full_play.append(act)
            if done:
                # a -100 reward marks failure: walker fell before reaching
                # the end, lander crashed, or car crashed
                final_rew = 0 if -100 in all_rews else 100
                return final_rew, full_play, all_rews

    def eval(self, eval_budget=100):
        """Average total reward over ``eval_budget`` freshly-reset episodes.

        Fix: the original printed the average but returned None; the value
        is now also returned (backward-compatible for callers that ignored
        the return value).
        """
        tot_rew = 0
        for _ in range(eval_budget):
            self.env.reset()
            next_state, _, _ = self.get_state()
            done = False
            while not done:
                act = self.model_step(next_state)
                reward, next_state, done = self.env_step(act)
                tot_rew += reward
        avg_rew = tot_rew / eval_budget
        print(avg_rew)
        return avg_rew