diff --git a/examples/src/main/resources/LogisticRegression.cu b/examples/src/main/resources/LogisticRegression.cu
new file mode 100644
index 0000000..6dc2cba
--- /dev/null
+++ b/examples/src/main/resources/LogisticRegression.cu
@@ -0,0 +1,179 @@
+
+
+extern "C"
+// GPU function for calculating the hypothesis function and the individual gradient update for each feature of each sample
+__global__ void map(int m, double *xs, double *ys, double *gradvec, double *params, int d){ // m is the no. of samples and d is the no. of features in xs (input data)
+    //double *h;
+    //cudaMalloc (&h, m*sizeof(float));
+    int index = blockIdx.x * blockDim.x + threadIdx.x;
+
+    if (index < m) {
+        // hypothesis h(x) = 1 / (1 + exp(-params . x)) for this sample
+        double dot = 0;
+        for (int j = 0; j < d; j++) {
+            dot += xs[index * d + j] * params[j];
+        }
+        double h = 1.0 / (1.0 + exp(-dot));
+
+        // per-sample gradient contribution for every feature
+        for (int j = 0; j < d; j++) {
+            gradvec[index * d + j] = (h - ys[index]) * xs[index * d + j];
+        }
+    }
+}
+
+#define WARPSIZE 32
+
+__device__ inline double atomicAddDouble(double *address, double val) {
+    unsigned long long int *address_as_ull = (unsigned long long int *)address;
+    unsigned long long int old = *address_as_ull, assumed;
+    do {
+        assumed = old;
+        old = atomicCAS(address_as_ull, assumed,
+                        __double_as_longlong(val + __longlong_as_double(assumed)));
+    } while (assumed != old);
+    return __longlong_as_double(old);
+}
+
+__device__ inline double __shfl_double(double d, int lane) {
+    // Split the double number into 2 32b registers.
+    int lo, hi;
+    asm volatile("mov.b64 {%0,%1}, %2;" : "=r"(lo), "=r"(hi) : "d"(d));
+
+    // Shuffle the two 32b registers.
+    lo = __shfl(lo, lane);
+    hi = __shfl(hi, lane);
+
+    // Recreate the 64b number.
+    asm volatile("mov.b64 %0, {%1,%2};" : "=d"(d) : "r"(lo), "r"(hi));
+    return d;
+}
+
+__device__ inline double warpReduceSum(double val) {
+    int i = blockIdx.x * blockDim.x + threadIdx.x;
+#pragma unroll
+    for (int offset = WARPSIZE / 2; offset > 0; offset /= 2) {
+        val += __shfl_double(val, (i + offset) % WARPSIZE);
+    }
+    return val;
+}
+
+__device__ inline double4 __shfl_double4(double4 d, int lane) {
+    // Split the double number into 2 32b registers.
+    int lox, loy, loz, low, hix, hiy, hiz, hiw;
+    asm volatile("mov.b64 {%0,%1}, %2;" : "=r"(lox), "=r"(hix) : "d"(d.x));
+    asm volatile("mov.b64 {%0,%1}, %2;" : "=r"(loy), "=r"(hiy) : "d"(d.y));
+    asm volatile("mov.b64 {%0,%1}, %2;" : "=r"(loz), "=r"(hiz) : "d"(d.z));
+    asm volatile("mov.b64 {%0,%1}, %2;" : "=r"(low), "=r"(hiw) : "d"(d.w));
+
+    // Shuffle the two 32b registers.
+    lox = __shfl(lox, lane);
+    hix = __shfl(hix, lane);
+    loy = __shfl(loy, lane);
+    hiy = __shfl(hiy, lane);
+    loz = __shfl(loz, lane);
+    hiz = __shfl(hiz, lane);
+    low = __shfl(low, lane);
+    hiw = __shfl(hiw, lane);
+
+    // Recreate the 64b number.
+    asm volatile("mov.b64 %0, {%1,%2};" : "=d"(d.x) : "r"(lox), "r"(hix));
+    asm volatile("mov.b64 %0, {%1,%2};" : "=d"(d.y) : "r"(loy), "r"(hiy));
+    asm volatile("mov.b64 %0, {%1,%2};" : "=d"(d.z) : "r"(loz), "r"(hiz));
+    asm volatile("mov.b64 %0, {%1,%2};" : "=d"(d.w) : "r"(low), "r"(hiw));
+    return d;
+}
+
+__device__ inline double4 warpReduceVSum(double4 val4) {
+    int i = blockIdx.x * blockDim.x + threadIdx.x;
+#pragma unroll
+    for (int offset = WARPSIZE / 2; offset > 0; offset /= 2) {
+        double4 shiftedVal4 = __shfl_double4(val4, (i + offset) % WARPSIZE);
+        val4.x += shiftedVal4.x;
+        val4.y += shiftedVal4.y;
+        val4.z += shiftedVal4.z;
+        val4.w += shiftedVal4.w;
+    }
+    return val4;
+}
+
+__device__ double* deviceReduceKernelj(double * inArray, double *out, long i, long n, long length) {
+    double sum = 0;
+    double *inArrayBody;
+    int index = blockIdx.x * blockDim.x + threadIdx.x;
+    for (long idx = index; idx < n; idx += blockDim.x * gridDim.x) {
+        inArrayBody = &inArray[idx*length];
+        sum += inArrayBody[i];
+    }
+
+    sum = warpReduceSum(sum);
+
+    if ((threadIdx.x & (WARPSIZE - 1)) == 0) {
+        atomicAddDouble(out, sum);
+    }
+    return out;
+}
+
+__device__ void deviceReduceArrayKernelj(double * inArray, double *outputArrayBody, long length, long n) {
+    long i = 0;
+    double *inArrayBody;
+
+    // unrolled version
+    while ((length - i) >= 4) {
+        double4 sum4;
+        sum4.x = 0; sum4.y = 0; sum4.z = 0; sum4.w = 0;
+        for (long idx = blockIdx.x * blockDim.x + threadIdx.x; idx < n; idx += blockDim.x * gridDim.x) {
+            inArrayBody = &inArray[idx*length];
+            sum4.x += inArrayBody[i];
+            sum4.y += inArrayBody[i+1];
+            sum4.z += inArrayBody[i+2];
+            sum4.w += inArrayBody[i+3];
+        }
+
+        sum4 = warpReduceVSum(sum4);
+
+        if ((threadIdx.x & (WARPSIZE - 1)) == 0) {
+
+            double *outx = &outputArrayBody[i];
+            double *outy = &outputArrayBody[i+1];
+            double *outz = &outputArrayBody[i+2];
+            double *outw = &outputArrayBody[i+3];
+            atomicAddDouble(outx, sum4.x);
+            atomicAddDouble(outy, sum4.y);
+            atomicAddDouble(outz, sum4.z);
atomicAddDouble(outw, sum4.w); + } + i += 4; + } + + for (; i < length; i++) { + deviceReduceKernelj(inArray, &outputArrayBody[i], i, n, length); + } +} + +// Finds the final gradient by summing up the element-wise gradients columnwise +extern "C" +__global__ +void reducegrad(int m, double *gradvec, double * sumgradvec, int d) { + + int idx = blockDim.x * blockIdx.x + threadIdx.x; + + if (idx < m) + deviceReduceArrayKernelj(gradvec, sumgradvec, d, m); + +} + + diff --git a/examples/src/main/resources/LogisticRegression.ptx b/examples/src/main/resources/LogisticRegression.ptx new file mode 100644 index 0000000..c258918 --- /dev/null +++ b/examples/src/main/resources/LogisticRegression.ptx @@ -0,0 +1,745 @@ +// +// Generated by NVIDIA NVVM Compiler +// +// Compiler Build ID: CL-21313570 +// Cuda compilation tools, release 8.0, V8.0.53 +// Based on LLVM 3.4svn +// + +.version 5.0 +.target sm_35 +.address_size 64 + + // .globl map + +.visible .entry map( + .param .u32 map_param_0, + .param .u64 map_param_1, + .param .u64 map_param_2, + .param .u64 map_param_3, + .param .u64 map_param_4, + .param .u32 map_param_5 +) +{ + .reg .pred %p<9>; + .reg .f32 %f<3>; + .reg .b32 %r<34>; + .reg .f64 %fd<56>; + .reg .b64 %rd<28>; + + + ld.param.u32 %r13, [map_param_0]; + ld.param.u64 %rd18, [map_param_1]; + ld.param.u64 %rd15, [map_param_2]; + ld.param.u64 %rd16, [map_param_3]; + ld.param.u64 %rd17, [map_param_4]; + ld.param.u32 %r12, [map_param_5]; + cvta.to.global.u64 %rd1, %rd18; + mov.u32 %r1, %ntid.x; + mov.u32 %r2, %ctaid.x; + mov.u32 %r3, %tid.x; + mad.lo.s32 %r4, %r1, %r2, %r3; + setp.ge.s32 %p1, %r4, %r13; + @%p1 bra BB0_10; + + mov.f64 %fd54, 0d0000000000000000; + setp.lt.s32 %p2, %r12, 1; + @%p2 bra BB0_4; + + cvta.to.global.u64 %rd25, %rd17; + mul.lo.s32 %r16, %r12, %r4; + mul.wide.s32 %rd19, %r16, 8; + add.s64 %rd24, %rd1, %rd19; + mov.f64 %fd54, 0d0000000000000000; + mov.u32 %r32, 0; + +BB0_3: + ld.global.f64 %fd11, [%rd25]; + ld.global.f64 %fd12, [%rd24]; + fma.rn.f64 %fd54, %fd12, %fd11, %fd54; + add.s64 %rd25, %rd25, 8; + add.s64 %rd24, %rd24, 8; + add.s32 %r32, %r32, 1; + setp.lt.s32 %p3, %r32, %r12; + @%p3 bra BB0_3; + +BB0_4: + neg.f64 %fd13, %fd54; + mov.f64 %fd14, 0d4338000000000000; + mov.f64 %fd15, 0d3FF71547652B82FE; + fma.rn.f64 %fd16, %fd13, %fd15, %fd14; + { + .reg .b32 %temp; + mov.b64 {%r7, %temp}, %fd16; + } + mov.f64 %fd17, 0dC338000000000000; + add.rn.f64 %fd18, %fd16, %fd17; + mov.f64 %fd19, 0dBFE62E42FEFA39EF; + fma.rn.f64 %fd20, %fd18, %fd19, %fd13; + mov.f64 %fd21, 0dBC7ABC9E3B39803F; + fma.rn.f64 %fd22, %fd18, %fd21, %fd20; + mov.f64 %fd23, 0d3E928AF3FCA213EA; + mov.f64 %fd24, 0d3E5ADE1569CE2BDF; + fma.rn.f64 %fd25, %fd24, %fd22, %fd23; + mov.f64 %fd26, 0d3EC71DEE62401315; + fma.rn.f64 %fd27, %fd25, %fd22, %fd26; + mov.f64 %fd28, 0d3EFA01997C89EB71; + fma.rn.f64 %fd29, %fd27, %fd22, %fd28; + mov.f64 %fd30, 0d3F2A01A014761F65; + fma.rn.f64 %fd31, %fd29, %fd22, %fd30; + mov.f64 %fd32, 0d3F56C16C1852B7AF; + fma.rn.f64 %fd33, %fd31, %fd22, %fd32; + mov.f64 %fd34, 0d3F81111111122322; + fma.rn.f64 %fd35, %fd33, %fd22, %fd34; + mov.f64 %fd36, 0d3FA55555555502A1; + fma.rn.f64 %fd37, %fd35, %fd22, %fd36; + mov.f64 %fd38, 0d3FC5555555555511; + fma.rn.f64 %fd39, %fd37, %fd22, %fd38; + mov.f64 %fd40, 0d3FE000000000000B; + fma.rn.f64 %fd41, %fd39, %fd22, %fd40; + mov.f64 %fd42, 0d3FF0000000000000; + fma.rn.f64 %fd43, %fd41, %fd22, %fd42; + fma.rn.f64 %fd44, %fd43, %fd22, %fd42; + { + .reg .b32 %temp; + mov.b64 {%r8, %temp}, %fd44; + } + { + .reg .b32 %temp; + mov.b64 {%temp, %r9}, 
%fd44; + } + shl.b32 %r17, %r7, 20; + add.s32 %r18, %r9, %r17; + mov.b64 %fd55, {%r8, %r18}; + { + .reg .b32 %temp; + mov.b64 {%temp, %r19}, %fd13; + } + mov.b32 %f2, %r19; + abs.f32 %f1, %f2; + setp.lt.f32 %p4, %f1, 0f4086232B; + @%p4 bra BB0_7; + + setp.gt.f64 %p5, %fd54, 0d8000000000000000; + mov.f64 %fd45, 0d7FF0000000000000; + sub.f64 %fd46, %fd45, %fd54; + selp.f64 %fd55, 0d0000000000000000, %fd46, %p5; + setp.geu.f32 %p6, %f1, 0f40874800; + @%p6 bra BB0_7; + + shr.u32 %r20, %r7, 31; + add.s32 %r21, %r7, %r20; + shr.s32 %r22, %r21, 1; + shl.b32 %r23, %r22, 20; + add.s32 %r24, %r23, %r9; + mov.b64 %fd47, {%r8, %r24}; + sub.s32 %r25, %r7, %r22; + shl.b32 %r26, %r25, 20; + add.s32 %r27, %r26, 1072693248; + mov.u32 %r28, 0; + mov.b64 %fd48, {%r28, %r27}; + mul.f64 %fd55, %fd47, %fd48; + +BB0_7: + @%p2 bra BB0_10; + + cvta.to.global.u64 %rd20, %rd16; + cvta.to.global.u64 %rd21, %rd15; + add.f64 %fd49, %fd55, 0d3FF0000000000000; + rcp.rn.f64 %fd8, %fd49; + mul.wide.s32 %rd22, %r4, 8; + add.s64 %rd8, %rd21, %rd22; + mul.lo.s32 %r31, %r12, %r4; + mul.wide.s32 %rd23, %r31, 8; + add.s64 %rd27, %rd20, %rd23; + add.s64 %rd26, %rd1, %rd23; + mov.u32 %r33, 0; + +BB0_9: + ld.global.f64 %fd50, [%rd8]; + sub.f64 %fd51, %fd8, %fd50; + ld.global.f64 %fd52, [%rd26]; + mul.f64 %fd53, %fd51, %fd52; + st.global.f64 [%rd27], %fd53; + add.s64 %rd27, %rd27, 8; + add.s64 %rd26, %rd26, 8; + add.s32 %r33, %r33, 1; + setp.lt.s32 %p8, %r33, %r12; + @%p8 bra BB0_9; + +BB0_10: + ret; +} + + // .globl reducegrad +.visible .entry reducegrad( + .param .u32 reducegrad_param_0, + .param .u64 reducegrad_param_1, + .param .u64 reducegrad_param_2, + .param .u32 reducegrad_param_3 +) +{ + .reg .pred %p<16>; + .reg .b32 %r<373>; + .reg .f64 %fd<99>; + .reg .b64 %rd<66>; + + + ld.param.u32 %r16, [reducegrad_param_0]; + ld.param.u64 %rd35, [reducegrad_param_1]; + ld.param.u64 %rd36, [reducegrad_param_2]; + ld.param.u32 %r17, [reducegrad_param_3]; + cvta.to.global.u64 %rd1, %rd36; + cvta.to.global.u64 %rd2, %rd35; + mov.u32 %r18, %ctaid.x; + mov.u32 %r1, %ntid.x; + mov.u32 %r2, %tid.x; + mad.lo.s32 %r3, %r18, %r1, %r2; + setp.ge.s32 %p1, %r3, %r16; + @%p1 bra BB1_24; + + cvt.s64.s32 %rd3, %r17; + cvt.s64.s32 %rd4, %r16; + mov.u64 %rd63, 0; + setp.lt.s32 %p2, %r17, 4; + @%p2 bra BB1_16; + + cvt.u64.u32 %rd5, %r3; + and.b32 %r4, %r2, 31; + mov.u32 %r19, %nctaid.x; + mul.lo.s32 %r20, %r19, %r1; + cvt.u64.u32 %rd6, %r20; + add.s32 %r21, %r3, 16; + shr.s32 %r22, %r21, 31; + shr.u32 %r23, %r22, 27; + add.s32 %r24, %r21, %r23; + and.b32 %r25, %r24, -32; + sub.s32 %r5, %r21, %r25; + add.s32 %r26, %r3, 8; + shr.s32 %r27, %r26, 31; + shr.u32 %r28, %r27, 27; + add.s32 %r29, %r26, %r28; + and.b32 %r30, %r29, -32; + sub.s32 %r6, %r26, %r30; + add.s32 %r31, %r3, 4; + shr.s32 %r32, %r31, 31; + shr.u32 %r33, %r32, 27; + add.s32 %r34, %r31, %r33; + and.b32 %r35, %r34, -32; + sub.s32 %r7, %r31, %r35; + add.s32 %r36, %r3, 2; + shr.s32 %r37, %r36, 31; + shr.u32 %r38, %r37, 27; + add.s32 %r39, %r36, %r38; + and.b32 %r40, %r39, -32; + sub.s32 %r8, %r36, %r40; + add.s32 %r41, %r3, 1; + shr.s32 %r42, %r41, 31; + shr.u32 %r43, %r42, 27; + add.s32 %r44, %r41, %r43; + and.b32 %r45, %r44, -32; + sub.s32 %r9, %r41, %r45; + mov.u64 %rd63, 0; + +BB1_3: + mov.f64 %fd97, 0d0000000000000000; + mov.f64 %fd96, %fd97; + mov.f64 %fd95, %fd97; + mov.f64 %fd94, %fd97; + setp.ge.s64 %p3, %rd5, %rd4; + @%p3 bra BB1_6; + + mov.f64 %fd97, 0d0000000000000000; + mov.f64 %fd96, %fd97; + mov.f64 %fd95, %fd97; + mov.f64 %fd94, %fd97; + mov.u64 %rd58, %rd5; + +BB1_5: + mov.u64 
%rd8, %rd58; + mul.lo.s64 %rd39, %rd8, %rd3; + add.s64 %rd40, %rd39, %rd63; + shl.b64 %rd41, %rd40, 3; + add.s64 %rd42, %rd2, %rd41; + ld.global.f64 %fd28, [%rd42]; + add.f64 %fd94, %fd94, %fd28; + ld.global.f64 %fd29, [%rd42+8]; + add.f64 %fd95, %fd95, %fd29; + ld.global.f64 %fd30, [%rd42+16]; + add.f64 %fd96, %fd96, %fd30; + ld.global.f64 %fd31, [%rd42+24]; + add.f64 %fd97, %fd97, %fd31; + add.s64 %rd9, %rd6, %rd8; + setp.lt.s64 %p4, %rd9, %rd4; + mov.u64 %rd58, %rd9; + @%p4 bra BB1_5; + +BB1_6: + // inline asm + mov.b64 {%r46,%r47}, %fd94; + // inline asm + // inline asm + mov.b64 {%r48,%r49}, %fd95; + // inline asm + // inline asm + mov.b64 {%r50,%r51}, %fd96; + // inline asm + // inline asm + mov.b64 {%r52,%r53}, %fd97; + // inline asm + mov.u32 %r277, 31; + // inline asm + shfl.idx.b32 %r54, %r46, %r5, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r58, %r47, %r5, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r62, %r48, %r5, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r66, %r49, %r5, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r70, %r50, %r5, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r74, %r51, %r5, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r78, %r52, %r5, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r82, %r53, %r5, %r277; + // inline asm + // inline asm + mov.b64 %fd36, {%r54,%r58}; + // inline asm + // inline asm + mov.b64 %fd37, {%r62,%r66}; + // inline asm + // inline asm + mov.b64 %fd38, {%r70,%r74}; + // inline asm + // inline asm + mov.b64 %fd39, {%r78,%r82}; + // inline asm + add.f64 %fd40, %fd94, %fd36; + add.f64 %fd41, %fd95, %fd37; + add.f64 %fd42, %fd96, %fd38; + add.f64 %fd43, %fd97, %fd39; + // inline asm + mov.b64 {%r94,%r95}, %fd40; + // inline asm + // inline asm + mov.b64 {%r96,%r97}, %fd41; + // inline asm + // inline asm + mov.b64 {%r98,%r99}, %fd42; + // inline asm + // inline asm + mov.b64 {%r100,%r101}, %fd43; + // inline asm + // inline asm + shfl.idx.b32 %r102, %r94, %r6, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r106, %r95, %r6, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r110, %r96, %r6, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r114, %r97, %r6, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r118, %r98, %r6, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r122, %r99, %r6, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r126, %r100, %r6, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r130, %r101, %r6, %r277; + // inline asm + // inline asm + mov.b64 %fd44, {%r102,%r106}; + // inline asm + // inline asm + mov.b64 %fd45, {%r110,%r114}; + // inline asm + // inline asm + mov.b64 %fd46, {%r118,%r122}; + // inline asm + // inline asm + mov.b64 %fd47, {%r126,%r130}; + // inline asm + add.f64 %fd48, %fd40, %fd44; + add.f64 %fd49, %fd41, %fd45; + add.f64 %fd50, %fd42, %fd46; + add.f64 %fd51, %fd43, %fd47; + // inline asm + mov.b64 {%r142,%r143}, %fd48; + // inline asm + // inline asm + mov.b64 {%r144,%r145}, %fd49; + // inline asm + // inline asm + mov.b64 {%r146,%r147}, %fd50; + // inline asm + // inline asm + mov.b64 {%r148,%r149}, %fd51; + // inline asm + // inline asm + shfl.idx.b32 %r150, %r142, %r7, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r154, %r143, %r7, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r158, %r144, %r7, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r162, %r145, %r7, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r166, %r146, %r7, %r277; + // inline asm + // inline 
asm + shfl.idx.b32 %r170, %r147, %r7, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r174, %r148, %r7, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r178, %r149, %r7, %r277; + // inline asm + // inline asm + mov.b64 %fd52, {%r150,%r154}; + // inline asm + // inline asm + mov.b64 %fd53, {%r158,%r162}; + // inline asm + // inline asm + mov.b64 %fd54, {%r166,%r170}; + // inline asm + // inline asm + mov.b64 %fd55, {%r174,%r178}; + // inline asm + add.f64 %fd56, %fd48, %fd52; + add.f64 %fd57, %fd49, %fd53; + add.f64 %fd58, %fd50, %fd54; + add.f64 %fd59, %fd51, %fd55; + // inline asm + mov.b64 {%r190,%r191}, %fd56; + // inline asm + // inline asm + mov.b64 {%r192,%r193}, %fd57; + // inline asm + // inline asm + mov.b64 {%r194,%r195}, %fd58; + // inline asm + // inline asm + mov.b64 {%r196,%r197}, %fd59; + // inline asm + // inline asm + shfl.idx.b32 %r198, %r190, %r8, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r202, %r191, %r8, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r206, %r192, %r8, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r210, %r193, %r8, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r214, %r194, %r8, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r218, %r195, %r8, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r222, %r196, %r8, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r226, %r197, %r8, %r277; + // inline asm + // inline asm + mov.b64 %fd60, {%r198,%r202}; + // inline asm + // inline asm + mov.b64 %fd61, {%r206,%r210}; + // inline asm + // inline asm + mov.b64 %fd62, {%r214,%r218}; + // inline asm + // inline asm + mov.b64 %fd63, {%r222,%r226}; + // inline asm + add.f64 %fd64, %fd56, %fd60; + add.f64 %fd65, %fd57, %fd61; + add.f64 %fd66, %fd58, %fd62; + add.f64 %fd67, %fd59, %fd63; + // inline asm + mov.b64 {%r238,%r239}, %fd64; + // inline asm + // inline asm + mov.b64 {%r240,%r241}, %fd65; + // inline asm + // inline asm + mov.b64 {%r242,%r243}, %fd66; + // inline asm + // inline asm + mov.b64 {%r244,%r245}, %fd67; + // inline asm + // inline asm + shfl.idx.b32 %r246, %r238, %r9, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r250, %r239, %r9, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r254, %r240, %r9, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r258, %r241, %r9, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r262, %r242, %r9, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r266, %r243, %r9, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r270, %r244, %r9, %r277; + // inline asm + // inline asm + shfl.idx.b32 %r274, %r245, %r9, %r277; + // inline asm + // inline asm + mov.b64 %fd68, {%r246,%r250}; + // inline asm + // inline asm + mov.b64 %fd69, {%r254,%r258}; + // inline asm + // inline asm + mov.b64 %fd70, {%r262,%r266}; + // inline asm + // inline asm + mov.b64 %fd71, {%r270,%r274}; + // inline asm + add.f64 %fd13, %fd64, %fd68; + add.f64 %fd14, %fd65, %fd69; + add.f64 %fd15, %fd66, %fd70; + add.f64 %fd16, %fd67, %fd71; + setp.ne.s32 %p5, %r4, 0; + @%p5 bra BB1_15; + + shl.b64 %rd43, %rd63, 3; + add.s64 %rd10, %rd1, %rd43; + ld.global.u64 %rd59, [%rd10]; + +BB1_8: + mov.u64 %rd12, %rd59; + mov.b64 %fd72, %rd12; + add.f64 %fd73, %fd13, %fd72; + mov.b64 %rd44, %fd73; + atom.global.cas.b64 %rd59, [%rd10], %rd12, %rd44; + setp.ne.s64 %p6, %rd12, %rd59; + @%p6 bra BB1_8; + + ld.global.u64 %rd60, [%rd10+8]; + +BB1_10: + mov.u64 %rd15, %rd60; + add.s64 %rd45, %rd10, 8; + mov.b64 %fd74, %rd15; + add.f64 %fd75, %fd14, %fd74; + mov.b64 %rd46, 
%fd75; + atom.global.cas.b64 %rd60, [%rd45], %rd15, %rd46; + setp.ne.s64 %p7, %rd15, %rd60; + @%p7 bra BB1_10; + + ld.global.u64 %rd61, [%rd10+16]; + +BB1_12: + mov.u64 %rd18, %rd61; + add.s64 %rd47, %rd10, 16; + mov.b64 %fd76, %rd18; + add.f64 %fd77, %fd15, %fd76; + mov.b64 %rd48, %fd77; + atom.global.cas.b64 %rd61, [%rd47], %rd18, %rd48; + setp.ne.s64 %p8, %rd18, %rd61; + @%p8 bra BB1_12; + + ld.global.u64 %rd62, [%rd10+24]; + +BB1_14: + mov.u64 %rd21, %rd62; + add.s64 %rd49, %rd10, 24; + mov.b64 %fd78, %rd21; + add.f64 %fd79, %fd16, %fd78; + mov.b64 %rd50, %fd79; + atom.global.cas.b64 %rd62, [%rd49], %rd21, %rd50; + setp.ne.s64 %p9, %rd21, %rd62; + @%p9 bra BB1_14; + +BB1_15: + add.s64 %rd63, %rd63, 4; + sub.s64 %rd51, %rd3, %rd63; + setp.gt.s64 %p10, %rd51, 3; + @%p10 bra BB1_3; + +BB1_16: + setp.ge.s64 %p11, %rd63, %rd3; + @%p11 bra BB1_24; + + cvt.s64.s32 %rd25, %r3; + mov.u32 %r286, %nctaid.x; + mul.lo.s32 %r287, %r286, %r1; + cvt.u64.u32 %rd26, %r287; + and.b32 %r10, %r2, 31; + add.s32 %r288, %r3, 16; + shr.s32 %r289, %r288, 31; + shr.u32 %r290, %r289, 27; + add.s32 %r291, %r288, %r290; + and.b32 %r292, %r291, -32; + sub.s32 %r11, %r288, %r292; + add.s32 %r293, %r3, 8; + shr.s32 %r294, %r293, 31; + shr.u32 %r295, %r294, 27; + add.s32 %r296, %r293, %r295; + and.b32 %r297, %r296, -32; + sub.s32 %r12, %r293, %r297; + add.s32 %r298, %r3, 4; + shr.s32 %r299, %r298, 31; + shr.u32 %r300, %r299, 27; + add.s32 %r301, %r298, %r300; + and.b32 %r302, %r301, -32; + sub.s32 %r13, %r298, %r302; + add.s32 %r303, %r3, 2; + shr.s32 %r304, %r303, 31; + shr.u32 %r305, %r304, 27; + add.s32 %r306, %r303, %r305; + and.b32 %r307, %r306, -32; + sub.s32 %r14, %r303, %r307; + add.s32 %r308, %r3, 1; + shr.s32 %r309, %r308, 31; + shr.u32 %r310, %r309, 27; + add.s32 %r311, %r308, %r310; + and.b32 %r312, %r311, -32; + sub.s32 %r15, %r308, %r312; + +BB1_18: + shl.b64 %rd52, %rd63, 3; + add.s64 %rd28, %rd1, %rd52; + mov.f64 %fd98, 0d0000000000000000; + mov.u64 %rd64, %rd25; + +BB1_19: + mov.u64 %rd29, %rd64; + mul.lo.s64 %rd53, %rd29, %rd3; + add.s64 %rd54, %rd53, %rd63; + shl.b64 %rd55, %rd54, 3; + add.s64 %rd56, %rd2, %rd55; + ld.global.f64 %fd81, [%rd56]; + add.f64 %fd98, %fd98, %fd81; + add.s64 %rd30, %rd26, %rd29; + setp.lt.s64 %p12, %rd30, %rd4; + mov.u64 %rd64, %rd30; + @%p12 bra BB1_19; + + // inline asm + mov.b64 {%r313,%r314}, %fd98; + // inline asm + mov.u32 %r370, 31; + // inline asm + shfl.idx.b32 %r315, %r313, %r11, %r370; + // inline asm + // inline asm + shfl.idx.b32 %r319, %r314, %r11, %r370; + // inline asm + // inline asm + mov.b64 %fd83, {%r315,%r319}; + // inline asm + add.f64 %fd84, %fd98, %fd83; + // inline asm + mov.b64 {%r325,%r326}, %fd84; + // inline asm + // inline asm + shfl.idx.b32 %r327, %r325, %r12, %r370; + // inline asm + // inline asm + shfl.idx.b32 %r331, %r326, %r12, %r370; + // inline asm + // inline asm + mov.b64 %fd85, {%r327,%r331}; + // inline asm + add.f64 %fd86, %fd84, %fd85; + // inline asm + mov.b64 {%r337,%r338}, %fd86; + // inline asm + // inline asm + shfl.idx.b32 %r339, %r337, %r13, %r370; + // inline asm + // inline asm + shfl.idx.b32 %r343, %r338, %r13, %r370; + // inline asm + // inline asm + mov.b64 %fd87, {%r339,%r343}; + // inline asm + add.f64 %fd88, %fd86, %fd87; + // inline asm + mov.b64 {%r349,%r350}, %fd88; + // inline asm + // inline asm + shfl.idx.b32 %r351, %r349, %r14, %r370; + // inline asm + // inline asm + shfl.idx.b32 %r355, %r350, %r14, %r370; + // inline asm + // inline asm + mov.b64 %fd89, {%r351,%r355}; + // inline asm + add.f64 %fd90, %fd88, 
%fd89;
+	// inline asm
+	mov.b64 	{%r361,%r362}, %fd90;
+	// inline asm
+	// inline asm
+	shfl.idx.b32 	%r363, %r361, %r15, %r370;
+	// inline asm
+	// inline asm
+	shfl.idx.b32 	%r367, %r362, %r15, %r370;
+	// inline asm
+	// inline asm
+	mov.b64 	%fd91, {%r363,%r367};
+	// inline asm
+	add.f64 	%fd19, %fd90, %fd91;
+	setp.ne.s32	%p13, %r10, 0;
+	@%p13 bra 	BB1_23;
+
+	ld.global.u64 	%rd65, [%rd28];
+
+BB1_22:
+	mov.u64 	%rd32, %rd65;
+	mov.b64 	%fd92, %rd32;
+	add.f64 	%fd93, %fd19, %fd92;
+	mov.b64 	%rd57, %fd93;
+	atom.global.cas.b64 	%rd65, [%rd28], %rd32, %rd57;
+	setp.ne.s64 	%p14, %rd32, %rd65;
+	@%p14 bra 	BB1_22;
+
+BB1_23:
+	add.s64 	%rd63, %rd63, 1;
+	setp.lt.s64 	%p15, %rd63, %rd3;
+	@%p15 bra 	BB1_18;
+
+BB1_24:
+	ret;
+}
+
+
diff --git a/examples/src/main/resources/Makefile b/examples/src/main/resources/Makefile
index 531e265..4470d91 100644
--- a/examples/src/main/resources/Makefile
+++ b/examples/src/main/resources/Makefile
@@ -6,7 +6,7 @@ CXXFLAGS ?= -m64 -O3 -Xcompiler -Wall --std=c++11 -g
 
 NVCCFLAGS ?= -arch=sm_$(COMPUTE_CAPABILITY) -Xptxas="-v"
 
-all : SparkGPUExamples.ptx GpuEnablerExamples.ptx
+all : SparkGPUExamples.ptx GpuEnablerExamples.ptx LogisticRegression.ptx SparkGPUKmeans.ptx
 
 SparkGPUExamples.ptx: SparkGPUExamples.cu
 	$(NVCC) -ccbin $(CXX) $(CXXFLAGS) $(NVCCFLAGS) -ptx -o $@ -c $^
@@ -14,5 +14,11 @@ SparkGPUExamples.ptx: SparkGPUExamples.cu
 GpuEnablerExamples.ptx: GpuEnablerExamples.cu
 	$(NVCC) -ccbin $(CXX) $(CXXFLAGS) $(NVCCFLAGS) -ptx -o $@ -c $^
 
+LogisticRegression.ptx: LogisticRegression.cu
+	$(NVCC) -ccbin $(CXX) $(CXXFLAGS) $(NVCCFLAGS) -ptx -o $@ -c $^
+
+SparkGPUKmeans.ptx: SparkGPUKmeans.cu
+	$(NVCC) -ccbin $(CXX) $(CXXFLAGS) $(NVCCFLAGS) -ptx -o $@ -c $^
+
 clean:
-	rm -f SparkGPUExamples.ptx GpuEnablerExamples.ptx
+	rm -f SparkGPUExamples.ptx GpuEnablerExamples.ptx LogisticRegression.ptx SparkGPUKmeans.ptx
diff --git a/examples/src/main/resources/SparkGPUKmeans.cu b/examples/src/main/resources/SparkGPUKmeans.cu
new file mode 100644
index 0000000..95872e6
--- /dev/null
+++ b/examples/src/main/resources/SparkGPUKmeans.cu
@@ -0,0 +1,116 @@
+#include <float.h>   // DBL_MAX
+
+extern "C"
+__global__ void getClusterCentroids(int n, double *xs, int *cluster_index, double *c, int k, int d){
+    //xs indicates datapoints, c indicates initial centroids, k indicates no. of clusters; d - dimensions
+
+    int index = blockIdx.x * blockDim.x + threadIdx.x;
+    if (index < n) {
+        // assign this point to the nearest centroid (squared Euclidean distance)
+        double bestDistance = DBL_MAX;
+        int bestCluster = 0;
+        for (int clust = 0; clust < k; clust++) {
+            double dist = 0;
+            for (int dim = 0; dim < d; dim++) {
+                double diff = xs[index * d + dim] - c[clust * d + dim];
+                dist += diff * diff;
+            }
+            if (dist < bestDistance) {
+                bestDistance = dist;
+                bestCluster = clust;
+            }
+        }
+        cluster_index[index] = bestCluster;
+    }
+}
+
+extern "C"
+__global__ void calculateIntermediates(int n, double *xs, int *cluster_index, int *intermediates0, double *intermediates1, double *intermediates2, int k, int d){
+    // 450 blocks are launched; each block reduces a contiguous chunk of the points.
+    int chunk = n / 450 + 1;
+    int start = blockIdx.x * chunk;
+    int end1 = start + chunk;
+    int end;
+    if (end1 > n) end = n;
+    else end = end1;
+
+    if (end > n ) return;
+    // loop for every K
+    for (int clust = threadIdx.y; clust < k; clust+= blockDim.y){
+        // loop for every dimension(features)
+        for (int dim = threadIdx.x; dim < d; dim+= blockDim.x) {
+
+            // Calculate intermediate S0
+            // for counts we don't have dimensions
+            if (dim == 0) {
+                int count = 0;
+                for (int z = start; z < end; z++) {
+                    if (cluster_index[z] == clust)
+                        count++;
+                }
+                intermediates0[blockIdx.x * k + clust] = count;
+            }
+
+            // Calculate intermediate S1 and S2
+            double sum = 0;
+            double sqsum = 0;
+            for (int z = start; z < end; z++) {
+                if (cluster_index[z] == clust) {
+                    double val = xs[z * d + dim];
+                    sum += val;
+                    sqsum += val * val;
+                }
+            }
+            intermediates1[(blockIdx.x * k + clust) * d + dim] = sum;
+            intermediates2[(blockIdx.x * k + clust) * d + dim] = sqsum;
+        }
+    }
+}
+
+// Sums the 450 per-block intermediate results into the final S0, S1 and S2.
+extern "C"
+__global__ void calculateFinal(int n, int *intermediates0, double *intermediates1, double *intermediates2, int *s0, double *s1, double *s2, int k, int d){
+    if (blockIdx.x > 0) return;
+
+
+    // Only one block is invoked.
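+    //
+    // Launch-geometry sketch (an assumption for illustration; the actual
+    // launch configuration is supplied from the Scala side, see gpuParams2
+    // in GpuKMeans.scala): this kernel is expected to run as a single block
+    // of modD x modK threads, roughly
+    //
+    //   calculateFinal<<<1, dim3(modD, modK)>>>(n, inters0, inters1, inters2,
+    //                                           s0, s1, s2, k, d);
+    //
+    // (inters0/inters1/inters2 are hypothetical device-buffer names), so that
+    // threadIdx.x strides the d feature dimensions and threadIdx.y strides
+    // the k clusters in the loops below.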
+ // loop for every K + for (int clust = threadIdx.y; clust < k; clust+= blockDim.y){ + // loop for every dimension(features) + for (int dim = threadIdx.x; dim < d; dim+= blockDim.x) { + + // Calculate S0 + // for counts we don't have dimensions + if (dim == 0) { + //count = 0; + for(int z = clust; z < 450*k; z+=k){ + { + s0[clust] += intermediates0[z]; + } + } + } + + // Calculate S1 and S2 + int start = clust * d + dim; + int kd = k * d; + double *s1end = &intermediates1[450 * kd]; + double *s1cur = &intermediates1[start]; + double *s2cur = &intermediates2[start]; + + for (; s1cur < s1end; s1cur += kd, s2cur += kd) + { + s1[start] += *s1cur; + s2[start] += *s2cur; + } + } + } +} + diff --git a/examples/src/main/resources/SparkGPUKmeans.ptx b/examples/src/main/resources/SparkGPUKmeans.ptx new file mode 100644 index 0000000..428103d --- /dev/null +++ b/examples/src/main/resources/SparkGPUKmeans.ptx @@ -0,0 +1,397 @@ +// +// Generated by NVIDIA NVVM Compiler +// +// Compiler Build ID: CL-21313570 +// Cuda compilation tools, release 8.0, V8.0.53 +// Based on LLVM 3.4svn +// + +.version 5.0 +.target sm_35 +.address_size 64 + + // .globl getClusterCentroids + +.visible .entry getClusterCentroids( + .param .u32 getClusterCentroids_param_0, + .param .u64 getClusterCentroids_param_1, + .param .u64 getClusterCentroids_param_2, + .param .u64 getClusterCentroids_param_3, + .param .u32 getClusterCentroids_param_4, + .param .u32 getClusterCentroids_param_5 +) +{ + .reg .pred %p<7>; + .reg .b32 %r<28>; + .reg .f64 %fd<16>; + .reg .b64 %rd<19>; + + + ld.param.u32 %r14, [getClusterCentroids_param_0]; + ld.param.u64 %rd8, [getClusterCentroids_param_1]; + ld.param.u64 %rd9, [getClusterCentroids_param_2]; + ld.param.u64 %rd10, [getClusterCentroids_param_3]; + ld.param.u32 %r12, [getClusterCentroids_param_4]; + ld.param.u32 %r13, [getClusterCentroids_param_5]; + mov.u32 %r1, %ntid.x; + mov.u32 %r2, %ctaid.x; + mov.u32 %r3, %tid.x; + mad.lo.s32 %r4, %r1, %r2, %r3; + setp.ge.s32 %p1, %r4, %r14; + @%p1 bra BB0_7; + + mov.u32 %r25, 0; + setp.lt.s32 %p2, %r12, 1; + @%p2 bra BB0_6; + + cvta.to.global.u64 %rd1, %rd10; + cvta.to.global.u64 %rd11, %rd8; + mul.lo.s32 %r19, %r13, %r4; + mul.wide.s32 %rd12, %r19, 8; + add.s64 %rd2, %rd11, %rd12; + mov.f64 %fd12, 0d7FEFFFFFFFFFFFFF; + mov.u32 %r17, 0; + mov.u32 %r22, %r17; + mov.u32 %r27, %r17; + +BB0_3: + mul.lo.s32 %r21, %r13, %r22; + mul.wide.s32 %rd13, %r21, 8; + add.s64 %rd17, %rd1, %rd13; + mov.f64 %fd14, 0d0000000000000000; + mov.f64 %fd15, %fd14; + setp.lt.s32 %p3, %r13, 1; + mov.u64 %rd18, %rd2; + mov.u32 %r26, %r17; + @%p3 bra BB0_5; + +BB0_4: + mov.u32 %r7, %r26; + mov.u64 %rd5, %rd18; + ld.global.f64 %fd9, [%rd17]; + ld.global.f64 %fd10, [%rd5]; + sub.f64 %fd11, %fd10, %fd9; + fma.rn.f64 %fd15, %fd11, %fd11, %fd15; + add.s64 %rd6, %rd5, 8; + add.s64 %rd17, %rd17, 8; + add.s32 %r8, %r7, 1; + setp.lt.s32 %p4, %r8, %r13; + mov.u64 %rd18, %rd6; + mov.f64 %fd14, %fd15; + mov.u32 %r26, %r8; + @%p4 bra BB0_4; + +BB0_5: + setp.lt.f64 %p5, %fd14, %fd12; + selp.f64 %fd12, %fd14, %fd12, %p5; + selp.b32 %r27, %r22, %r27, %p5; + add.s32 %r22, %r22, 1; + setp.lt.s32 %p6, %r22, %r12; + mov.u32 %r25, %r27; + @%p6 bra BB0_3; + +BB0_6: + cvta.to.global.u64 %rd14, %rd9; + mul.wide.s32 %rd15, %r4, 4; + add.s64 %rd16, %rd14, %rd15; + st.global.u32 [%rd16], %r25; + +BB0_7: + ret; +} + + // .globl calculateIntermediates +.visible .entry calculateIntermediates( + .param .u32 calculateIntermediates_param_0, + .param .u64 calculateIntermediates_param_1, + .param .u64 
calculateIntermediates_param_2, + .param .u64 calculateIntermediates_param_3, + .param .u64 calculateIntermediates_param_4, + .param .u64 calculateIntermediates_param_5, + .param .u32 calculateIntermediates_param_6, + .param .u32 calculateIntermediates_param_7 +) +{ + .reg .pred %p<12>; + .reg .b32 %r<49>; + .reg .f64 %fd<26>; + .reg .b64 %rd<25>; + + + ld.param.u32 %r24, [calculateIntermediates_param_0]; + ld.param.u64 %rd10, [calculateIntermediates_param_1]; + ld.param.u64 %rd14, [calculateIntermediates_param_2]; + ld.param.u64 %rd11, [calculateIntermediates_param_3]; + ld.param.u64 %rd12, [calculateIntermediates_param_4]; + ld.param.u64 %rd13, [calculateIntermediates_param_5]; + ld.param.u32 %r22, [calculateIntermediates_param_6]; + ld.param.u32 %r23, [calculateIntermediates_param_7]; + cvta.to.global.u64 %rd1, %rd14; + mul.hi.s32 %r25, %r24, -1851608123; + add.s32 %r26, %r25, %r24; + shr.u32 %r27, %r26, 31; + shr.s32 %r28, %r26, 8; + add.s32 %r1, %r28, %r27; + add.s32 %r29, %r1, 1; + mov.u32 %r2, %ctaid.x; + mul.lo.s32 %r3, %r2, %r29; + add.s32 %r30, %r3, %r29; + min.s32 %r4, %r24, %r30; + mov.u32 %r41, %tid.y; + setp.ge.s32 %p1, %r41, %r22; + @%p1 bra BB1_14; + + cvta.to.global.u64 %rd2, %rd13; + cvta.to.global.u64 %rd3, %rd12; + cvta.to.global.u64 %rd4, %rd10; + cvta.to.global.u64 %rd5, %rd11; + mov.u32 %r6, %tid.x; + mov.u32 %r7, %ntid.y; + mul.lo.s32 %r8, %r2, %r22; + mov.u32 %r9, %ntid.x; + mul.lo.s32 %r32, %r2, %r29; + mul.wide.s32 %rd15, %r32, 4; + add.s64 %rd6, %rd1, %rd15; + +BB1_2: + setp.ge.s32 %p2, %r6, %r23; + @%p2 bra BB1_13; + + add.s32 %r33, %r41, %r8; + mul.wide.u32 %rd16, %r33, 4; + add.s64 %rd7, %rd5, %rd16; + mul.lo.s32 %r11, %r33, %r23; + mov.u32 %r42, %r6; + +BB1_4: + mov.u32 %r12, %r42; + setp.ne.s32 %p3, %r12, 0; + @%p3 bra BB1_8; + + mov.u32 %r44, 0; + mov.u32 %r45, %r44; + setp.ge.s32 %p4, %r3, %r4; + mov.u64 %rd24, %rd6; + mov.u32 %r48, %r3; + @%p4 bra BB1_7; + +BB1_6: + mov.u32 %r13, %r48; + mov.u64 %rd8, %rd24; + ld.global.u32 %r36, [%rd8]; + setp.eq.s32 %p5, %r36, %r41; + selp.u32 %r37, 1, 0, %p5; + add.s32 %r45, %r37, %r45; + add.s64 %rd9, %rd8, 4; + add.s32 %r16, %r13, 1; + setp.lt.s32 %p6, %r16, %r4; + mov.u64 %rd24, %rd9; + mov.u32 %r44, %r45; + mov.u32 %r48, %r16; + @%p6 bra BB1_6; + +BB1_7: + st.global.u32 [%rd7], %r44; + +BB1_8: + mov.f64 %fd23, 0d0000000000000000; + mov.f64 %fd17, %fd23; + mov.f64 %fd24, %fd23; + mov.f64 %fd18, %fd23; + setp.ge.s32 %p7, %r3, %r4; + mov.u32 %r47, %r3; + @%p7 bra BB1_12; + +BB1_9: + mov.f64 %fd20, %fd24; + mov.f64 %fd25, %fd20; + mov.f64 %fd14, %fd18; + mov.f64 %fd19, %fd14; + mul.wide.s32 %rd17, %r47, 4; + add.s64 %rd18, %rd1, %rd17; + ld.global.u32 %r38, [%rd18]; + setp.ne.s32 %p8, %r38, %r41; + @%p8 bra BB1_11; + + mad.lo.s32 %r39, %r47, %r23, %r12; + mul.wide.s32 %rd19, %r39, 8; + add.s64 %rd20, %rd4, %rd19; + ld.global.f64 %fd13, [%rd20]; + add.f64 %fd25, %fd25, %fd13; + fma.rn.f64 %fd19, %fd13, %fd13, %fd19; + +BB1_11: + mov.f64 %fd24, %fd25; + mov.f64 %fd18, %fd19; + add.s32 %r47, %r47, 1; + setp.lt.s32 %p9, %r47, %r4; + mov.f64 %fd17, %fd18; + mov.f64 %fd23, %fd24; + @%p9 bra BB1_9; + +BB1_12: + add.s32 %r40, %r12, %r11; + mul.wide.s32 %rd21, %r40, 8; + add.s64 %rd22, %rd3, %rd21; + st.global.f64 [%rd22], %fd23; + add.s64 %rd23, %rd2, %rd21; + st.global.f64 [%rd23], %fd17; + add.s32 %r20, %r9, %r12; + setp.lt.s32 %p10, %r20, %r23; + mov.u32 %r42, %r20; + @%p10 bra BB1_4; + +BB1_13: + add.s32 %r41, %r7, %r41; + setp.lt.s32 %p11, %r41, %r22; + @%p11 bra BB1_2; + +BB1_14: + ret; +} + + // .globl calculateFinal 
+.visible .entry calculateFinal( + .param .u32 calculateFinal_param_0, + .param .u64 calculateFinal_param_1, + .param .u64 calculateFinal_param_2, + .param .u64 calculateFinal_param_3, + .param .u64 calculateFinal_param_4, + .param .u64 calculateFinal_param_5, + .param .u64 calculateFinal_param_6, + .param .u32 calculateFinal_param_7, + .param .u32 calculateFinal_param_8 +) +{ + .reg .pred %p<12>; + .reg .b32 %r<41>; + .reg .f64 %fd<7>; + .reg .b64 %rd<39>; + + + ld.param.u64 %rd23, [calculateFinal_param_1]; + ld.param.u64 %rd28, [calculateFinal_param_2]; + ld.param.u64 %rd24, [calculateFinal_param_3]; + ld.param.u64 %rd25, [calculateFinal_param_4]; + ld.param.u64 %rd26, [calculateFinal_param_5]; + ld.param.u64 %rd27, [calculateFinal_param_6]; + ld.param.u32 %r23, [calculateFinal_param_7]; + ld.param.u32 %r24, [calculateFinal_param_8]; + cvta.to.global.u64 %rd1, %rd28; + mov.u32 %r25, %ctaid.x; + setp.ne.s32 %p1, %r25, 0; + @%p1 bra BB2_13; + + mov.u32 %r1, %tid.y; + setp.ge.s32 %p2, %r1, %r23; + @%p2 bra BB2_13; + + cvta.to.global.u64 %rd2, %rd27; + cvta.to.global.u64 %rd3, %rd24; + cvta.to.global.u64 %rd4, %rd26; + cvta.to.global.u64 %rd5, %rd25; + cvta.to.global.u64 %rd6, %rd23; + mov.u32 %r2, %tid.x; + mul.lo.s32 %r3, %r23, 450; + mul.lo.s32 %r27, %r24, %r23; + mul.lo.s32 %r28, %r27, 450; + mul.wide.s32 %rd29, %r28, 8; + add.s64 %rd7, %rd1, %rd29; + mov.u32 %r4, %ntid.x; + mul.wide.s32 %rd8, %r23, 4; + mad.lo.s32 %r5, %r1, %r24, %r2; + mov.u32 %r6, %ntid.y; + mul.lo.s32 %r7, %r6, %r24; + mul.wide.s32 %rd9, %r27, 8; + mov.u32 %r34, 0; + mov.u32 %r40, %r1; + +BB2_3: + mov.u32 %r38, %r40; + mov.u32 %r9, %r38; + setp.ge.s32 %p3, %r2, %r24; + @%p3 bra BB2_12; + + mad.lo.s32 %r10, %r7, %r34, %r5; + mad.lo.s32 %r30, %r6, %r34, %r1; + mul.wide.s32 %rd30, %r30, 4; + add.s64 %rd10, %rd6, %rd30; + mul.wide.s32 %rd31, %r9, 4; + add.s64 %rd11, %rd5, %rd31; + mul.lo.s32 %r11, %r9, %r24; + mov.u32 %r35, 0; + mov.u32 %r36, %r2; + +BB2_5: + mov.u32 %r13, %r36; + setp.ge.s32 %p4, %r9, %r3; + setp.ne.s32 %p5, %r13, 0; + or.pred %p6, %p5, %p4; + @%p6 bra BB2_8; + + ld.global.u32 %r37, [%rd11]; + mov.u64 %rd36, %rd10; + mov.u32 %r39, %r9; + +BB2_7: + mov.u32 %r16, %r39; + mov.u64 %rd12, %rd36; + ld.global.u32 %r31, [%rd12]; + add.s32 %r37, %r37, %r31; + st.global.u32 [%rd11], %r37; + add.s64 %rd13, %rd12, %rd8; + add.s32 %r18, %r16, %r23; + setp.lt.s32 %p7, %r18, %r3; + mov.u64 %rd36, %rd13; + mov.u32 %r39, %r18; + @%p7 bra BB2_7; + +BB2_8: + add.s32 %r32, %r13, %r11; + cvt.s64.s32 %rd14, %r32; + mul.wide.s32 %rd32, %r32, 8; + add.s64 %rd33, %rd1, %rd32; + setp.ge.u64 %p8, %rd33, %rd7; + @%p8 bra BB2_11; + + mad.lo.s32 %r33, %r4, %r35, %r10; + mul.wide.s32 %rd34, %r33, 8; + add.s64 %rd38, %rd3, %rd34; + add.s64 %rd37, %rd1, %rd34; + shl.b64 %rd35, %rd14, 3; + add.s64 %rd17, %rd4, %rd35; + add.s64 %rd18, %rd2, %rd35; + +BB2_10: + ld.global.f64 %fd1, [%rd17]; + ld.global.f64 %fd2, [%rd37]; + add.f64 %fd3, %fd2, %fd1; + st.global.f64 [%rd17], %fd3; + ld.global.f64 %fd4, [%rd18]; + ld.global.f64 %fd5, [%rd38]; + add.f64 %fd6, %fd5, %fd4; + st.global.f64 [%rd18], %fd6; + add.s64 %rd38, %rd38, %rd9; + add.s64 %rd37, %rd37, %rd9; + setp.lt.u64 %p9, %rd37, %rd7; + @%p9 bra BB2_10; + +BB2_11: + add.s32 %r19, %r4, %r13; + setp.lt.s32 %p10, %r19, %r24; + add.s32 %r35, %r35, 1; + mov.u32 %r36, %r19; + @%p10 bra BB2_5; + +BB2_12: + add.s32 %r21, %r6, %r9; + setp.lt.s32 %p11, %r21, %r23; + add.s32 %r34, %r34, 1; + mov.u32 %r40, %r21; + @%p11 bra BB2_3; + +BB2_13: + ret; +} + + diff --git 
a/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeans.scala b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeans.scala new file mode 100644 index 0000000..b537488 --- /dev/null +++ b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeans.scala @@ -0,0 +1,346 @@ + +package com.ibm.gpuenabler + +import java.util.Random +import org.apache.log4j.{Level, Logger} +import org.apache.spark.sql.{Dataset, SparkSession} +import com.ibm.gpuenabler.CUDADSImplicits._ +import scala.collection.mutable + +object GpuKMeans { + case class DataPointKMeans(features: Array[Double]) + case class ClusterIndexes(features: Array[Double], index: Int) + case class Results(s0: Array[Int], s1: Array[Double], s2: Array[Double]) + + def timeit(msg: String, code: => Any): Any ={ + val now1 = System.nanoTime + code + val ms1 = (System.nanoTime - now1) / 1000000 + println("%s Elapsed time: %d ms".format(msg, ms1)) + } + + def maxPoints(d: Int, k: Int, part: Int, nGpu: Int): Long = { + val perCluster: Long = 4*450 + (8*450 + 1) *d * 2 // s0 + s1 + s2 + val clusterBytes: Long = perCluster * k * 2 * part // we allocate twice + + val perPoint = 8 * d + 2 * 4 + val available: Long = (10L * 1024 * 1024 * 1024 * nGpu) - clusterBytes // 10GB + val maxPoints = available / perPoint + if (maxPoints <= 0) throw new IllegalArgumentException(s"too big: k * dimensions for the partition count: ${k} * ${d} * ${part} ") + maxPoints + } + + def main(args: Array[String]):Unit = { + + val masterURL = if (args.length > 0) args(0) else "local[*]" + val k: Int = if (args.length > 1) args(1).toInt else 32 + val N: Long = if (args.length > 2) args(2).toLong else 100000 + val d: Int= if (args.length > 3) args(3).toInt else 32 + val numSlices = if (args.length > 4) args(4).toInt else 1 + val iters: Int = if (args.length > 5) args(5).toInt else 5 + val nGpu: Int = if (args.length > 6) args(6).toInt else 1 + + if (N > maxPoints(d, k, numSlices, nGpu)) { + println(s"N(${N}) is too high for the given d(${d}) & k(${k}) ; MAX ${maxPoints(d, k, numSlices,nGpu) }") + return + } + + println(s"KMeans (${k}) Algorithm on N: ${N} datasets; Dimension: ${d}; slices: ${numSlices} for ${iters} iterations") + val spark = SparkSession.builder().master(masterURL).appName("SparkDSKMeans").getOrCreate() + import spark.implicits._ + + Logger.getRootLogger().setLevel(Level.ERROR) + + val data: Dataset[DataPointKMeans] = getDataSet(spark, N, d, numSlices) + println("Data Generation Done") + + println(" ======= GPU ===========") + + val (centers, cost) = runGpu(data, d, k, iters) + // printCenters("Cluster centers:", centers) + println(s"Cost: ${cost}") + + println(" ======= CPU ===========") + + val (ccenters, ccost) = run(data, d, k, iters) + + // printCenters("Cluster centers:", ccenters) + println(s"Cost: ${ccost}") + } + + def runGpu(data: Dataset[DataPointKMeans], d: Int, k: Int, + maxIterations: Int): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + + val epsilon = 0.5 + var changed = true + var iteration = 0 + var cost = 0.0 + val ptxURL = "/SparkGPUKmeans.ptx" + + val centroidFn = DSCUDAFunction( + "getClusterCentroids", + Array("features"), + Array("index"), + ptxURL) + + var limit = 1024 + var modD = Math.min(d, limit) + var modK = Math.min(k, limit) + if (modD * modK > limit) { + if (modD <= modK) + modD = Math.max(limit/modK, 1) + else + modK = Math.max(limit/modD, 1) + } + + println(s"Dimension are modD x modK : ${modD} x ${modK}") + + val dimensions1 = (size: Long, stage: Int) => stage match { + case 0 => (450, modD, 1, modK, 1, 
1) + } + + val gpuParams1 = gpuParameters(dimensions1) + + val interFn = DSCUDAFunction( + "calculateIntermediates", + Array("features", "index"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams1), outputSize=Some(450)) + + + val dimensions2 = (size: Long, stage: Int) => stage match { + case 0 => (1, modD, 1, modK, 1, 1) + } + + val gpuParams2 = gpuParameters(dimensions2) + + val sumFn = DSCUDAFunction( + "calculateFinal", + Array("s0", "s1", "s2"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams2), outputSize=Some(1)) + + + def func1(p: DataPointKMeans): ClusterIndexes = { + ClusterIndexes(p.features, 0) + } + + def func2(c: ClusterIndexes): Results = { + Results(Array.empty, Array.empty, Array.empty) + } + + def func3(r1: Results, r2: Results): Results = { + Results(addArr(r1.s0, r2.s0), + addArr(r1.s1, r2.s1),addArr(r1.s2, r2.s2)) + } + + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + data.cacheGpu(true) + timeit("Data loaded in GPU ", { data.loadGpu() }) + + var oldMeans = means + + timeit("GPU :: ", { + // while (changed && iteration < maxIterations) { + while (iteration < maxIterations) { + val oldCentroids = oldMeans.flatMap(p => p.features) + + // this gets distributed + val centroidIndex = data.mapExtFunc(func1, + centroidFn, + Array(oldCentroids, k, d), outputArraySizes = Array(d) + ).cacheGpu(true) + + val interValues = centroidIndex + .mapExtFunc(func2, interFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)).cacheGpu(true) + + val result = interValues + .reduceExtFunc(func3, sumFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)) + + centroidIndex.unCacheGpu + interValues.unCacheGpu + + val newMeans = getCenters(k, d, result.s0, result.s1) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, result.s0, result.s1, result.s2) + + changed = maxDelta > epsilon + oldMeans = newMeans + //println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + } + }) + data.unCacheGpu + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + } + + + def train(means: Array[DataPointKMeans], pointItr: Iterator[DataPointKMeans]): + Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]] = { + val d = means(0).features.size + val k = means.length + + val s0 = new Array[Int](k) + val s1 = new Array[Double](d * k) + val s2 = new Array[Double](d * k) + + pointItr.foreach(point => { + var bestCluster = 0 + var bestDistance = 0.0 + + for (c <- 0 until k) { + var dist = squaredDistance(point.features, means(c).features) + + if (c == 0 || bestDistance > dist) { + bestCluster = c + bestDistance = dist + } + } + + s0(bestCluster) += 1 + + for (dim <- 0 until d) { + var coord = point.features(dim) + + s1(bestCluster * d + dim) += coord + s2(bestCluster * d + dim) += coord * coord + } + + }) + + var flag = true + + new Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]]{ + override def hasNext: Boolean = flag + + override def next: Tuple3[Array[Int], Array[Double], Array[Double]] = { + flag = false + (s0, s1, s2) + } + } + } + + def getCenters(k: Int, d: Int, s0: Array[Int], + s1: Array[Double]): Array[DataPointKMeans] = { + Array.tabulate(k)(i => DataPointKMeans(Array.tabulate(d)(j => s1(i * d + j) / s0(i).max(1)))) + } + + def getCost(k: Int, d: Int, s0: Array[Int], s1: Array[Double], s2: Array[Double]) = { + var cost: Double = 0 + + for (i <- 0 until d * k) { + val mean = i / d + val center = s1(i) / s0(mean).max(1) + + // 
TODO simplify this + cost += center * (center * s0(mean) - 2 * s1(i)) + s2(i) + } + + cost + } + + def run(data: Dataset[DataPointKMeans], d: Int, k: Int, + maxIterations: Int): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + val epsilon = 0.5 + var changed = true + var iteration = 0 + + var cost = 0.0 + var oldMeans = means + + timeit("CPU :: ", { + // while (changed && iteration < maxIterations) { + while (iteration < maxIterations) { + + // this gets distributed + val result = data.mapPartitions(pointItr => train(oldMeans, pointItr)).reduce((x,y) => + (addArr(x._1, y._1), addArr(x._2, y._2), addArr(x._3, y._3))) + + val newMeans: Array[DataPointKMeans] = getCenters(k, d, result._1, result._2) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, result._1, result._2, result._3) + + changed = maxDelta > epsilon + oldMeans = newMeans + // println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + } + }) + + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + } + + def generateData(seed: Long, N: Long, D: Int, R: Double): DataPointKMeans = { + val r = new Random(seed) + def generatePoint(i: Long): DataPointKMeans = { + val x = Array.fill(D){r.nextGaussian + i * R} + DataPointKMeans(x) + } + generatePoint(seed) + } + + private def getDataSet(spark: SparkSession, N: Long, d: Int, slices: Int): Dataset[DataPointKMeans] = { + import spark.implicits._ + val R = 0.7 // Scaling factor + val pointsCached = spark.range(1, N+1, 1, slices).map(i => generateData(i, N, d, R)).cache + pointsCached.count() + pointsCached + } + + private def printCenters(heading: String, centers: Array[DataPointKMeans]) = { + println + println(heading) + centers.foreach(point => println(" " + point.features.mkString(" ") +";")) + } + + def squaredDistance(a: Seq[Double], b: Seq[Double]): Double = { + require(a.length == b.length, "equal lengths") + + a.zip(b) + .map { p => val diff = p._1 - p._2; diff * diff } + .sum + } + + def addArr(lhs: Array[Double], rhs: Array[Double]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def addArr(lhs: Array[Int], rhs: Array[Int]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def squaredDistance(v1: DataPointKMeans, v2: DataPointKMeans): Double = { + squaredDistance(v1.features, v2.features) + } + def squaredDistance(p: (DataPointKMeans, DataPointKMeans)): Double = { + squaredDistance(p._1, p._2) + } +} + diff --git a/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansBatch.scala b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansBatch.scala new file mode 100644 index 0000000..914fd7a --- /dev/null +++ b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansBatch.scala @@ -0,0 +1,363 @@ +package com.ibm.gpuenabler + +import java.util.Random +import org.apache.log4j.{Level, Logger} +import org.apache.spark.sql.{Dataset, SparkSession} +import com.ibm.gpuenabler.CUDADSImplicits._ +import scala.collection.mutable + +object GpuKMeansBatch { + case class DataPointKMeans(features: Array[Double]) + case class ClusterIndexes(features: Array[Double], index: Int) + case class Results(s0: Array[Int], s1: Array[Double], s2: Array[Double]) + + def timeit(msg: String, code: => Any): Any ={ + val now1 = System.nanoTime + code + val ms1 = (System.nanoTime - now1) / 1000000 + println("%s Elapsed 
time: %d ms".format(msg, ms1)) + } + + def maxPoints(d: Int, k: Int, part: Int, nGpu: Int): Long = { + val perCluster: Long = 4*450 + (8*450 + 1) *d * 2 // s0 + s1 + s2 + val clusterBytes: Long = perCluster * k * 2 * part // we allocate twice + + val perPoint = 8 * d + 2 * 4 + val available: Long = (10L * 1024 * 1024 * 1024 * nGpu) - clusterBytes // 10GB + val maxPoints = available / perPoint + if (maxPoints <= 0) throw new IllegalArgumentException(s"too big: k * dimensions for the partition count: ${k} * ${d} * ${part} ") + maxPoints + } + + def main(args: Array[String]):Unit = { + + val masterURL = if (args.length > 0) args(0) else "local[*]" + val k: Int = if (args.length > 1) args(1).toInt else 32 + val N: Long = if (args.length > 2) args(2).toLong else 100000 + val d: Int= if (args.length > 3) args(3).toInt else 32 + val numSlices = if (args.length > 4) args(4).toInt else 1 + val iters: Int = if (args.length > 5) args(5).toInt else 5 + val nGpu: Int = if (args.length > 6) args(6).toInt else 1 + + if (N > maxPoints(d, k, numSlices, nGpu)) { + println(s"N(${N}) is too high for the given d(${d}) & k(${k}) ; MAX ${maxPoints(d, k, numSlices,nGpu) }") + return + } + + println(s"KMeans (${k}) Algorithm on N: ${N} datasets; Dimension: ${d}; slices: ${numSlices} for ${iters} iterations") + val spark = SparkSession.builder().master(masterURL).appName("SparkDSKMeans").getOrCreate() + spark.sparkContext.setCheckpointDir("/tmp") + + import spark.implicits._ + + Logger.getRootLogger().setLevel(Level.ERROR) + + val data: Dataset[DataPointKMeans] = getDataSet(spark, N, d, numSlices) + println("Data Generation Done") + + println(" ======= GPU ===========") + + val (centers, cost) = runGpu(data, d, k, iters) + + // printCenters("Cluster centers:", centers) + println(s"Cost: ${cost}") + + println(" ======= CPU ===========") + + val (ccenters, ccost) = run(data, d, k, iters) + + // printCenters("Cluster centers:", ccenters) + println(s"Cost: ${ccost}") + } + + def runGpu(data: Dataset[DataPointKMeans], d: Int, k: Int, + maxIterations: Int): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + + val epsilon = 0.5 + var changed = true + var iteration = 0 + var cost = 0.0 + val ptxURL = "/SparkGPUKmeans.ptx" + + val centroidFn = DSCUDAFunction( + "getClusterCentroids", + Array("features"), + Array("index"), + ptxURL) + + var limit = 1024 + var modD = Math.min(d, limit) + var modK = Math.min(k, limit) + if (modD * modK > limit) { + if (modD <= modK) + modD = Math.max(limit/modK, 1) + else + modK = Math.max(limit/modD, 1) + } + + println(s"Dimension are modD x modK : ${modD} x ${modK}") + + val dimensions1 = (size: Long, stage: Int) => stage match { + case 0 => (450, modD, 1, modK, 1, 1) + } + + val gpuParams1 = gpuParameters(dimensions1) + + val interFn = DSCUDAFunction( + "calculateIntermediates", + Array("features", "index"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams1), outputSize=Some(450)) + + + val dimensions2 = (size: Long, stage: Int) => stage match { + case 0 => (1, modD, 1, modK, 1, 1) + } + + val gpuParams2 = gpuParameters(dimensions2) + + val sumFn = DSCUDAFunction( + "calculateFinal", + Array("s0", "s1", "s2"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams2), outputSize=Some(1)) + + + def func1(p: DataPointKMeans): ClusterIndexes = { + ClusterIndexes(p.features, 0) + } + + def func2(c: ClusterIndexes): Results = { + Results(Array.empty, Array.empty, Array.empty) + } + + def func3(r1: Results, r2: 
Results): Results = { + Results(addArr(r1.s0, r2.s0), + addArr(r1.s1, r2.s1),addArr(r1.s2, r2.s2)) + } + + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + val dataSplits = data.randomSplit(Array(0.5,0.5), 42) + + // dataSplits.foreach(ds=> { timeit("Data loaded in GPU", { ds.cacheGpu(true); ds.loadGpu(); }) } ) + + var oldMeans = means + var batch = 1 + + timeit("GPU :: ", { + while (iteration < maxIterations) { + var s0 = new Array[Int](k) + var s1 = new Array[Double](d * k) + var s2 = new Array[Double](d * k) + + val oldCentroids = oldMeans.flatMap(p => p.features) + + dataSplits.foreach(datasplit => { + println(s"Executing batch ${batch} for iteration $iteration ") + batch += 1 + datasplit.cacheGpu(true) + // timeit("Data loaded in GPU", { datasplit.loadGpu }) + + // this gets distributed + val centroidIndex = datasplit.mapExtFunc(func1, + centroidFn, + Array(oldCentroids, k, d), outputArraySizes = Array(d) + ).cacheGpu(true) + + val interValues = centroidIndex + .mapExtFunc(func2, interFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)).cacheGpu(true) + + val result = interValues + .reduceExtFunc(func3, sumFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)) + + centroidIndex.unCacheGpu + interValues.unCacheGpu + datasplit.unCacheGpu + + s0 = addArr(s0, result.s0) + s1 = addArr(s1, result.s1) + s2 = addArr(s2, result.s2) + }) + + val newMeans = getCenters(k, d, s0, s1) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, s0, s1, s2) + + changed = maxDelta > epsilon + oldMeans = newMeans + // println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + } + }) + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + } + + + def train(means: Array[DataPointKMeans], pointItr: Iterator[DataPointKMeans]): + Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]] = { + val d = means(0).features.size + val k = means.length + + val s0 = new Array[Int](k) + val s1 = new Array[Double](d * k) + val s2 = new Array[Double](d * k) + + pointItr.foreach(point => { + var bestCluster = 0 + var bestDistance = 0.0 + + for (c <- 0 until k) { + var dist = squaredDistance(point.features, means(c).features) + + if (c == 0 || bestDistance > dist) { + bestCluster = c + bestDistance = dist + } + } + + s0(bestCluster) += 1 + + for (dim <- 0 until d) { + var coord = point.features(dim) + + s1(bestCluster * d + dim) += coord + s2(bestCluster * d + dim) += coord * coord + } + + }) + + var flag = true + + new Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]]{ + override def hasNext: Boolean = flag + + override def next: Tuple3[Array[Int], Array[Double], Array[Double]] = { + flag = false + (s0, s1, s2) + } + } + } + + def getCenters(k: Int, d: Int, s0: Array[Int], + s1: Array[Double]): Array[DataPointKMeans] = { + Array.tabulate(k)(i => DataPointKMeans(Array.tabulate(d)(j => s1(i * d + j) / s0(i).max(1)))) + } + + def getCost(k: Int, d: Int, s0: Array[Int], s1: Array[Double], s2: Array[Double]) = { + var cost: Double = 0 + + for (i <- 0 until d * k) { + val mean = i / d + val center = s1(i) / s0(mean).max(1) + + // TODO simplify this + cost += center * (center * s0(mean) - 2 * s1(i)) + s2(i) + } + + cost + } + + def run(data: Dataset[DataPointKMeans], d: Int, k: Int, + maxIterations: Int): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + val epsilon = 0.5 + var changed = true + var iteration = 0 + 
+ var cost = 0.0 + var oldMeans = means + + timeit("CPU :: ", { + // while (changed && iteration < maxIterations) { + while (iteration < maxIterations) { + + // this gets distributed + val result = data.mapPartitions(pointItr => train(oldMeans, pointItr)).reduce((x,y) => + (addArr(x._1, y._1), addArr(x._2, y._2), addArr(x._3, y._3))) + + val newMeans: Array[DataPointKMeans] = getCenters(k, d, result._1, result._2) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, result._1, result._2, result._3) + + changed = maxDelta > epsilon + oldMeans = newMeans + // println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + } + }) + + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + } + + def generateData(seed: Long, N: Long, D: Int, R: Double): DataPointKMeans = { + val r = new Random(seed) + def generatePoint(i: Long): DataPointKMeans = { + val x = Array.fill(D){r.nextGaussian + i * R} + DataPointKMeans(x) + } + generatePoint(seed) + } + + private def getDataSet(spark: SparkSession, N: Long, d: Int, slices: Int): Dataset[DataPointKMeans] = { + import spark.implicits._ + val R = 0.7 // Scaling factor + val pointsCached = spark.range(1, N+1, 1, slices).map(i => generateData(i, N, d, R)).cache + pointsCached.count() + pointsCached + } + + private def printCenters(heading: String, centers: Array[DataPointKMeans]) = { + println + println(heading) + centers.foreach(point => println(" " + point.features.mkString(" ") +";")) + } + + def squaredDistance(a: Seq[Double], b: Seq[Double]): Double = { + require(a.length == b.length, "equal lengths") + + a.zip(b) + .map { p => val diff = p._1 - p._2; diff * diff } + .sum + } + + def addArr(lhs: Array[Double], rhs: Array[Double]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def addArr(lhs: Array[Int], rhs: Array[Int]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def squaredDistance(v1: DataPointKMeans, v2: DataPointKMeans): Double = { + squaredDistance(v1.features, v2.features) + } + def squaredDistance(p: (DataPointKMeans, DataPointKMeans)): Double = { + squaredDistance(p._1, p._2) + } +} diff --git a/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansBatchSmall.scala b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansBatchSmall.scala new file mode 100644 index 0000000..57ca60d --- /dev/null +++ b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansBatchSmall.scala @@ -0,0 +1,355 @@ +package com.ibm.gpuenabler + +import java.util.Random +import org.apache.log4j.{Level, Logger} +import org.apache.spark.sql.{Dataset, SparkSession} +import com.ibm.gpuenabler.CUDADSImplicits._ +import scala.collection.mutable + +object GpuKMeansBatchSmall { + case class DataPointKMeans(features: Array[Double]) + case class ClusterIndexes(features: Array[Double], index: Int) + case class Results(s0: Array[Int], s1: Array[Double], s2: Array[Double]) + + def timeit(msg: String, code: => Any): Any ={ + val now1 = System.nanoTime + code + val ms1 = (System.nanoTime - now1) / 1000000 + println("%s Elapsed time: %d ms".format(msg, ms1)) + } + + def maxPoints(d: Int, k: Int, part: Int, nGpu: Int): Long = { + val perCluster: Long = 4*450 + (8*450 + 1) *d * 2 // s0 + s1 + s2 + val clusterBytes: Long = perCluster * k * 2 * part // we allocate twice + + val perPoint = 8 * d + 2 * 4 + val available: Long = (10L * 1024 * 1024 * 1024 * nGpu) - clusterBytes // 10GB + val maxPoints = 
available / perPoint + if (maxPoints <= 0) throw new IllegalArgumentException(s"too big: k * dimensions for the partition count: ${k} * ${d} * ${part} ") + maxPoints + } + + def main(args: Array[String]):Unit = { + + val masterURL = if (args.length > 0) args(0) else "local[*]" + val k: Int = if (args.length > 1) args(1).toInt else 32 + val N: Long = if (args.length > 2) args(2).toLong else 100000 + val d: Int= if (args.length > 3) args(3).toInt else 32 + val numSlices = if (args.length > 4) args(4).toInt else 1 + val iters: Int = if (args.length > 5) args(5).toInt else 5 + val nGpu: Int = if (args.length > 6) args(6).toInt else 1 + + if (N > maxPoints(d, k, numSlices, nGpu)) { + println(s"N(${N}) is too high for the given d(${d}) & k(${k}) ; MAX ${maxPoints(d, k, numSlices,nGpu) }") + return + } + + println(s"KMeans (${k}) Algorithm on N: ${N} datasets; Dimension: ${d}; slices: ${numSlices} for ${iters} iterations") + val spark = SparkSession.builder().master(masterURL).appName("SparkDSKMeans").getOrCreate() + import spark.implicits._ + + Logger.getRootLogger().setLevel(Level.ERROR) + + val data: Dataset[DataPointKMeans] = getDataSet(spark, N, d, numSlices) + println("Data Generation Done") + + println(" ======= GPU ===========") + + val (centers, cost) = runGpu(data, d, k, iters) + // printCenters("Cluster centers:", centers) + println(s"Cost: ${cost}") + + println(" ======= CPU ===========") + + val (ccenters, ccost) = run(data, d, k, iters) + + // printCenters("Cluster centers:", ccenters) + println(s"Cost: ${ccost}") + } + + def runGpu(data: Dataset[DataPointKMeans], d: Int, k: Int, + maxIterations: Int): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + + val epsilon = 0.5 + var changed = true + var iteration = 0 + var cost = 0.0 + val ptxURL = "/SparkGPUKmeans.ptx" + + val centroidFn = DSCUDAFunction( + "getClusterCentroids", + Array("features"), + Array("index"), + ptxURL) + + var limit = 1024 + var modD = Math.min(d, limit) + var modK = Math.min(k, limit) + if (modD * modK > limit) { + if (modD <= modK) + modD = Math.max(limit/modK, 1) + else + modK = Math.max(limit/modD, 1) + } + + println(s"Dimension are modD x modK : ${modD} x ${modK}") + + val dimensions1 = (size: Long, stage: Int) => stage match { + case 0 => (450, modD, 1, modK, 1, 1) + } + + val gpuParams1 = gpuParameters(dimensions1) + + val interFn = DSCUDAFunction( + "calculateIntermediates", + Array("features", "index"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams1), outputSize=Some(450)) + + + val dimensions2 = (size: Long, stage: Int) => stage match { + case 0 => (1, modD, 1, modK, 1, 1) + } + + val gpuParams2 = gpuParameters(dimensions2) + + val sumFn = DSCUDAFunction( + "calculateFinal", + Array("s0", "s1", "s2"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams2), outputSize=Some(1)) + + + def func1(p: DataPointKMeans): ClusterIndexes = { + ClusterIndexes(p.features, 0) + } + + def func2(c: ClusterIndexes): Results = { + Results(Array.empty, Array.empty, Array.empty) + } + + def func3(r1: Results, r2: Results): Results = { + Results(addArr(r1.s0, r2.s0), + addArr(r1.s1, r2.s1),addArr(r1.s2, r2.s2)) + } + + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + val dataSplits = data.randomSplit(Array(0.5,0.5), 42) + + var oldMeans = means + + timeit("GPU :: ", { + while (iteration < maxIterations) { + + val dataSplitDS = dataSplits.map(datasplit => { + + datasplit.cacheGpu(true) + + val oldCentroids = 
oldMeans.flatMap(p => p.features) + + // this gets distributed + val centroidIndex = datasplit.mapExtFunc(func1, + centroidFn, + Array(oldCentroids, k, d), outputArraySizes = Array(d) + ).cacheGpu(true) + + val interValues = centroidIndex + .mapExtFunc(func2, interFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)).cacheGpu(true) + + val result = interValues + .reduceExtFunc(func3, sumFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)) + + centroidIndex.unCacheGpu + interValues.unCacheGpu + datasplit.unCacheGpu + + result + }) + + val result = dataSplitDS.reduce((x,y) => + Results(addArr(x.s0, y.s0), addArr(x.s1, y.s1), addArr(x.s2, y.s2))) + + val newMeans = getCenters(k, d, result.s0, result.s1) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, result.s0, result.s1, result.s2) + + changed = maxDelta > epsilon + oldMeans = newMeans + //println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + + } + }) + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + + } + + + def train(means: Array[DataPointKMeans], pointItr: Iterator[DataPointKMeans]): + Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]] = { + val d = means(0).features.size + val k = means.length + + val s0 = new Array[Int](k) + val s1 = new Array[Double](d * k) + val s2 = new Array[Double](d * k) + + pointItr.foreach(point => { + var bestCluster = 0 + var bestDistance = 0.0 + + for (c <- 0 until k) { + var dist = squaredDistance(point.features, means(c).features) + + if (c == 0 || bestDistance > dist) { + bestCluster = c + bestDistance = dist + } + } + + s0(bestCluster) += 1 + + for (dim <- 0 until d) { + var coord = point.features(dim) + + s1(bestCluster * d + dim) += coord + s2(bestCluster * d + dim) += coord * coord + } + + }) + + var flag = true + + new Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]]{ + override def hasNext: Boolean = flag + + override def next: Tuple3[Array[Int], Array[Double], Array[Double]] = { + flag = false + (s0, s1, s2) + } + } + } + + def getCenters(k: Int, d: Int, s0: Array[Int], + s1: Array[Double]): Array[DataPointKMeans] = { + Array.tabulate(k)(i => DataPointKMeans(Array.tabulate(d)(j => s1(i * d + j) / s0(i).max(1)))) + } + + def getCost(k: Int, d: Int, s0: Array[Int], s1: Array[Double], s2: Array[Double]) = { + var cost: Double = 0 + + for (i <- 0 until d * k) { + val mean = i / d + val center = s1(i) / s0(mean).max(1) + + // TODO simplify this + cost += center * (center * s0(mean) - 2 * s1(i)) + s2(i) + } + + cost + } + + def run(data: Dataset[DataPointKMeans], d: Int, k: Int, + maxIterations: Int): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + val epsilon = 0.5 + var changed = true + var iteration = 0 + + var cost = 0.0 + var oldMeans = means + + timeit("CPU :: ", { + // while (changed && iteration < maxIterations) { + while (iteration < maxIterations) { + + // this gets distributed + val result = data.mapPartitions(pointItr => train(oldMeans, pointItr)).reduce((x,y) => + (addArr(x._1, y._1), addArr(x._2, y._2), addArr(x._3, y._3))) + + val newMeans: Array[DataPointKMeans] = getCenters(k, d, result._1, result._2) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, result._1, result._2, result._3) + + changed = maxDelta > epsilon + oldMeans = newMeans + // println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + } + }) 
+ + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + } + + def generateData(seed: Long, N: Long, D: Int, R: Double): DataPointKMeans = { + val r = new Random(seed) + def generatePoint(i: Long): DataPointKMeans = { + val x = Array.fill(D){r.nextGaussian + i * R} + DataPointKMeans(x) + } + generatePoint(seed) + } + + private def getDataSet(spark: SparkSession, N: Long, d: Int, slices: Int): Dataset[DataPointKMeans] = { + import spark.implicits._ + val R = 0.7 // Scaling factor + val pointsCached = spark.range(1, N+1, 1, slices).map(i => generateData(i, N, d, R)).cache + pointsCached.count() + pointsCached + } + + private def printCenters(heading: String, centers: Array[DataPointKMeans]) = { + println + println(heading) + centers.foreach(point => println(" " + point.features.mkString(" ") +";")) + } + + def squaredDistance(a: Seq[Double], b: Seq[Double]): Double = { + require(a.length == b.length, "equal lengths") + + a.zip(b) + .map { p => val diff = p._1 - p._2; diff * diff } + .sum + } + + def addArr(lhs: Array[Double], rhs: Array[Double]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def addArr(lhs: Array[Int], rhs: Array[Int]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def squaredDistance(v1: DataPointKMeans, v2: DataPointKMeans): Double = { + squaredDistance(v1.features, v2.features) + } + def squaredDistance(p: (DataPointKMeans, DataPointKMeans)): Double = { + squaredDistance(p._1, p._2) + } +} diff --git a/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansFile.scala b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansFile.scala new file mode 100644 index 0000000..b85bfa7 --- /dev/null +++ b/examples/src/main/scala/com/ibm/gpuenabler/GpuKMeansFile.scala @@ -0,0 +1,338 @@ + +package com.ibm.gpuenabler + +import scala.language.implicitConversions +import org.apache.log4j.{Level, Logger} +import org.apache.spark.sql.{Dataset, SparkSession} +import com.ibm.gpuenabler.CUDADSImplicits._ +import scala.collection.mutable + +object GpuKMeansFile { + case class DataPointKMeans(features: Array[Double]) + case class ClusterIndexes(features: Array[Double], index: Int) + case class Results(s0: Array[Int], s1: Array[Double], s2: Array[Double]) + + def timeit(msg: String, code: => Any): Any ={ + val now1 = System.nanoTime + code + val ms1 = (System.nanoTime - now1) / 1000000 + println("%s Elapsed time: %d ms".format(msg, ms1)) + } + + def main(args: Array[String]) = { + + val masterURL = if (args.length > 0) args(0) else "local[*]" + val k: Int = if (args.length > 1) args(1).toInt else 20 + val inputPath = if (args.length > 2) args(2) else "src/main/resources/kmeans-samples.txt" + val iters: Int = if (args.length > 3) args(3).toInt else 50 + + val spark = SparkSession.builder().master(masterURL).appName("SparkDSKMeans").getOrCreate() + import spark.implicits._ + + Logger.getRootLogger().setLevel(Level.ERROR) + + val data: Dataset[DataPointKMeans] = getDataSet(spark, inputPath).cache + val N = data.count() + val d = data.head.features.length + + println(" ======= GPU ===========") + + val (centers, cost) = runGpu(data, d, k, iters, true) + printCenters("Cluster centers:", centers) + println(s"Cost: ${cost}") + + println(" ======= CPU ===========") + + val (ccenters, ccost) = run(data, d, k, iters) + + printCenters("Cluster centers:", ccenters) + println(s"Cost: ${ccost}") + } + + def runGpu(data: Dataset[DataPointKMeans], d: Int, k: Int, + 
maxIterations: Int, dump: Boolean): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + + val epsilon = 0.5 + var changed = true + var iteration = 0 + var cost = 0.0 + val ptxURL = "/SparkGPUKmeans.ptx" + + val centroidFn = DSCUDAFunction( + "getClusterCentroids", + Array("features"), + Array("index"), + ptxURL) + + val dimensions1 = (size: Long, stage: Int) => stage match { + case 0 => (450, d, 1, k, 1, 1) + } + + val gpuParams1 = gpuParameters(dimensions1) + + val interFn = DSCUDAFunction( + "calculateIntermediates", + Array("features", "index"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams1), outputSize=Some(450)) + + val dimensions2 = (size: Long, stage: Int) => stage match { + case 0 => (1, d, 1, k, 1, 1) + } + + val gpuParams2 = gpuParameters(dimensions2) + + val sumFn = DSCUDAFunction( + "calculateFinal", + Array("s0", "s1", "s2"), + Array("s0", "s1", "s2"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams2), outputSize=Some(1)) + + + def func1(p: DataPointKMeans): ClusterIndexes = { + ClusterIndexes(p.features, 0) + } + + def func2(c: ClusterIndexes): Results = { + Results(Array.empty, Array.empty, Array.empty) + } + + def func3(r1: Results, r2: Results): Results = { + Results(addArr(r1.s0, r2.s0), + addArr(r1.s1, r2.s1),addArr(r1.s2, r2.s2)) + } + + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + data.cacheGpu(true) + data.loadGpu() + + var oldMeans = means + + timeit("GPU :: ", { + while (changed && iteration < maxIterations) { + val oldCentroids = oldMeans.flatMap(p => p.features) + + // this gets distributed + val centroidIndex = data.mapExtFunc(func1, + centroidFn, + Array(oldCentroids, k, d), outputArraySizes = Array(d) + ).cacheGpu(true) + + val interValues = centroidIndex + .mapExtFunc(func2, interFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)).cacheGpu(true) + + val result = interValues + .reduceExtFunc(func3, sumFn, Array(k, d), + outputArraySizes = Array(k, k*d, k*d)) + + centroidIndex.unCacheGpu + interValues.unCacheGpu + + val newMeans = getCenters(k, d, result.s0, result.s1) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, result.s0, result.s1, result.s2) + + changed = maxDelta > epsilon + oldMeans = newMeans + //println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + } + }) + + val oldCentroids = oldMeans.flatMap(p => p.features) + val centroidIndex = data.mapExtFunc(func1, + centroidFn, + Array(oldCentroids, k, d), outputArraySizes = Array(d) + ) + + if (dump) { + import java.io._ + val pw = new PrintWriter(new File("pointsWithIndex" )) + + centroidIndex.collect().foreach(c => { + c.features.foreach(x=> pw.write(x.toString + " ")) + pw.write(c.index + "\n") + }) + pw.close + } + + data.unCacheGpu + + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + } + + + def train(means: Array[DataPointKMeans], pointItr: Iterator[DataPointKMeans]): + Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]] = { + val d = means(0).features.size + val k = means.length + + val s0 = new Array[Int](k) + val s1 = new Array[Double](d * k) + val s2 = new Array[Double](d * k) + + pointItr.foreach(point => { + var bestCluster = 0 + var bestDistance = 0.0 + + for (c <- 0 until k) { + var dist = squaredDistance(point.features, means(c).features) + + if (c == 0 || bestDistance > dist) { + bestCluster = c + bestDistance = dist + } + } + + s0(bestCluster) += 1 + + for (dim <- 0 until d) { + var coord = 
point.features(dim) + + s1(bestCluster * d + dim) += coord + s2(bestCluster * d + dim) += coord * coord + } + + }) + + var flag = true + + new Iterator[Tuple3[Array[Int], Array[Double], Array[Double]]]{ + override def hasNext: Boolean = flag + + override def next: Tuple3[Array[Int], Array[Double], Array[Double]] = { + flag = false + (s0, s1, s2) + } + } + } + + def getCenters(k: Int, d: Int, s0: Array[Int], + s1: Array[Double]): Array[DataPointKMeans] = { + Array.tabulate(k)(i => DataPointKMeans(Array.tabulate(d)(j => s1(i * d + j) / s0(i).max(1)))) + } + + def getCost(k: Int, d: Int, s0: Array[Int], s1: Array[Double], s2: Array[Double]) = { + var cost: Double = 0 + + for (i <- 0 until d * k) { + val mean = i / d + val center = s1(i) / s0(mean).max(1) + + // TODO simplify this + cost += center * (center * s0(mean) - 2 * s1(i)) + s2(i) + } + + cost + } + + def run(data: Dataset[DataPointKMeans], d: Int, k: Int, + maxIterations: Int): (Array[DataPointKMeans], Double) = { + + import data.sparkSession.implicits._ + val means: Array[DataPointKMeans] = data.rdd.takeSample(true, k, 42) + + val epsilon = 0.5 + var changed = true + var iteration = 0 + + var cost = 0.0 + var oldMeans = means + + timeit("CPU :: ", { + while (changed && iteration < maxIterations) { + + // this gets distributed + val result = data.mapPartitions(pointItr => train(oldMeans, pointItr)).reduce((x,y) => + (addArr(x._1, y._1), addArr(x._2, y._2), addArr(x._3, y._3))) + + val newMeans: Array[DataPointKMeans] = getCenters(k, d, result._1, result._2) + + val maxDelta = oldMeans.zip(newMeans) + .map(squaredDistance) + .max + + cost = getCost(k, d, result._1, result._2, result._3) + + changed = maxDelta > epsilon + oldMeans = newMeans + // println(s"Cost @ iteration ${iteration} is ${cost}") + iteration += 1 + } + }) + + println("Finished in " + iteration + " iterations") + (oldMeans, cost) + } + + private def getDataSet(spark: SparkSession, path: String): Dataset[DataPointKMeans] = { + import spark.implicits._ + + val rawinputDF = spark.read + .option("header", "false") + .option("inferSchema", "true") + .csv(path) + + val pointsCached = rawinputDF.map(x=> { + + val rowElem = x.getString(0).split(" ") + val len = rowElem.length + val buffer = new mutable.ListBuffer[Double]() + (0 until len). 
foreach { idx => + if(!rowElem(idx).isEmpty) + buffer += rowElem(idx).toDouble + } + + DataPointKMeans(buffer.toArray) + }) + + pointsCached + } + + private def printCenters(heading: String, centers: Array[DataPointKMeans]) = { + println + println(heading) + centers.foreach(point => println(" " + point.features.mkString(" ") +";")) + } + + def squaredDistance(a: Seq[Double], b: Seq[Double]): Double = { + require(a.length == b.length, "equal lengths") + + a.zip(b) + .map { p => val diff = p._1 - p._2; diff * diff } + .sum + } + + def addArr(lhs: Array[Double], rhs: Array[Double]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def addArr(lhs: Array[Int], rhs: Array[Int]) = { + require(lhs.length == rhs.length, "equal lengths") + + lhs.zip(rhs).map { case (x, y) => x + y } + } + + def squaredDistance(v1: DataPointKMeans, v2: DataPointKMeans): Double = { + squaredDistance(v1.features, v2.features) + } + def squaredDistance(p: (DataPointKMeans, DataPointKMeans)): Double = { + squaredDistance(p._1, p._2) + } +} + diff --git a/examples/src/main/scala/com/ibm/gpuenabler/SparkDSLRmod.scala b/examples/src/main/scala/com/ibm/gpuenabler/SparkDSLRmod.scala new file mode 100644 index 0000000..3cc492d --- /dev/null +++ b/examples/src/main/scala/com/ibm/gpuenabler/SparkDSLRmod.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ibm.gpuenabler + +import java.util.Random +import com.ibm.gpuenabler.CUDADSImplicits._ +import org.apache.spark.sql.SparkSession +import scala.language.implicitConversions +import scala.math._ + +// scalastyle:off println +object SparkDSLRmod { + val rand = new Random(42) + + case class DataPoint(x: Array[Double], y: Double) + + def dmulvs(x: Array[Double], c: Double) : Array[Double] = + Array.tabulate(x.length)(i => x(i) * c) + def daddvv(x: Array[Double], y: Array[Double]) : Array[Double] = + Array.tabulate(x.length)(i => x(i) + y(i)) + def dsubvv(x: Array[Double], y: Array[Double]) : Array[Double] = + Array.tabulate(x.length)(i => x(i) - y(i)) + def ddotvv(x: Array[Double], y: Array[Double]) : Double = + (x zip y).foldLeft(0.0)((a, b) => a + (b._1 * b._2)) + + def generateData(seed: Long, N: Int, D: Int, R: Double): DataPoint = { + val r = new Random(seed) + def generatePoint(i: Long): DataPoint = { + val y = if (i % 2 == 0) 0 else 1 + val x = Array.fill(D){r.nextGaussian + y * R} + DataPoint(x, y) + } + generatePoint(seed) + } + + def showWarning() { + System.err.println( + """WARN: This is a naive implementation of Logistic Regression and is given as an example! 
+ |Please use either org.apache.spark.mllib.classification.LogisticRegressionWithSGD or + |org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS + |for more conventional use. + """.stripMargin) + } + + def main(args: Array[String]) { + + showWarning() + + val masterURL = if (args.length > 0) args(0) else "local[*]" + val spark = SparkSession.builder().master(masterURL).appName("SparkDSLR").getOrCreate() + import spark.implicits._ + + val numSlices = if (args.length > 1) args(1).toInt else 1 + val N = if (args.length > 2) args(2).toInt else 10000 // Number of data points + val D = if (args.length > 3) args(3).toInt else 10 // Numer of dimensions + val ITERATIONS = if (args.length > 4) args(4).toInt else 5 + + val R = 0.7 // Scaling factor + val ptxURL = "/LogisticRegression.ptx" + + val mapFunction = spark.sparkContext.broadcast( + DSCUDAFunction( + "map", + Array("x", "y"), + Array("value"), + ptxURL)) + + val threads = 1024 + val blocks = min((N + threads- 1) / threads, 1024) + val dimensions = (size: Long, stage: Int) => stage match { + case 0 => (blocks, threads, 1, 1, 1, 1) + } + val gpuParams = gpuParameters(dimensions) + val reduceFunction = spark.sparkContext.broadcast( + DSCUDAFunction( + "reducegrad", + Array("value"), + Array("value"), + ptxURL, + Some((size: Long) => 1), + Some(gpuParams), outputSize=Some(1))) + + val pointsCached = spark.range(1, N+1, 1, numSlices).map(i => generateData(i, N, D, R)).cache() + val pointsColumnCached = pointsCached.cacheGpu(true) + + // load data points into GPU + pointsColumnCached.loadGpu() + println("Data Generation Done !!") + + // Initialize w to a random value + // var wCPU = Array.fill(D){2 * rand.nextDouble - 1} + var wCPU = Array.fill[Double](D)(0.0) + var wGPU = Array.tabulate(D)(i => wCPU(i)) + val wbc1 = spark.sparkContext.broadcast(wGPU) + + println("============ GPU =======================") + print("Initial Weights :: ") + wGPU.take(3).foreach(y => print(y + ", ")) + println(" ... ") + + val now = System.nanoTime + for (i <- 1 to ITERATIONS) { + val now1 = System.nanoTime + val wGPUbcast = spark.sparkContext.broadcast(wGPU) + + val mapDS = pointsColumnCached.mapExtFunc((p: DataPoint) => + dmulvs(p.x, (1 / (1 + exp(-p.y * (ddotvv(wGPU, p.x)))) - 1) * p.y), + mapFunction.value, + Array(wGPUbcast.value, D), + outputArraySizes = Array(D) + ).cacheGpu(true) + + val gradient = mapDS.reduceExtFunc((x: Array[Double], y: Array[Double]) => daddvv(x, y), + reduceFunction.value, + Array(D), + outputArraySizes = Array(D)) + + mapDS.unCacheGpu() + + wGPU = dsubvv(wGPU, gradient) + + val ms1 = (System.nanoTime - now1) / 1000000 + println(s"Iteration $i Done in $ms1 ms") + } + val ms = (System.nanoTime - now) / 1000000 + println("GPU Elapsed time: %d ms".format(ms)) + + pointsColumnCached.unCacheGpu() + + wGPU.take(5).foreach(y => print(y + ", ")) + print(" .... ") + wGPU.takeRight(5).foreach(y => print(y + ", ")) + println() + + println("============ CPU =======================") + val cnow = System.nanoTime + for (i <- 1 to ITERATIONS) { + val cnow1 = System.nanoTime + + val gradient = pointsCached.map { p => + dmulvs(p.x, (1 / (1 + exp(-1 * ddotvv(wCPU, p.x)))) - p.y) + }.reduce((x: Array[Double], y: Array[Double]) => daddvv(x, y)) + + + wCPU = dsubvv(wCPU, gradient) + val cms1 = (System.nanoTime - cnow1) / 1000000 + println(s"Iteration $i Done in $cms1 ms") + } + + val cms = (System.nanoTime - cnow) / 1000000 + println("CPU Elapsed time: %d ms".format(cms)) + + wCPU.take(5).foreach(y => print(y + ", ")) + print(" .... 
") + wCPU.takeRight(5).foreach(y => print(y + ", ")) + println() + + } +} + diff --git a/examples/src/main/scala/com/ibm/gpuenabler/perfDebug.scala b/examples/src/main/scala/com/ibm/gpuenabler/perfDebug.scala index 63c649b..44cef80 100644 --- a/examples/src/main/scala/com/ibm/gpuenabler/perfDebug.scala +++ b/examples/src/main/scala/com/ibm/gpuenabler/perfDebug.scala @@ -90,35 +90,43 @@ object perfDebug { val data = rd.cacheGpu(true) // Load the data to GPU - data.loadGpu() + timeit("Data load in GPU", { data.loadGpu()}) timeit("DS: All cached", { val mapDS = data.mapExtFunc(2 * _, dsmapFunction).cacheGpu() - val output = mapDS.reduceExtFunc(_ + _, dsreduceFunction) + val mapDS1 = mapDS.mapExtFunc(2 * _, dsmapFunction).cacheGpu() + val mapDS2 = mapDS1.mapExtFunc(2 * _, dsmapFunction).cacheGpu() + val output = mapDS2.reduceExtFunc(_ + _, dsreduceFunction) mapDS.unCacheGpu() + mapDS1.unCacheGpu() + mapDS2.unCacheGpu() println("Output is " + output) }) + data.unCacheGpu() val data111 = rd.cacheGpu(true) // Load the data to GPU - data111.loadGpu() + timeit("Data load in GPU", {data111.loadGpu() }) timeit("DS: All cached GPUONLY", { val mapDS123 = data111.mapExtFunc(2 * _, dsmapFunction).cacheGpu(true) val output = mapDS123.reduceExtFunc(_ + _, dsreduceFunction) mapDS123.unCacheGpu() - println("Output is " + output) + println(s"Output is $output") }) + + data111.unCacheGpu() val data1 = rd // Load the data to GPU - data1.loadGpu() + timeit("Data load in GPU", {data1.loadGpu() }) timeit("DS: No Cache", { val mapDS1 = data1.mapExtFunc(2 * _, dsmapFunction) val output = mapDS1.reduceExtFunc(_ + _, dsreduceFunction) println("Output is " + output) }) + data1.unCacheGpu() timeit("DS: CPU", { val output = data.map(2 * _).reduce(_ + _) diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDADSUtils.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDADSUtils.scala index 4b152b5..6742c04 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDADSUtils.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDADSUtils.scala @@ -16,12 +16,11 @@ */ package com.ibm.gpuenabler -import jcuda.driver.CUdeviceptr import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics @@ -31,11 +30,10 @@ import scala.collection.mutable import org.apache.spark.sql.gpuenabler.CUDAUtils._ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import java.util.concurrent.ConcurrentHashMap -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerJobEnd} +import org.apache.spark.broadcast.Broadcast case class MAPGPUExec[T, U](cf: DSCUDAFunction, constArgs : Array[Any], - outputArraySizes: Array[Int], + outputArraySizes: Array[Int], partSizes: Broadcast[Map[Int, Int]], child: SparkPlan, inputEncoder: Encoder[T], outputEncoder: Encoder[U], outputObjAttr: Attribute, @@ -55,8 +53,9 @@ case class MAPGPUExec[T, U](cf: DSCUDAFunction, constArgs : Array[Any], protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") - val inexprEnc = inputEncoder.asInstanceOf[ExpressionEncoder[T]] val outexprEnc 
= outputEncoder.asInstanceOf[ExpressionEncoder[U]] + val outEnc = outexprEnc + .resolveAndBind(getAttributes(outputEncoder.schema)) val childRDD = child.execute() @@ -72,96 +71,91 @@ case class MAPGPUExec[T, U](cf: DSCUDAFunction, constArgs : Array[Any], cached |= (if(DScache.contains(logPlans(1))) 2 else 0) cached |= (if(GPUSparkEnv.get.gpuMemoryManager.cachedGPUOnlyDS.contains(logPlans(0))) 4 else 0) - // Generate the JCUDA program to be executed and obtain the iterator object - val jcudaIterator = JCUDACodeGen.generate(inputSchema, - outputSchema,cf,constArgs, outputArraySizes) val list = new mutable.ListBuffer[InternalRow] // Get hold of hashmap for this Plan to store the GPU pointers from output parameters // cached: 1 -> this logical plan is cached; 2 -> child logical plan is cached - val curPlanPtrs: java.util.Map[String, CUdeviceptr] = if ((cached & 1) > 0) { + val curPlanPtrs: java.util.Map[String, CachedGPUMeta] = if ((cached & 1) > 0) { + logDebug("current plan is cached") val partPtr = GPUSparkEnv.get.gpuMemoryManager.getCachedGPUPointersDS.getOrElse(logPlans(0), null) if (partPtr != null) { partPtr.getOrElseUpdate(partNum.toLong, { - new ConcurrentHashMap[String, CUdeviceptr].asScala + logDebug("no cached ptrs for current plan ") + new ConcurrentHashMap[String, CachedGPUMeta].asScala }).asJava } else { null } } else { - Map[String, CUdeviceptr]().asJava + Map[String, CachedGPUMeta]().asJava } - val childPlanPtrs: java.util.Map[String, CUdeviceptr] = if ((cached & 2) > 0) { + val childPlanPtrs: java.util.Map[String, CachedGPUMeta] = if ((cached & 2) > 0) { + logDebug("child plan is cached") val partPtr = GPUSparkEnv.get.gpuMemoryManager.getCachedGPUPointersDS.getOrElse(logPlans(1), null) if (partPtr != null) { partPtr.getOrElseUpdate(partNum.toLong, { - new ConcurrentHashMap[String, CUdeviceptr].asScala + logDebug("no cached ptr for child plan ") + new ConcurrentHashMap[String, CachedGPUMeta].asScala }).asJava } else { null } } else { - Map[String, CUdeviceptr]().asJava + Map[String, CachedGPUMeta]().asJava } - val imgpuPtrs: java.util.List[java.util.Map[String, CUdeviceptr]] = + val imgpuPtrs: java.util.List[java.util.Map[String, CachedGPUMeta]] = List(curPlanPtrs, childPlanPtrs).asJava - // Retrieve the partition size and cache it if the child logical plan is cached. - val dataSize = if (!((cached & 2) > 0) || childPlanPtrs.isEmpty) { - var count = 0 - iter.foreach(x => { - count += 1 - val value = x.get(0, inputSchema) - if (!value.isInstanceOf[UnsafeRow]) - list += inexprEnc.toRow(value.asInstanceOf[T]).copy() - else - list += value.asInstanceOf[InternalRow] - }) - // TODO: caching required only if the child logical plan is cached. - GPUSparkEnv.get.cachedDSPartSize.getOrElseUpdate((logPlans(1), partNum), count) - count - } else { - // This logical plan is expected to be in cached; else something is wrong - // and it will assert out; - GPUSparkEnv.get.cachedDSPartSize.getOrElse((logPlans(1), partNum), 0) - } + var skipExecution = false - assert(dataSize > 0) - // cache the partition size if this plan is cached in GPU - if ((cached & 1) > 0) { - GPUSparkEnv.get.cachedDSPartSize.put((logPlans(0), partNum), { - dataSize - }) + // handle special case of loadGPU; Since data is already in GPU, do nothing + if (cf.funcName == "") { + skipExecution = true } - // Compute the GPU Grid Dimensions based on the input data size - // For user provided Dimensions; retrieve it along with the - // respective stage information. 
- val (stages, userGridSizes, userBlockSizes, sharedMemory) = - JCUDACodeGen.getUserDimensions(cf, dataSize) + if (!skipExecution) { + // Generate the JCUDA program to be executed and obtain the iterator object + val jcudaIterator = JCUDACodeGen.generate(inputSchema, + outputSchema,cf,constArgs, outputArraySizes) + + val dataSize = partSizes.value.getOrElse(partNum, 1) + assert(dataSize > 0) - // Initialize the auto generated code's iterator - jcudaIterator.init(list.toIterator.asJava, constArgs, - dataSize, cached, imgpuPtrs, partNum, - userGridSizes, userBlockSizes, stages, sharedMemory) + // Compute the GPU Grid Dimensions based on the input data size + // For user provided Dimensions; retrieve it along with the + // respective stage information. + val (stages, userGridSizes, userBlockSizes, sharedMemory) = + JCUDACodeGen.getUserDimensions(cf, dataSize) - // Triggers execution - jcudaIterator.hasNext() + // Initialize the auto generated code's iterator + jcudaIterator.init[T](iter.asJava, constArgs, + dataSize, cached, imgpuPtrs, partNum, + userGridSizes, userBlockSizes, stages, sharedMemory, inputEncoder) - list.clear() + // Triggers execution + jcudaIterator.hasNext() - val outEnc = outexprEnc - .resolveAndBind(getAttributes(outputEncoder.schema)) + val outEnc = outexprEnc + .resolveAndBind(getAttributes(outputEncoder.schema)) - new Iterator[InternalRow] { - override def hasNext: Boolean = jcudaIterator.hasNext() + new Iterator[InternalRow] { + override def hasNext: Boolean = jcudaIterator.hasNext() - override def next: InternalRow = - InternalRow(outEnc - .fromRow(jcudaIterator.next())) + override def next: InternalRow = + InternalRow(outEnc + .fromRow(jcudaIterator.next())) + } + } else { + new Iterator[InternalRow] { + override def hasNext: Boolean = false + + override def next: InternalRow = + InternalRow(outEnc + .fromRow(null)) + } } } } @@ -173,10 +167,11 @@ object MAPGPU func: DSCUDAFunction, args : Array[Any], outputArraySizes: Array[Int], + partSizes: Broadcast[Map[Int, Int]], child: LogicalPlan) : LogicalPlan = { val deserialized = CatalystSerde.deserialize[T](child) val mapped = MAPGPU( - func, args, outputArraySizes, + func, args, outputArraySizes, partSizes, deserialized, implicitly[Encoder[T]], implicitly[Encoder[U]], @@ -190,9 +185,10 @@ object MAPGPU object LOADGPU { - def apply[T: Encoder](child: LogicalPlan) : LogicalPlan = { + def apply[T: Encoder](partSizes: Broadcast[Map[Int, Int]], child: LogicalPlan) : LogicalPlan = { val deserialized = CatalystSerde.deserialize[T](child) val mapped = LOADGPU( + partSizes, deserialized, implicitly[Encoder[T]], CatalystSerde.generateObjAttr[T] @@ -202,7 +198,7 @@ object LOADGPU } } -case class LOADGPU[T: Encoder](child: LogicalPlan, +case class LOADGPU[T: Encoder](partSizes: Broadcast[Map[Int, Int]], child: LogicalPlan, inputEncoder: Encoder[T], outputObjAttr: Attribute) extends ObjectConsumer with ObjectProducer { @@ -212,7 +208,7 @@ case class LOADGPU[T: Encoder](child: LogicalPlan, case class MAPGPU[T: Encoder, U : Encoder](func: DSCUDAFunction, args : Array[Any], - outputArraySizes: Array[Int], + outputArraySizes: Array[Int], partSizes: Broadcast[Map[Int, Int]], child: LogicalPlan, inputEncoder: Encoder[T], outputEncoder: Encoder[U], outputObjAttr: Attribute) @@ -223,7 +219,7 @@ case class MAPGPU[T: Encoder, U : Encoder](func: DSCUDAFunction, object GPUOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case MAPGPU(cf, args, outputArraySizes, child,inputEncoder, outputEncoder, + case 
MAPGPU(cf, args, outputArraySizes, partSizes, child,inputEncoder, outputEncoder, outputObjAttr) => // Store the logical plan UID and pass it to physical plan as // cached it done with logical plan UID. @@ -242,9 +238,9 @@ object GPUOperators extends Strategy { GPUSparkEnv.get.gpuMemoryManager.cacheGPUSlavesAuto(logPlans(1)) } - MAPGPUExec(cf, args, outputArraySizes, planLater(child), + MAPGPUExec(cf, args, outputArraySizes, partSizes, planLater(child), inputEncoder, outputEncoder, outputObjAttr, logPlans) :: Nil - case LOADGPU(child, inputEncoder, outputObjAttr) => + case LOADGPU(partSizes, child, inputEncoder, outputObjAttr) => val logPlans = new Array[String](2) val modChildPlan = child match { case DeserializeToObject(_, _, lp) => lp @@ -254,7 +250,7 @@ object GPUOperators extends Strategy { logPlans(1) = md5HashObj(modChildPlan) val cf = DSCUDAFunction("",null,null,"") - MAPGPUExec(cf, null, null, planLater(child), + MAPGPUExec(cf, null, null, partSizes, planLater(child), inputEncoder, inputEncoder, outputObjAttr, logPlans) :: Nil case _ => Nil } @@ -304,6 +300,30 @@ case class DSCUDAFunction( */ object CUDADSImplicits { implicit class CUDADSFuncs[T: Encoder](ds: _ds[T]) extends Serializable { + /** + * getPartSizes: Helper routine to get Partition Size & add broadcast it. + * Getting the partition size right before MAPGPU operation + * improves performance. + */ + def getPartSizes: Broadcast[Map[Int, Int]] = { + val execPlan = ds.queryExecution.executedPlan + val logPlan = ds.queryExecution.optimizedPlan match { + case SerializeFromObject(_, lp) => lp + case _ => ds.queryExecution.optimizedPlan + } + + val partSizes: Broadcast[Map[Int, Int]] = logPlan match { + case MAPGPU(_, _, _, partSize, _, _, _, _) => + partSize + case _ => + val partSize: Map[Int, Int] = execPlan.execute().mapPartitionsWithIndex { + (partNum, iter) => Iterator(Map(partNum -> iter.length)) + }.reduce(_ ++ _) + ds.sparkSession.sparkContext.broadcast(partSize) + } + partSizes + } + /** * Return a new Dataset by applying a function to all elements of this Dataset. * @@ -326,7 +346,7 @@ object CUDADSImplicits { outputArraySizes: Array[Int] = Array.empty): Dataset[U] = { DS[U](ds.sparkSession, - MAPGPU[T, U](cf, args, outputArraySizes, + MAPGPU[T, U](cf, args, outputArraySizes, getPartSizes, getLogicalPlan(ds))) } @@ -351,20 +371,18 @@ object CUDADSImplicits { outputArraySizes: Array[Int] = Array.empty): T = { val ds1 = DS[T](ds.sparkSession, - MAPGPU[T, T](cf, args, outputArraySizes, + MAPGPU[T, T](cf, args, outputArraySizes, getPartSizes, getLogicalPlan(ds))) ds1.reduce(func) } /** - * Cache the child plan and - * Trigger an action on this Dataset so that data is loaded into GPU. - * - * @return Return the result after performing a count operation - * on this Dataset + * Load & Cache the partitions of the Dataset in GPU. + * + * @return Returns the same Dataset after performing the operation */ - def loadGpu(): Long = { + def loadGpu(): Dataset[T] = { // Enable Caching on the current Dataset val logPlan = ds.queryExecution.optimizedPlan match { case SerializeFromObject(_, lp) => lp @@ -374,20 +392,21 @@ object CUDADSImplicits { // Create a new Dataset to load the data into GPU val ds1 = DS[T](ds.sparkSession, - LOADGPU[T](getLogicalPlan(ds))) + LOADGPU[T](getPartSizes, getLogicalPlan(ds))) // trigger an action ds1.count() + ds1 } /** - * This function is used to mark the respective Dataset's data to - * be cached in GPU for future computation rather than cleaning it - * up every time the DataSet is processed. 
- * - * By marking an DataSet to cache in GPU, huge performance gain can - * be achieved as data movement between CPU memory and GPU - * memory is considered costly. + * Mark the Dataset's partitions to be cached in GPU. + * Unmarked Dataset partitions will be cleaned up on every Job Completion. + * + * @param onlyGPU Boolean value to indicate partitions will be used only inside GPU + * so that copy from GPU to Host will be skipped during Job execution. + * Boost performance but to be used with caution on need basis. + * @return Returns the same Dataset after performing the operation */ def cacheGpu(onlyGPU: Boolean = false): Dataset[T] = { val logPlan = ds.queryExecution.optimizedPlan match { @@ -399,8 +418,9 @@ object CUDADSImplicits { } /** - * This function is used to clean up all the caches in GPU held - * by the respective DataSet on the various partitions. + * Unloads the Dataset's partitions that were cached earlier in GPU. + * + * @return Returns the same Dataset after performing the operation */ def unCacheGpu(): Dataset[T] = { val logPlan = ds.queryExecution.optimizedPlan match { diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala index 8e597b7..3790663 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/CUDAManager.scala @@ -17,21 +17,15 @@ package com.ibm.gpuenabler -import java.util.Date import java.net.URL import jcuda.Pointer import jcuda.driver.JCudaDriver._ -import jcuda.driver._ import jcuda.driver.{CUdeviceptr, CUmodule, JCudaDriver} import jcuda.runtime.JCuda import org.apache.commons.io.IOUtils import org.slf4j.{Logger, LoggerFactory} -import scala.collection.mutable -import java.text.SimpleDateFormat import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.{SparkEnv, SparkException} - private[gpuenabler] object CUDAManagerCachedModule { private val cachedModules = new ConcurrentHashMap[(String, Int), CUmodule].asScala @@ -65,7 +59,7 @@ private class CUDAManager { def getModule(fname : String) : CUmodule = { val ptxURL = getClass.getResource(fname) - val clm = cachedLoadModule(Left(ptxURL)); + val clm = cachedLoadModule(Left(ptxURL)) clm } diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUMemoryManager.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUMemoryManager.scala index 87e5c9a..39dbd2d 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUMemoryManager.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUMemoryManager.scala @@ -20,9 +20,9 @@ package com.ibm.gpuenabler import jcuda.driver.CUdeviceptr import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.gpuenabler.CUDAUtils._ -import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import java.util.concurrent.ConcurrentHashMap import org.apache.spark.sql.gpuenabler.CUDAUtils._Logging -import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -188,11 +188,13 @@ private[gpuenabler] class GPUMemoryManagerSlaveEndPoint(val rpcEnv: _RpcEnv, case CacheGPUDS(lp : String, flag: Boolean) => cacheGPU(lp, flag) context.reply (true) - case id : String => + case _ : String => context.reply (true) } } +case class CachedGPUMeta (ptr: CUdeviceptr, own: Boolean, 
colWidth: java.lang.Integer ) + private[gpuenabler] class GPUMemoryManager(val executorId : String, val rpcEnv : _RpcEnv, val driverEndpoint: _RpcEndpointRef, @@ -200,12 +202,12 @@ private[gpuenabler] class GPUMemoryManager(val executorId : String, val isLocal : Boolean) { val cachedGPUPointersDS = new ConcurrentHashMap[String, - collection.concurrent.Map[Long, collection.concurrent.Map[String, CUdeviceptr]]].asScala + collection.concurrent.Map[Long, collection.concurrent.Map[String, CachedGPUMeta]]].asScala val cachedGPUOnlyDS = new mutable.ListBuffer[String]() val cachedGPUDS = new mutable.ListBuffer[String]() def getCachedGPUPointersDS : collection.concurrent.Map[String, - collection.concurrent.Map[Long, collection.concurrent.Map[String, CUdeviceptr]]] = cachedGPUPointersDS + collection.concurrent.Map[Long, collection.concurrent.Map[String, CachedGPUMeta]]] = cachedGPUPointersDS def cacheGPU(lp : String, flag: Boolean =false): Unit = { if (flag) { @@ -221,7 +223,7 @@ private[gpuenabler] class GPUMemoryManager(val executorId : String, } } cachedGPUPointersDS.getOrElseUpdate(lp, { - new ConcurrentHashMap[Long, collection.concurrent.Map[String, CUdeviceptr]].asScala + new ConcurrentHashMap[Long, collection.concurrent.Map[String, CachedGPUMeta]].asScala }) } @@ -229,15 +231,14 @@ private[gpuenabler] class GPUMemoryManager(val executorId : String, cachedGPUDS -= lp cachedGPUPointersDS.get(lp) match { - case Some(partNumPtrs) => { + case Some(partNumPtrs) => val gpuPtrs = partNumPtrs.values gpuPtrs.foreach((partPtr) => { partPtr.foreach{ - case (_, ptr) => GPUSparkEnv.get.cudaManager.freeGPUMemory(ptr) + case (_, obj) => if (obj.own) { GPUSparkEnv.get.cudaManager.freeGPUMemory(obj.ptr) } } }) cachedGPUPointersDS -= lp - } case None => } } @@ -274,7 +275,6 @@ private[gpuenabler] class GPUMemoryManager(val executorId : String, cachedGPURDDs -= rddId for ((name, ptr) <- cachedGPUPointers) { if (name.startsWith("rdd_" + rddId)) { - import com.ibm.gpuenabler.GPUSparkEnv // TODO: Free GPU memory GPUSparkEnv.get.cudaManager.freeGPUMemory(ptr.devPtr) cachedGPUPointers.remove(name) diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala index ccaa5c8..c9f7788 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/GPUSparkEnv.scala @@ -55,9 +55,6 @@ private[gpuenabler] class GPUSparkEnv() { val isGPUCodeGenEnabled = isGPUEnabled && SparkEnv.get.conf.getBoolean("spark.gpu.codegen", false) - // Every executor maintains a cached list of partition size for a particular logical plan. 
- val cachedDSPartSize = new ConcurrentHashMap[(String, Int), Int].asScala - var gpuDevice = 0 def cudaManager:CUDAManager = { diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodeGen.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodeGen.scala index 4723a2e..82d71f8 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodeGen.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodeGen.scala @@ -19,8 +19,6 @@ package com.ibm.gpuenabler import java.io.{File, PrintWriter} import org.apache.spark.SparkEnv -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, CodeGenerator, CodegenContext} import org.apache.spark.sql.types._ @@ -29,7 +27,8 @@ import com.google.common.cache.{CacheBuilder, CacheLoader} import org.apache.spark.sql.gpuenabler.CUDAUtils._ /** - * Generates bytecode that evaluates a boolean [[Expression]] on a given input [[InternalRow]]. + * Generates bytecode that can load/unload data & modules into GPU and + * launch kernel in GPU using native CUDA libraries. */ object JCUDACodeGen extends _Logging { @@ -43,7 +42,7 @@ object JCUDACodeGen extends _Logging { dataType: DataType, inSchemaIdx : Int, outSchemaIdx : Int, - var length : Long, + var length : Int, outputSize: Long, ctx : CodegenContext) { @@ -92,9 +91,9 @@ object JCUDACodeGen extends _Logging { boxType = ctx.boxedType(d) javaType = ctx.javaType(d) if (outputSize != 0 && is(GPUOUTPUT)) { - size = s"$outputSize * Sizeof.${javaType.toUpperCase()} * ${hostVariableName}_numCols" + size = s"$outputSize * Sizeof.${javaType.toUpperCase()} * ${hostVariableName}_colWidth" } else { - size = s"numElements * Sizeof.${javaType.toUpperCase()} * ${hostVariableName}_numCols" + size = s"numElements * Sizeof.${javaType.toUpperCase()} * ${hostVariableName}_colWidth" } case _ => if (outputSize != 0 && is(GPUOUTPUT)) { @@ -114,14 +113,14 @@ object JCUDACodeGen extends _Logging { s""" |private ByteBuffer $hostVariableName; |private CUdeviceptr pinMemPtr_$colName; - |${if(isArray) s"private int ${hostVariableName}_numCols;" else ""} + |${if(isArray) s"private int ${hostVariableName}_colWidth;" else ""} """.stripMargin } else if (is(RDDOUTPUT)){ if (isArray) s""" |private $javaType $hostVariableName[][]; - |private int ${hostVariableName}_numCols; + |private int ${hostVariableName}_colWidth; """.stripMargin else s"private $javaType $hostVariableName[];\n" @@ -145,13 +144,17 @@ object JCUDACodeGen extends _Logging { |${ if (isArray) { if (is(GPUINPUT)) - s""" + s""" |if (!((cached & 2) > 0) || - | !inputCMap.containsKey(blockID+"gpuOutputDevice_${colName}") || ${is(RDDOUTPUT)}) - | ${hostVariableName}_numCols = r.getArray($inSchemaIdx).numElements(); + | !inputCMap.containsKey(blockID+"gpuOutputDevice_${colName}") || ${is(RDDOUTPUT)}) { + | if (r == null && inputCMap.containsKey(blockID+"gpuOutputDevice_${colName}")) + | ${hostVariableName}_colWidth = ((CachedGPUMeta)inputCMap.get(blockID+"gpuOutputDevice_${colName}")).colWidth(); + | else + | ${hostVariableName}_colWidth = r.getArray($inSchemaIdx).numElements(); + |} |""".stripMargin else - s"${hostVariableName}_numCols = $length;" + s"${hostVariableName}_colWidth = $length;" } else "" } @@ -167,8 +170,8 @@ object JCUDACodeGen extends _Logging { else if(is(RDDOUTPUT)) { if (isArray) s""" - |${hostVariableName}_numCols = 
r.getArray($inSchemaIdx).numElements(); - |$hostVariableName = new $javaType[numElements][${hostVariableName}_numCols]; + |${hostVariableName}_colWidth = r.getArray($inSchemaIdx).numElements(); + |$hostVariableName = new $javaType[numElements][${hostVariableName}_colWidth]; | """.stripMargin else @@ -188,8 +191,8 @@ object JCUDACodeGen extends _Logging { | $deviceVariableName = new CUdeviceptr(); | cuMemAlloc($deviceVariableName, $size); | ${if (is(GPUOUTPUT)) s"cuMemsetD32Async($deviceVariableName, 0, $size / 4, cuStream);" else ""} - |} else { - | $deviceVariableName = (CUdeviceptr)inputCMap.get(blockID+"gpuOutputDevice_${colName}"); + |} else { + | $deviceVariableName = (CUdeviceptr)((CachedGPUMeta)inputCMap.get(blockID+"gpuOutputDevice_${colName}")).ptr(); |} """.stripMargin else @@ -212,14 +215,14 @@ object JCUDACodeGen extends _Logging { s""" |if (!((cached & 2) > 0) || !inputCMap.containsKey(blockID+"gpuOutputDevice_${colName}")) { | $javaType tmp_${colName}[] = r.getArray($inSchemaIdx).to${boxType}Array(); - | for(int j = 0; j < gpuInputHost_${colName}_numCols; j ++) + | for(int j = 0; j < gpuInputHost_${colName}_colWidth; j ++) | ${hostVariableName}.put$boxType(tmp_${colName}[j]); |} """.stripMargin else s""" |if (!((cached & 2) > 0) || !inputCMap.containsKey(blockID+"gpuOutputDevice_${colName}")) { - | ${hostVariableName}.put$boxType(${ctx.getValue("r", dataType, inSchemaIdx.toString)}); + | ${hostVariableName}.put$boxType(${ctx.getValue("r", dataType, inSchemaIdx.toString)}); |} """.stripMargin } @@ -231,7 +234,7 @@ object JCUDACodeGen extends _Logging { s"$hostVariableName[i] = ${ctx.getValue("r", dataType, inSchemaIdx.toString)}.clone();\n" case ArrayType(d,_) => s""" | $javaType tmp_${colName}[] = r.getArray($inSchemaIdx).to${boxType}Array(); - | for(int j=0; j<${hostVariableName}_numCols;j++) + | for(int j=0; j<${hostVariableName}_colWidth;j++) | ${hostVariableName}[i][j] = tmp_${colName}[j]; """.stripMargin case _ => @@ -244,7 +247,7 @@ object JCUDACodeGen extends _Logging { // Retrieve the free variable passed from user program codeStmt += "readFromConstArray" -> { - if((is(CONST) && length != -1)) { + if(is(CONST) && length != -1) { s""" | for(int i = 0; i<$length; i++ ) { | $hostVariableName.put$boxType((($javaType[])refs[$colName])[i]); @@ -306,34 +309,58 @@ object JCUDACodeGen extends _Logging { // TODO : Evaluate for performance; if(is(GPUOUTPUT) || (is(GPUINPUT) && is(RDDOUTPUT))) s""" - | if (!((cached & 4) > 0)) - | cuMemcpyDtoHAsync(Pointer.to(${hostVariableName}), $deviceVariableName, $size, cuStream); \n + | if (!((cached & 4) > 0)) { + | cuMemcpyDtoHAsync(Pointer.to(${hostVariableName}), $deviceVariableName, $size, cuStream); \n } """.stripMargin else "" } // Device memory will be freed if not cached. + // GPUOUT : This LP should own this device memory; if cached , store it into my ptrs clean it later + // GPUIN : child LP should own this device memory; if child is cached, store it in child ptrs, + // if child is cached and clean when child is unCached. If this variable is + // part of this plan's output(RDDOUTPUT) store it into my ptrs but not clean + // it as part of my unCached routines. But own it if the child is not cached. 
codeStmt += "FreeDeviceMemory" -> { if(is(GPUINPUT) || is(GPUOUTPUT) || (is(CONST) && length != -1)) { if (is(GPUOUTPUT)) { - s"""| if (((cached & 1) > 0)) { - | outputCMap.putIfAbsent(blockID+ - | "${deviceVariableName.replace("_out_", "_in_")}", $deviceVariableName); - | } else { - | cuMemFree($deviceVariableName); - | } + s"""| + | if (((cached & 1) > 0)) { + | own = true; + | ${if (isArray) { + s"colWidth = ${hostVariableName}_colWidth;" + } else { + "colWidth = 1;" + }} + | outputCMap.putIfAbsent(blockID+ + | "${deviceVariableName.replace("_out_", "_in_")}", new CachedGPUMeta($deviceVariableName, own, colWidth)); + | } else { + | cuMemFree($deviceVariableName); + | } """.stripMargin } else if (is(GPUINPUT)) { - s"""|if (((cached & 2) > 0)) { - | inputCMap.putIfAbsent(blockID - | +"${deviceVariableName.replace("gpuInputDevice_", "gpuOutputDevice_")}", $deviceVariableName); - |} else { - | cuMemFree($deviceVariableName); - |} + s"""| + | ${if (isArray) { + s"colWidth = ${hostVariableName}_colWidth;" + } else { + "colWidth = 1;" + }} + | own = true; + | if (((cached & 2) > 0)) { + | own = false; + | inputCMap.putIfAbsent(blockID + | +"${deviceVariableName.replace("gpuInputDevice_", "gpuOutputDevice_")}", new CachedGPUMeta($deviceVariableName, true, colWidth)); + | } + | if (${is(RDDOUTPUT)} && (cached & 1) > 0) { + | outputCMap.putIfAbsent(blockID+ + | "${deviceVariableName.replace("gpuInputDevice_", "gpuOutputDevice_")}", new CachedGPUMeta($deviceVariableName, own, colWidth)); + | } else if (own) { + | cuMemFree($deviceVariableName); + | } """.stripMargin - } else { - s"""|cuMemFree($deviceVariableName); + } else { + s"""|cuMemFree($deviceVariableName); """.stripMargin } }else @@ -360,9 +387,9 @@ object JCUDACodeGen extends _Logging { if(isArray) s""" |int tmpCursor_${colName} = holder.cursor; - |arrayWriter.initialize(holder,${hostVariableName}_numCols, + |arrayWriter.initialize(holder,${hostVariableName}_colWidth, | ${dataType.defaultSize}); - |for(int j=0;j<${hostVariableName}_numCols;j++) + |for(int j=0;j<${hostVariableName}_colWidth;j++) | arrayWriter.write(j, ${hostVariableName}.get$boxType()); |rowWriter.setOffsetAndSize(${outSchemaIdx}, | tmpCursor_${colName}, holder.cursor - tmpCursor_${colName}); @@ -376,9 +403,9 @@ object JCUDACodeGen extends _Logging { case ArrayType(d, _) => s""" |int tmpCursor${colName} = holder.cursor; - |arrayWriter.initialize(holder,${hostVariableName}_numCols, + |arrayWriter.initialize(holder,${hostVariableName}_colWidth, | ${dataType.defaultSize}); - |for(int j=0;j<${hostVariableName}_numCols;j++) + |for(int j=0;j<${hostVariableName}_colWidth;j++) | arrayWriter.write(j, ${hostVariableName}[idx][j]); |rowWriter.setOffsetAndSize(${outSchemaIdx}, tmpCursor${colName}, | holder.cursor - tmpCursor${colName}); @@ -536,13 +563,12 @@ object JCUDACodeGen extends _Logging { val gpuGridSizeList = Array.fill(stagesCount,3){0} var gpuSharedMemory : Int = 0 - (0 to stagesCount-1).foreach(idx => { + (0 until stagesCount).foreach(idx => { val (gpuGridSizeX : Int, gpuBlockSizeX: Int, gpuGridSizeY : Int, gpuBlockSizeY : Int, gpuGridSizeZ : Int, gpuBlockSizeZ: Int) = localCF.gpuParams match { - case Some(gpuParamsCompute) => { + case Some(gpuParamsCompute) => gpuParamsCompute.dimensions match { case (computeDim) => computeDim(numElements, idx) } - } case None => (1,1,1,1,1,1) // Compute this in the executor nodes } @@ -567,7 +593,7 @@ object JCUDACodeGen extends _Logging { def createAllInputVariables(inputSchema : StructType, ctx : CodegenContext): Array[Variable] = { 
val variables = ArrayBuffer.empty[Variable] - (0 to inputSchema.length-1).map(inIdx => { + (0 until inputSchema.length).map(inIdx => { val fieldname = inputSchema(inIdx).name variables += Variable("in_"+fieldname, GPUINPUT | RDDOUTPUT, @@ -618,6 +644,9 @@ object JCUDACodeGen extends _Logging { |import org.apache.spark.sql.catalyst.expressions.UnsafeRow; |import com.ibm.gpuenabler.JCUDACodegenIterator; |import org.apache.spark.unsafe.types.UTF8String; + |import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; + |import org.apache.spark.sql.Encoder; + |import org.apache.spark.sql.types.StructType; | |import java.nio.*; |import java.util.Iterator; @@ -625,6 +654,7 @@ object JCUDACodeGen extends _Logging { |import com.ibm.gpuenabler.GPUSparkEnv; |import java.util.Map; |import java.util.List; + |import com.ibm.gpuenabler.CachedGPUMeta; | |public class GeneratedCode_${cf.funcName} { // REMOVE | // Handle to call from compiled source @@ -647,15 +677,18 @@ object JCUDACodeGen extends _Logging { | private int numElements = 0; | private int hasNextLoop = 0; | private Object refs[]; + | private boolean own = true; + | private int colWidth = 1; | | // cached : 0 - NoCache; | // 1 - DS is cached; Hold GPU results in GPU | // 2 - child DS is cached; Use GPU results stored by child for input parameters | private int cached = 0; | private int blockID = 0; - | private List> gpuPtrs; - | private Map inputCMap; - | private Map outputCMap; + | private List> gpuPtrs; + | private Map inputCMap; + | private Map outputCMap; + | private Encoder inpEnc; | | private int[][] blockSizeX; | private int[][] gridSizeX; @@ -682,16 +715,19 @@ object JCUDACodeGen extends _Logging { | arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); | } | - | public void init(Iterator inp, Object inprefs[], - | int size, int cached, List> gpuPtrs, - | int blockID, int[][] userGridSizes, int[][] userBlockSizes, int stages, int smSize) { + | public void init(Iterator inp, Object inprefs[], + | int size, int cached, List> gpuPtrs, + | int blockID, int[][] userGridSizes, int[][] userBlockSizes, int stages, int smSize, + | Encoder inpEnc) { | inpitr = inp; | numElements = size; - | if (!((cached & 4) > 0)) hasNextLoop = ${cf.outputSize.getOrElse("numElements")}; + | if (!((cached & 4) > 0) && ${if (cf.funcName != "") true else false }) + | hasNextLoop = ${cf.outputSize.getOrElse("numElements")}; | else hasNextLoop = 0; | | refs = inprefs; | + | this.inpEnc = inpEnc; | this.cached = cached; | this.gpuPtrs = gpuPtrs; | this.blockID = blockID; @@ -706,11 +742,11 @@ object JCUDACodeGen extends _Logging { | assert ((deviceProp.sharedMemPerBlock) > smSize) : "Invalid shared Memory Size Provided"; | | if (((cached & 1) > 0)) { - | outputCMap = (Map)gpuPtrs.get(0); + | outputCMap = (Map)gpuPtrs.get(0); | } | | if (((cached & 2) > 0)) { - | inputCMap = (Map) gpuPtrs.get(1); + | inputCMap = (Map) gpuPtrs.get(1); | } | } | @@ -773,28 +809,35 @@ object JCUDACodeGen extends _Logging { | CUstream cuStream = new CUstream(); | JCudaDriver.cuStreamCreate(cuStream, 0); | - | Boolean enterLoop = true; + | Boolean enterLoop = true; | if (!((cached & 2) > 0) ${getStmt(variables,List("checkLoop"),"")} ) { | enterLoop = true; | } else { | enterLoop = false; | allocateMemory(null, cuStream); - | } - | + | } + | | if (enterLoop){ | // Fill GPUInput/Direct Copy Host variables + | long now = System.nanoTime(); + | + | ExpressionEncoder inExpr = (ExpressionEncoder)inpEnc; + | StructType inputSchema = inpEnc.schema(); + | | 
for(int i=0; inpitr.hasNext();i++) { - | InternalRow r = (InternalRow) inpitr.next(); + | Object obj = ((InternalRow)inpitr.next()).get(0, inputSchema); + | InternalRow r = ((InternalRow) inExpr.toRow(obj)); | if (i == 0) allocateMemory(r, cuStream); | ${getStmt(variables,List("readFromInternalRow"),"")} | } - | } | + | } + | | ${getStmt(variables,List("readFromConstArray"),"")} - | + | | // Flip buffer for read | ${getStmt(variables,List("flip"),"")} - | + | | // Copy data from Host to Device | ${getStmt(variables,List("memcpyH2D"),"")} | cuCtxSynchronize(); @@ -839,8 +882,10 @@ object JCUDACodeGen extends _Logging { """.stripMargin } else "" } - | - | ${getStmt(variables,List("memcpyD2H"),"")} + | ${if (cf.funcName != "") { + // If no kernel is executed as in case of loadGpu; no need to copy data back. + getStmt(variables, List("memcpyD2H"), "") + } else ""} | cuCtxSynchronize(); | // Rewind buffer for read for GPUINPUT & GPUOUTPUT | ${getStmt(variables,List("rewind"),"")} @@ -863,23 +908,22 @@ object JCUDACodeGen extends _Logging { val fpath = if (cf.funcName != "") s"/tmp/GeneratedCode_${cf.funcName}.java" else "/tmp/GeneratedCode_autoload.java" - if(debugMode == 2) + if(debugMode == 2) { println(s"Compile Existing File - ${fpath}") + val _codeBody = generateFromFile(fpath).filter(!_.contains("REMOVE")).map(x => x+"\n").mkString + val code = new CodeAndComment(_codeBody, ctx.getPlaceHolderToComments()) - val _codeBody = if(debugMode == 2) - generateFromFile(fpath) - else if (debugMode == 1) { - val code = new CodeAndComment(codeBody,ctx.getPlaceHolderToComments()) - writeToFile(code, fpath) - codeBody + CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[JCUDACodegenIterator] } else { - codeBody - } - - val code = _codeBody.split("\n").filter(!_.contains("REMOVE")).map(_ + "\n").mkString + if (debugMode == 1) { + val code = new CodeAndComment(codeBody,ctx.getPlaceHolderToComments()) + writeToFile(code, fpath) + } + val code = codeBody.split("\n").filter(!_.contains("REMOVE")).map(_ + "\n").mkString val p = cache.get(new CodeAndComment(code, ctx.getPlaceHolderToComments())). 
generate(ctx.references.toArray).asInstanceOf[JCUDACodegenIterator] - p + p + } } private val cache = CacheBuilder.newBuilder() @@ -899,8 +943,8 @@ object JCUDACodeGen extends _Logging { pw.close() } - def generateFromFile(fpath : String) : String = { - scala.io.Source.fromFile(fpath).getLines().mkString + def generateFromFile(fpath : String) : Iterator[String] = { + scala.io.Source.fromFile(fpath).getLines() } } diff --git a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodegenIterator.scala b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodegenIterator.scala index 4c2dc61..998b8cd 100644 --- a/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodegenIterator.scala +++ b/gpu-enabler/src/main/scala/com/ibm/gpuenabler/JCUDACodegenIterator.scala @@ -17,8 +17,8 @@ package com.ibm.gpuenabler -import jcuda.driver.CUdeviceptr import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.Encoder /** * Interface for generated predicate @@ -26,8 +26,16 @@ import org.apache.spark.sql.catalyst.InternalRow abstract class JCUDACodegenIterator extends Iterator[InternalRow] { def hasNext() : Boolean def next() : InternalRow - def init(itr : java.util.Iterator[InternalRow], args: Array[Any],size : Int, - cached: Int, gpuPtrs: java.util.List[java.util.Map[String, CUdeviceptr]], blockID: Int, - userGridSizes: Array[Array[Int]], userBlockSizes: Array[Array[Int]], stages: Int, smSize: Int) + def init[T](itr : java.util.Iterator[InternalRow], + args: Array[Any], + size : Int, + cached: Int, + gpuPtrs: java.util.List[java.util.Map[String, CachedGPUMeta]], + blockID: Int, + userGridSizes: Array[Array[Int]], + userBlockSizes: Array[Array[Int]], + stages: Int, + smSize: Int, + inpEnc: Encoder[T]) } diff --git a/gpu-enabler/src/test/scala/com/ibm/gpuenabler/CUDADSFunctionSuite.scala b/gpu-enabler/src/test/scala/com/ibm/gpuenabler/CUDADSFunctionSuite.scala index 90df7a4..1a977fc 100644 --- a/gpu-enabler/src/test/scala/com/ibm/gpuenabler/CUDADSFunctionSuite.scala +++ b/gpu-enabler/src/test/scala/com/ibm/gpuenabler/CUDADSFunctionSuite.scala @@ -1,20 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - package com.ibm.gpuenabler import java.util.Random @@ -46,7 +29,7 @@ case class MatrixData(M: Long, N: Long) class CUDADSFunctionSuite extends FunSuite { - val rootLogger = Logger.getRootLogger() +val rootLogger = Logger.getRootLogger() val ptxURL = "/testDSCUDAKernels.ptx" rootLogger.setLevel(Level.OFF) @@ -59,7 +42,7 @@ class CUDADSFunctionSuite extends FunSuite { Array("this"), Array("this"), ptxURL) - + SparkEnv.get.closureSerializer.newInstance().serialize(cf) } @@ -84,7 +67,7 @@ class CUDADSFunctionSuite extends FunSuite { } } - test("Run identity CUDA kernel on a single primitive array column", GPUTest) { + test("Run identity CUDA kernel on a single primitive array column", GPUTest) { val spark = SparkSession.builder().master("local[*]").appName("test").config(conf).getOrCreate() import spark.implicits._ @@ -1118,7 +1101,6 @@ test("Run map + map + map + reduce on datasets - Cached multiple partitions", GP info("No CUDA devices, so skipping the test.") } } - }