# myPyNN.py
import numpy as np
DEBUG = 0
class MyPyNN(object):
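    """Fully-connected feed-forward neural network with sigmoid activations.

    Each layer's weight matrix includes a bias row; training uses minibatch
    gradient descent on a mean squared-error cost (see trainUsingMinibatchGD).
    """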
def __init__(self, layers=[3, 4, 2]):
self.layers = layers
# Network
self.weights = [np.random.randn(x+1, y)
for x, y in zip(self.layers[:-1], self.layers[1:])]
# For mean-centering
self.meanX = np.zeros((1, self.layers[0]))
# Default options
self.learningRate = 1.0
self.regLambda = 0
self.adaptLearningRate = False
self.normalizeInputs = False
self.meanCentering = False
self.visible = False
def predict(self, X, visible=False):
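        # Forward pass only: repeatedly add a bias term and apply sigmoid(inputs.W)
        # through every layer, returning the activations of the output layer.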
self.visible = visible
# mean-centering
inputs = self.preprocessTestingInputs(X) - self.meanX
if inputs.ndim!=1 and inputs.ndim!=2:
print "X is not one or two dimensional, please check."
return
if DEBUG or self.visible:
print "PREDICT:"
print inputs
for l, w in enumerate(self.weights):
inputs = self.addBiasTerms(inputs)
inputs = self.sigmoid(np.dot(inputs, w))
if DEBUG or self.visible:
print "Layer "+str(l+1)
print inputs
return inputs
def trainUsingMinibatchGD(self, X, y, nEpochs=1000, minibatchSize=100,
learningRate=0.05, regLambda=0, adaptLearningRate=False,
normalizeInputs=False, meanCentering=False,
printTestAccuracy=False, testX=None, testY=None,
visible=False):
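        # Minibatch gradient descent: each epoch shuffles the data, splits it into
        # minibatches, forward-propagates each minibatch and updates the weights via
        # backPropGradDescent; cost and accuracy are printed after every epoch.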
self.learningRate = float(learningRate)
self.regLambda = regLambda
self.adaptLearningRate = adaptLearningRate
self.normalizeInputs = normalizeInputs
self.meanCentering = meanCentering
self.visible = visible
X = self.preprocessTrainingInputs(X)
y = self.preprocessOutputs(y)
yPred = self.predict(X, visible=self.visible)
if yPred.shape != y.shape:
print "Shape of y ("+str(y.shape)+") does not match what shape of y is supposed to be: "+str(yPred.shape)
return
self.trainAccuracy = (np.sum([np.argmax(yPred[k])==np.argmax(y[k])
for k in range(len(y))])).astype(float)/len(y)
print "train accuracy = " + str(self.trainAccuracy)
self.prevCost = 0.5*np.sum((yPred-y)**2)/len(y)
print "cost = " + str(self.prevCost)
self.cost = self.prevCost
# mean-centering
        if self.meanCentering:
            X = X - self.meanX
self.inputs = X
if DEBUG or self.visible:
print "train input:"+str(inputs)
        # Ensure minibatchSize does not exceed len(X)
if minibatchSize > len(X):
minibatchSize = int(len(X)/10)+1
# Test data
        if printTestAccuracy:
            if testX is None and testY is None:
                print "No test data given"
                # Placeholder test data of the right shape, so the loop below still runs
                testX = np.zeros((1, self.layers[0]))
                testY = np.zeros((1, self.layers[-1]))
            elif testX is None or testY is None:
                print "Only one of testX / testY was given"
                return
            else:
                testX = self.preprocessTestingInputs(testX)
                testY = self.preprocessOutputs(testY)
                if len(testX) != len(testY):
                    print "Test inputs and outputs are not of the same length"
                    return
yTestPred = self.predict(testX, visible=self.visible)
self.testAccuracy = np.sum([np.argmax(yTestPred[k])==np.argmax(testY[k])
for k in range(len(testY))])/float(len(testY))
print "test accuracy = " + str(self.testAccuracy)
# Randomly initialize old weights (for adaptive learning), will copy values later
if adaptLearningRate:
self.oldWeights = [np.random.randn(i+1, j)
for i, j in zip(self.layers[:-1], self.layers[1:])]
# For each epoch
for i in range(nEpochs):
print "Epoch "+str(i)+" of "+str(nEpochs)
## Find minibatches
# Generate list of indices of full training data
fullIdx = list(range(len(X)))
# Shuffle the list
np.random.shuffle(fullIdx)
            # Make list of minibatches
minibatches = [fullIdx[k:k+minibatchSize]
for k in xrange(0, len(X), minibatchSize)]
            # For each minibatch
            for minibatchIdx in minibatches:
                # Slice out the X and y rows for this minibatch
                miniX = X[minibatchIdx]
                miniY = y[minibatchIdx]
# Forward propagate through miniX
a = self.forwardProp(miniX)
# Check if Forward Propagation was successful
                if not a:
return
                # Save old weights before backProp in case of adaptLR
                if adaptLearningRate:
                    for wIdx in range(len(self.weights)):  # avoid shadowing the epoch index i
                        self.oldWeights[wIdx] = np.array(self.weights[wIdx])
# Back propagate, update weights for minibatch
self.backPropGradDescent(miniX, miniY)
yPred = self.predict(X, visible=self.visible)
self.trainAccuracy = (np.sum([np.argmax(yPred[k])==np.argmax(y[k])
for k in range(len(y))])).astype(float)/len(y)
print "train accuracy = " + str(self.trainAccuracy)
if printTestAccuracy:
yTestPred = self.predict(testX, visible=self.visible)
self.testAccuracy = (np.sum([np.argmax(yTestPred[k])==np.argmax(testY[k])
for k in range(len(testY))])).astype(float)/len(testY)
print "test accuracy = " + str(self.testAccuracy)
self.cost = 0.5*np.sum((yPred-y)**2)/len(y)
print "cost = " + str(self.cost)
if adaptLearningRate:
self.adaptLR()
self.evaluate(X, y)
self.prevCost = self.cost
def forwardProp(self, inputs):
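        # Runs the forward pass and caches each layer's activations in self.outputs
        # (used later by backPropGradDescent); returns True on success, False if the
        # input shape does not match the network's input dimension.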
inputs = self.preprocessInputs(inputs)
print "Forward..."
if inputs.ndim!=1 and inputs.ndim!=2:
print "Input argument " + str(inputs.ndim) + \
"is not one or two dimensional, please check."
return False
if (inputs.ndim==1 and len(inputs)!=self.layers[0]) or \
(inputs.ndim==2 and inputs.shape[1]!=self.layers[0]):
print "Input argument does not match input dimensions (" + \
str(self.layers[0]) + ") of network."
return False
if DEBUG or self.visible:
print inputs
# Save the outputs of each layer
self.outputs = []
# For each layer
for l, w in enumerate(self.weights):
# Add bias term to the input
inputs = self.addBiasTerms(inputs)
# Calculate the output
self.outputs.append(self.sigmoid(np.dot(inputs, w)))
# Set this as the input to the next layer
inputs = np.array(self.outputs[-1])
            if DEBUG or self.visible:
                print "Layer " + str(l+1)
                print "weights: " + str(w)
                print "output: " + str(inputs)
del inputs
return True
def backPropGradDescent(self, X, y):
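        # Standard backprop for sigmoid layers with a squared-error cost:
        #   output layer:  delta = (a - y) * a * (1 - a)
        #   hidden layers: delta = (delta_next . W[1:].T) * a * (1 - a)
        #   gradient:      dW = (inputs_with_bias.T . delta) / n
        # followed by the update W <- W - (learningRate * dW + lambda * W_nonbias).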
print "...Backward"
# Correct the formats of inputs and outputs
X = self.preprocessInputs(X)
y = self.preprocessOutputs(y)
# Compute first error
bpError = self.outputs[-1] - y
if DEBUG or self.visible:
print "error = self.outputs[-1] - y:"
print error
# For each layer in reverse order (last layer to first layer)
for l, w in enumerate(reversed(self.weights)):
if DEBUG or self.visible:
print "LAYER "+str(len(self.weights)-l)
# The calculated output "z" of that layer
predOutputs = self.outputs[-l-1]
if DEBUG or self.visible:
print "predOutputs"
print predOutputs
            # delta = bpError * (z * (1 - z)), shape n x neurons
            delta = np.multiply(bpError, np.multiply(predOutputs, 1 - predOutputs))
            if DEBUG or self.visible:
                print "To compute error to be backpropagated:"
                print "delta = predOutputs*(1 - predOutputs)*bpError :"
                print delta
print "weights:"
print w
# Compute new error to be propagated back (bias term neglected in backpropagation)
bpError = np.dot(delta, w[1:,:].T)
if DEBUG or self.visible:
print "backprop error = np.dot(del, w[1:,:].T) :"
print error
# If we are at first layer, inputs are data points
if l==len(self.weights)-1:
inputs = self.addBiasTerms(X)
# Else, inputs === outputs from previous layer
else:
inputs = self.addBiasTerms(self.outputs[-l-2])
if DEBUG or self.visible:
print "To compute errorTerm:"
print "inputs:"
print inputs
print "del:"
print delta
# errorTerm = (inputs.T).*(delta)/n
# delta === nxneurons, inputs === nxprev, W === prevxneurons
errorTerm = np.dot(inputs.T, delta)/len(y)
if errorTerm.ndim==1:
                errorTerm = errorTerm.reshape((len(errorTerm), 1))
if DEBUG or self.visible:
print "errorTerm = np.dot(inputs.T, del) :"
print errorTerm
# regularization term
regWeight = np.zeros(w.shape)
regWeight[1:,:] = self.regLambda #bias term neglected
if DEBUG or self.visible:
print "To update weights:"
print "learningRate*errorTerm:"
print self.learningRate*errorTerm
print "regWeight:"
print regWeight
print "weights:"
print w
print "regTerm = regWeight*w :"
print regWeight*w
# Update weights
self.weights[-l-1] = w - \
(self.learningRate*errorTerm + np.multiply(regWeight,w))
if DEBUG or self.visible:
print "Updated 'weights' = learningRate*errorTerm + regTerm :"
print self.weights[len(self.weights)-l-1]
def adaptLR(self):
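        # If the cost went up, halve the learning rate and restore the previous
        # weights; otherwise keep the new weights and grow the rate by 5%.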
if self.cost > self.prevCost:
print "Cost increased!!"
self.learningRate /= 2.0
print " - learningRate halved to: "+str(self.learningRate)
for i in range(len(self.weights)):
self.weights[i] = self.oldWeights[i]
print " - weights reverted back"
        # Cost decreased: keep the new weights and speed up slightly
else:
self.learningRate *= 1.05
print " - learningRate increased by 5% to: "+str(self.learningRate)
def preprocessTrainingInputs(self, X):
X = self.preprocessInputs(X)
if self.normalizeInputs and np.max(X) > 1.0:
X = X/255.0
if np.all(self.meanX == np.zeros((1, self.layers[0]))) and self.meanCentering:
self.meanX = np.reshape(np.mean(X, axis=0), (1, X.shape[1]))
return X
def preprocessTestingInputs(self, X):
X = self.preprocessInputs(X)
if self.normalizeInputs and np.max(X) > 1.0:
X = X/255.0
return X
def preprocessInputs(self, X):
X = np.array(X, dtype=float)
# if X is int
if X.ndim==0:
X = np.array([X])
# if X is 1D
if X.ndim==1:
            if self.layers[0] == 1:  # X is several samples of a 1-dimensional input
                X = np.reshape(X, (len(X), 1))
            else:  # X is a single n-dimensional sample
                X = np.reshape(X, (1, len(X)))
return X
def preprocessOutputs(self, Y):
Y = np.array(Y, dtype=float)
# if Y is int
if Y.ndim==0:
Y = np.array([Y])
# if Y is 1D
if Y.ndim==1:
if self.layers[-1]==1:
Y = np.reshape(Y, (len(Y),1))
else:
Y = np.reshape(Y, (1,len(Y)))
return Y
def addBiasTerms(self, X):
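        # Prepend a constant 1 (the bias input) to each sample.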
if X.ndim==0 or X.ndim==1:
X = np.insert(X, 0, 1)
elif X.ndim==2:
X = np.insert(X, 0, 1, axis=1)
return X
def sigmoid(self, z):
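        # Logistic activation: sigmoid(z) = 1 / (1 + e^(-z)), applied elementwise.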
return 1/(1 + np.exp(-z))
    def evaluate(self, X, Y):
        # Forward propagate X and count how many argmax predictions match Y
        if not self.forwardProp(X):
            return
        yPreds = self.outputs[-1]
        test_results = [(np.argmax(yPreds[i]), np.argmax(Y[i]))
                        for i in range(len(Y))]
        yes = sum(int(x == y) for (x, y) in test_results)
        print str(yes) + " out of " + str(len(Y))
def loadMNISTData(self, path='/Users/vikram.v/Downloads/mnist.npz'):
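        # Loads the MNIST .npz archive, flattens each 28x28 image into a
        # 784-vector and one-hot encodes the labels into 10-way target vectors.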
# Use numpy.load() to load the .npz file
f = np.load(path)
        # f.files lists the arrays stored in the .npz archive
# Saving the files
x_train = f['x_train']
y_train = f['y_train']
x_test = f['x_test']
y_test = f['y_test']
f.close()
# Preprocess inputs
x_train_new = np.array([x.flatten() for x in x_train])
y_train_new = np.zeros((len(y_train), 10))
for i in range(len(y_train)):
y_train_new[i][y_train[i]] = 1
x_test_new = np.array([x.flatten() for x in x_test])
y_test_new = np.zeros((len(y_test), 10))
for i in range(len(y_test)):
y_test_new[i][y_test[i]] = 1
return [x_train_new, y_train_new, x_test_new, y_test_new]
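
# Example usage (illustrative sketch, not part of the original file): the layer
# sizes and hyperparameters below are assumptions chosen for demonstration, and
# 'mnist.npz' must point to a local copy of the MNIST archive. Run under
# Python 2, which the rest of the file assumes.
if __name__ == '__main__':
    nn = MyPyNN(layers=[784, 30, 10])
    xTrain, yTrain, xTest, yTest = nn.loadMNISTData(path='mnist.npz')
    nn.trainUsingMinibatchGD(xTrain, yTrain, nEpochs=10, minibatchSize=100,
                             learningRate=0.1, normalizeInputs=True,
                             printTestAccuracy=True, testX=xTest, testY=yTest)
    predictions = nn.predict(xTest)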