diff --git a/fisher_jenks/fj_refactored.py b/fisher_jenks/fj_refactored.py index 17c177b..2ac5b01 100644 --- a/fisher_jenks/fj_refactored.py +++ b/fisher_jenks/fj_refactored.py @@ -1,4 +1,4 @@ -import numpy +import numpy as np import time import multiprocessing import ctypes @@ -6,7 +6,20 @@ #Suppress the divide by zero errors warnings.filterwarnings('ignore', category=RuntimeWarning) -numpy.set_printoptions(linewidth = 200, suppress = True) +np.set_printoptions(linewidth = 200, suppress = True) + +def fj_generate_sample(y, pct=0.10, random=True): + n = y.size + if random: + choicevector = np.arange(n) + ids = np.random.choice(choicevector,n * pct,replace=False) + #ids = np.random.random_integers(0, n - 1, n * pct) + else: + ids = np.arange(int(n*pct)) + yr = y[ids] + yr[-1] = max(y) # make sure we have the upper bound + yr[0] = min(y) # make sure we have the min + return yr def fisher_jenks(values, classes=5, cores=None, sort=True): '''Fisher-Jenks Optimal Partitioning of an ordered array into k classes @@ -46,13 +59,13 @@ class and the last value the start of the last class. def allocate(values, classes): '''This function allocates memory for the variance matrix, error matrix, and pivot matrix. It also moves the variance matrix and error matrix from - numpy types to a ctypes, shared memory array.''' + np types to a ctypes, shared memory array.''' numClass = classes numVal = len(values) varCtypes = multiprocessing.RawArray(ctypes.c_double, numVal*numVal) - varMat = numpy.frombuffer(varCtypes) + varMat = np.frombuffer(varCtypes) varMat.shape = (numVal,numVal) for x in range(0,len(values)): @@ -60,11 +73,11 @@ def allocate(values, classes): varMat[x][0:x] = 0 print varMat errCtypes = multiprocessing.RawArray(ctypes.c_double, classes*numVal) - errorMat = numpy.frombuffer(errCtypes) + errorMat = np.frombuffer(errCtypes) errorMat.shape = (classes, numVal) pivotShape = (classes, numVal) - pivotMat = numpy.ndarray(pivotShape, dtype=numpy.float) + pivotMat = np.ndarray(pivotShape, dtype=np.float) #Initialize the arrays as globals. initArr(varMat, errorMat) @@ -88,8 +101,8 @@ def fj(sharedVar,i, values, start): '''This function facilitates passing multiple rows to each process and then performing multiple vector calculations along individual rows.''' arr = sharedVar - arr[i] = numpy.apply_along_axis(calcVar, 1, arr[i], len(values)) - arr[i][numpy.isnan(arr[i])] = 0 + arr[i] = np.apply_along_axis(calcVar, 1, arr[i], len(values)) + arr[i][np.isnan(arr[i])] = 0 def calcVar(arrRow, lenValues): '''This function calculates the diameter matrix. It is called by fj. @@ -98,15 +111,15 @@ def calcVar(arrRow, lenValues): of elements summed for each index.''' lenN = (arrRow != 0).sum() - n = numpy.arange(1, lenN+1) + n = np.arange(1, lenN+1) if lenN != lenValues: n.resize(arrRow.shape[0]) n[arrRow.shape[0]-lenN:] = n[:lenN-arrRow.shape[0]] n[0:arrRow.shape[0]-lenN] = 0 print arrRow - return ((numpy.cumsum(numpy.square(arrRow))) - \ - ((numpy.cumsum(arrRow)*numpy.cumsum(arrRow)) / (n))) + return ((np.cumsum(np.square(arrRow))) - \ + ((np.cumsum(arrRow)*np.cumsum(arrRow)) / (n))) def err(row,y,step, lenrow): '''This function computes the error on a segment of each error row, from the error matrix. @@ -119,7 +132,7 @@ def err(row,y,step, lenrow): stop = lenrow-1 while y <= stop: print sharedVar[:,y+row][row:y+row+1] - sharedErrRow[y] = numpy.amin(sharedErr[row-1][row-1:y+row] + sharedVar[:,y+row][row:y+row+1]) + sharedErrRow[y] = np.amin(sharedErr[row-1][row-1:y+row] + sharedVar[:,y+row][row:y+row+1]) y+=1 if sort: diff --git a/fisher_jenks/test_fj.py b/fisher_jenks/test_fj.py index a7dbaf1..052880e 100644 --- a/fisher_jenks/test_fj.py +++ b/fisher_jenks/test_fj.py @@ -1,24 +1,82 @@ +import math import sys import time -import numpy -from fj_refactored import fisher_jenks - -cores = [1,2,4,16,32] -classes = [5,6,7] -data_sizes = [500, 1000, 2500, 5000, 7500, 10000, 12500, 15000, 17500, 20000, 22500, 25000] - -for c in cores: - for d in data_sizes: - for k in classes: - data = numpy.random.ranf(size=d) - try: +import numpy as np +from fj_refactored import fisher_jenks, fj_generate_sample + + +def testfull(): + """ + Tests the fully enumerated Fisher-Jenks implementation + """ + cores = [1,2,4,16,32] + classes = [5,6,7] + data_sizes = [500, 1000, 2500, 5000, 7500, 10000, 12500, 15000, 17500, 20000, 22500, 25000] + + for c in cores: + for d in data_sizes: + for k in classes: + data = np.random.ranf(size=d) + try: + t1 = time.time() + #wrapped in try since we will blow out RAM at some point + classification = fisher_jenks(data, k, c) + t2 = time.time() + print "Processed {0} data points in {1} classes using {2} cores. Total time: {3}".format(d, k, c, t2-t1) + data = None + except KeyboardInterrupt: + print "Aborting" + sys.exit(1) + except: + print "FAILURE: {0} data points.".format(d) + + +def testsample(): + """ + Tests the sampled Fisher-Jenks implementation + """ + cores = [1,2,4,16,32] + classes = [5,6,7] + data_sizes = [10000, 20000, 40000, 80000, 160000, 320000, 640000, 1280000, + 2560000, 5120000, 10240000, 20480000, 40960000, 81920000, + 163840000, 327680000, 655360000] + for c in cores: + for d in data_sizes: + for k in classes: + #Generate the test data and save to disk + data = np.random.ranf(size=d) + nobs = len(data) + np.save('testfile.npy', data) + data = None + + #Compute the sample size as the sqrt of nobs + sqrt = math.sqrt(nobs) + if sqrt > 40000: + sqrt = 40000 + pct = sqrt / float(d) + + #Load the data back into memory as a mmapped file + f = np.load('testfile.npy', mmap_mode='r+') t1 = time.time() - #wrapped in try since we will blow out RAM at some point - classification = fisher_jenks(data, k, c) + data = fj_generate_sample(f, pct=pct) t2 = time.time() - print "Processed {0} data points in {1} classes using {2} cores. Total time: {3}".format(d, k, c, t2-t1) - except KeyboardInterrupt: - print "Aborting" - sys.exit(1) - except: - print "FAILURE: {0} data points.".format(d) + print "Randomly sampling {0} percent of {1} observations for a run size of {2} observations took {3} seconds.".format(pct, nobs, sqrt, t2 - t1) + try: + t1 = time.time() + #wrapped in try since we will blow out RAM at some point + classification = fisher_jenks(data, k, c) + t2 = time.time() + print "Processed {0} data points in {1} classes using {2} cores. Total time: {3}".format(d, k, c, t2-t1) + except KeyboardInterrupt: + print "Aborting" + sys.exit(1) + except: + print "FAILURE: {0} data points.".format(d) + + +if __name__ =='__main__': + #Test the fully enumerated FJ + testfull() + + #Test FJ using sampling + testsample()