finder_gym.py
import gym
from gym import spaces
import numpy as np
import pygame
from map_tester import *

fog_dist = 7
class nodeFindEnv(gym.Env):
    def __init__(self):
        super(nodeFindEnv, self).__init__()
        # self.world = np.array(world)  # world represented as a 2D numpy array
        # self.start_pos = np.where(self.world == 'S')  # starting position
        # self.goal_pos = np.where(self.world == 'G')  # goal position
        # self.start_pos = start_pos
        # self.current_pos = start_pos  # starting position is the agent's current position
        self.num_rows, self.num_cols = 20, 20
        # one discrete action per grid cell (rows * cols actions in total)
        self.action_space = spaces.Discrete(self.num_rows * self.num_cols)
        # observation is the full grid of size rows x columns, values in [-1, 1]
        # self.observation_space = spaces.Tuple((spaces.Discrete(self.num_rows), spaces.Discrete(self.num_cols)))
        self.observation_space = spaces.Box(
            np.ones((1, 20, 20)) * -1,
            np.ones((1, 20, 20)),
            (1, 20, 20),
            dtype=np.float32,
        )
        prob = np.random.randint(15, 45) * 0.01
        size = 20
        world = -(np.random.rand(int(size), int(size)) < prob).astype(int)  # -1 obstacle, 0 nothing, >0 agent id
        # for PRIMAL1 map
        # world = random_generator(SIZE_O=self.SIZE, PROB_O=self.PROB)
        world = padding_world(world)
        # for index, x in np.ndenumerate(world):
        #     if x == 0:
        #         result, self.world = ray_cast(world, index, 7, True)
        #         break
        # independent rows, so updating one cell does not alias an entire column
        self.density_square = [[0] * self.num_cols for _ in range(self.num_rows)]
        # place the first node on a random free cell and reveal its surroundings
        while True:
            x = np.random.randint(0, self.num_rows)
            y = np.random.randint(0, self.num_cols)
            if world[x][y] == 0:
                result, self.world, visible_nodes = ray_cast(world, (x, y), fog_dist, True)
                break
        self.iteration = 0
        # Initialize Pygame
        pygame.init()
        self.cell_size = 30
        # set the display size
        self.screen = pygame.display.set_mode((self.num_cols * self.cell_size, self.num_rows * self.cell_size))
    def reset(self):
        # prob = np.random.triangular(self.PROB[0], .33 * self.PROB[0] + .66 * self.PROB[1],
        #                             self.PROB[1])  # sample a value from a triangular distribution
        # size = np.random.choice([self.SIZE[0], self.SIZE[0] * .5 + self.SIZE[1] * .5, self.SIZE[1]],
        #                         p=[.5, .25, .25])  # sample a value according to the given probabilities
        prob = np.random.rand() * 0.3
        size = 20
        # prob = self.PROB
        # size = self.SIZE  # fixed world0 size and obstacle density for evaluation
        # the map without any agents or goals
        world = -(np.random.rand(int(size), int(size)) < prob).astype(int)  # -1 obstacle, 0 nothing, >0 agent id
        # for PRIMAL1 map
        # world = random_generator(SIZE_O=self.SIZE, PROB_O=self.PROB)
        world = padding_world(world)
        # for index, x in np.ndenumerate(world):
        #     if x == 0:
        #         result, self.world = ray_cast(world, index, 7, True)
        #         break
        self.density_square = [[0] * self.num_cols for _ in range(self.num_rows)]
        # place the first node on a random free cell and reveal its surroundings
        while True:
            x = np.random.randint(0, self.num_rows)
            y = np.random.randint(0, self.num_cols)
            if world[x][y] == 0:
                result, self.world, visible_nodes = ray_cast(world, (x, y), fog_dist, True)
                break
        self.iteration = 0
        pygame.display.update()
        return np.expand_dims(self.world, 0), {}
    def step(self, action):
        # decode the flat action index into the grid position of the new node
        new_pos = [0, 0]
        # new_pos[0] = action % self.num_rows
        # new_pos[1] = action // self.num_rows
        new_pos[1] = action % self.num_rows
        new_pos[0] = action // self.num_rows
        green_count = 0
        empty_space = 0
        node_space = 0
        # count the currently visible (green) cells as a negative baseline, so that
        # after placement only the newly revealed cells contribute to the reward
        for index, x in np.ndenumerate(self.world):
            if x == 2:
                green_count -= 1
        # check whether the new node lands in a valid location
        if self.world[new_pos[0]][new_pos[1]] == 2:
            result = 1
            _, self.world, visible_nodes_count = ray_cast(self.world, new_pos, fog_dist, False)
        elif self.world[new_pos[0]][new_pos[1]] == -1:
            result = -1
        elif self.world[new_pos[0]][new_pos[1]] == 1:
            result = -2
        elif self.world[new_pos[0]][new_pos[1]] == 0:
            result = -3
        # # Check if the new position is valid
        # if self._is_valid_position(new_pos):
        #     self.current_pos = new_pos
        # TODO:
        # increase the penalty for invalid moves
        # increase the reward for a completed view
        # reduce the fog distance
        # reward based on node connectivity

        # Reward function
        # node is placed on an obstacle
        if result == -1:
            reward = -8.0
        # node is placed on another node
        elif result == -2:
            reward = -8.0
        # node is not connected (placed on an unexplored cell)
        elif result == -3:
            reward = -8.0
        # node placed successfully
        else:
            # reward based on increased map coverage and the node-to-space density ratio
            for index, x in np.ndenumerate(self.world):
                if x == 2:
                    # green_count starts negative (see above),
                    # so only the newly revealed green cells are counted
                    green_count += 1
                if x == 2 or x == 0:
                    empty_space += 1
                elif x == 1:
                    node_space += 1
            # account for the new node taking up one green cell
            green_count += 1
            reward = green_count * 0.5 - 7
            # density = node_space / (node_space + empty_space)
            # if reward > 0:
            #     reward = reward * (1 - density)
            # else:
            #     reward = reward * density
            # discourage nodes from being placed next to each other
            x, y = new_pos
            node_proximity_penalty = 3
            nearby_node_count = 0
            if x > 0 and self.world[x - 1][y] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x - 1][y]
            if x < len(self.world) - 1 and self.world[x + 1][y] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x + 1][y]
            if y > 0 and self.world[x][y - 1] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x][y - 1]
            if y < len(self.world[0]) - 1 and self.world[x][y + 1] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x][y + 1]
            if x > 0 and y > 0 and self.world[x - 1][y - 1] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x - 1][y - 1]
            if x < len(self.world) - 1 and y > 0 and self.world[x + 1][y - 1] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x + 1][y - 1]
            if x > 0 and y < len(self.world[0]) - 1 and self.world[x - 1][y + 1] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x - 1][y + 1]
            if x < len(self.world) - 1 and y < len(self.world[0]) - 1 and self.world[x + 1][y + 1] == 1:
                reward -= node_proximity_penalty
                nearby_node_count += 1 + self.density_square[x + 1][y + 1]
            self.density_square[x][y] = nearby_node_count
            # encourage nodes to keep line of sight with one another
            reward += (visible_nodes_count - nearby_node_count) * 1.2
        # print("result = ", result)
        # the episode ends once the whole map has been revealed
        if 0 in self.world:
            done = False
        else:
            done = True
            reward += 150
        self.current_pos = new_pos
        self.iteration += 1
        if self.iteration > 250:
            done = True
        # clip the reward
        if reward < -2000:
            reward = -2000
        return np.expand_dims(self.world, 0), reward, done, False, {'isValid': result}
    def _is_valid_position(self, pos):
        row, col = pos
        # the position is outside the grid
        if row < 0 or col < 0 or row >= self.num_rows or col >= self.num_cols:
            return False
        # the position is an obstacle
        if self.world[row, col] == -1:
            return False
        return True
    def render(self):
        # Clear the screen
        self.screen.fill((255, 255, 255))
        # print(self.world)
        # Draw the environment one cell at a time
        for row in range(self.num_rows):
            for col in range(self.num_cols):
                cell_left = col * self.cell_size
                cell_top = row * self.cell_size
                # try:
                #     print(np.array(self.current_pos) == np.array([row, col]).reshape(-1, 1))
                # except Exception as e:
                #     print('Initial state')
                if self.world[row, col] == -1:  # obstacle
                    pygame.draw.rect(self.screen, (0, 0, 0), (cell_left, cell_top, self.cell_size, self.cell_size))
                elif self.world[row, col] == 1:  # node
                    pygame.draw.rect(self.screen, (0, 0, 255), (cell_left, cell_top, self.cell_size, self.cell_size))
                elif self.world[row, col] == 2:  # visible range
                    pygame.draw.rect(self.screen, (0, 255, 0), (cell_left, cell_top, self.cell_size, self.cell_size))
                # if np.array_equal(np.array(self.current_pos), np.array([row, col]).reshape(-1, 1)):  # agent position
                #     pygame.draw.rect(self.screen, (0, 0, 255), (cell_left, cell_top, self.cell_size, self.cell_size))
        pygame.display.update()  # update the display
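

# A minimal usage sketch: drives the environment with randomly sampled actions and
# renders each step. It assumes map_tester provides padding_world and ray_cast as
# imported above; a trained policy would replace env.action_space.sample().
if __name__ == "__main__":
    env = nodeFindEnv()
    obs, info = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        obs, reward, done, truncated, info = env.step(action)
        env.render()
        pygame.event.pump()  # keep the window responsive while rendering
    pygame.quit()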