This repository has been archived by the owner on Apr 25, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 455
/
ddqn.py
122 lines (103 loc) · 4.35 KB
/
ddqn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# -*- coding: utf-8 -*-
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras import backend as K
import tensorflow as tf
EPISODES = 5000
class DQNAgent:
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.memory = deque(maxlen=2000)
self.gamma = 0.95 # discount rate
self.epsilon = 1.0 # exploration rate
self.epsilon_min = 0.01
self.epsilon_decay = 0.99
self.learning_rate = 0.001
self.model = self._build_model()
self.target_model = self._build_model()
self.update_target_model()
"""Huber loss for Q Learning
References: https://en.wikipedia.org/wiki/Huber_loss
https://www.tensorflow.org/api_docs/python/tf/losses/huber_loss
"""
def _huber_loss(self, y_true, y_pred, clip_delta=1.0):
error = y_true - y_pred
cond = K.abs(error) <= clip_delta
squared_loss = 0.5 * K.square(error)
quadratic_loss = 0.5 * K.square(clip_delta) + clip_delta * (K.abs(error) - clip_delta)
return K.mean(tf.where(cond, squared_loss, quadratic_loss))
def _build_model(self):
# Neural Net for Deep-Q learning Model
model = Sequential()
model.add(Dense(24, input_dim=self.state_size, activation='relu'))
model.add(Dense(24, activation='relu'))
model.add(Dense(self.action_size, activation='linear'))
model.compile(loss=self._huber_loss,
optimizer=Adam(lr=self.learning_rate))
return model
def update_target_model(self):
# copy weights from model to target_model
self.target_model.set_weights(self.model.get_weights())
def memorize(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def act(self, state):
if np.random.rand() <= self.epsilon:
return random.randrange(self.action_size)
act_values = self.model.predict(state)
return np.argmax(act_values[0]) # returns action
def replay(self, batch_size):
minibatch = random.sample(self.memory, batch_size)
for state, action, reward, next_state, done in minibatch:
target = self.model.predict(state)
if done:
target[0][action] = reward
else:
# a = self.model.predict(next_state)[0]
t = self.target_model.predict(next_state)[0]
target[0][action] = reward + self.gamma * np.amax(t)
# target[0][action] = reward + self.gamma * t[np.argmax(a)]
self.model.fit(state, target, epochs=1, verbose=0)
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
def load(self, name):
self.model.load_weights(name)
def save(self, name):
self.model.save_weights(name)
if __name__ == "__main__":
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)
# agent.load("./save/cartpole-ddqn.h5")
done = False
batch_size = 32
for e in range(EPISODES):
state = env.reset()
state = np.reshape(state, [1, state_size])
for time in range(500):
# env.render()
action = agent.act(state)
next_state, reward, done, _ = env.step(action)
#reward = reward if not done else -10
x,x_dot,theta,theta_dot = next_state
r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
reward = r1 + r2
next_state = np.reshape(next_state, [1, state_size])
agent.memorize(state, action, reward, next_state, done)
state = next_state
if done:
agent.update_target_model()
print("episode: {}/{}, score: {}, e: {:.2}"
.format(e, EPISODES, time, agent.epsilon))
break
if len(agent.memory) > batch_size:
agent.replay(batch_size)
# if e % 10 == 0:
# agent.save("./save/cartpole-ddqn.h5")