diff --git a/cuAcc/LibSVMParser.h b/cuAcc/LibSVMParser.h new file mode 100755 index 0000000..2e0222f --- /dev/null +++ b/cuAcc/LibSVMParser.h @@ -0,0 +1,174 @@ +#include +#include +#include +#include + + + +template +class CLibSVMParser +{ +public: + CLibSVMParser(){} + + std::vector m_csr_ridx; + std::vector m_csr_cidx; + std::vector m_csr_data; + std::vector m_y; + std::vector m_w; + + std::vector m_data; + + unsigned m_max_row; + unsigned m_min_row; + std::vector m_covered_by_X; + + int m_nx; + + unsigned get_nnz() { return m_csr_ridx.back(); } + unsigned get_ny() { return m_csr_ridx.size()-1;} + unsigned get_nx() { return m_nx;} + + int* get_csr_ridx() { return &m_csr_ridx.front();} + int* get_csr_cidx() { return &m_csr_cidx.front();} + T* get_csr_data() { return &m_csr_data.front();} + T* get_y() { return &m_y.front();} + T* get_w() { return m_w.empty()? NULL:&m_w.front();} + T* get_data() + { + if(m_data.empty()) + { + m_data.resize(get_ny()*get_nx(),0); + + int row = 0; + int col = 0; + T val; + + for(unsigned i=0,s=get_nnz();i +#include +#include + +#include "cuAcc_cluster.h" +#include "cuAcc_updater.h" + + +JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_create_1updater + (JNIEnv *env, jobject, jstring name, jint option) +{ + CSystemInfo tm(__FUNCTION__); + + //CSystemInfo::log("NativeSGD-INFO] CUDA enabled on %s pid=%d\n",CSystemInfo::get_host_info(),getpid()); + //tm.proc_info(); + + char* _name = (char*)env->GetStringUTFChars(name,0); + + CSystemInfo::mutex_lock(); + + int device = CCuAcc::get_most_idle_device(); + + CUDA_CHECK(cudaSetDevice(device)); + CUpdater* pUpdater = NULL; + if(!strcmp(_name,"SquaredL2Updater")) pUpdater = new CL2AdaDeltaUpdater(); + else if(!strcmp(_name,"L1Updater")) pUpdater = new CL1AdaDeltaUpdater; + else if(!strcmp(_name,"SimpleUpdater")) pUpdater = new CSimpleUpdater; + else + { + CSystemInfo::log("ERROR: %s is not supported\n",_name); + assert(false); + } + + CSystemInfo::mutex_unlock(); + + env->ReleaseStringUTFChars(name,_name); + + return (jlong)pUpdater; +} + +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_destroy_1updater + (JNIEnv *, jobject, jlong handle) +{ + delete (CUpdater*)handle; +} + +JNIEXPORT jdouble JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_updater_1initialize + (JNIEnv *env, jobject, jlong handle, jdoubleArray x, jdouble lambda, jint option) +{ + CUpdater* pUpdater = (CUpdater*)handle; + assert(pUpdater); + + double* _x = (double*)env->GetPrimitiveArrayCritical(x,0); + double regVal = pUpdater->initialize(env->GetArrayLength(x),_x,lambda); + + env->ReleasePrimitiveArrayCritical(x, _x, 0); + return regVal; +} + +JNIEXPORT jdouble JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_updater_1convergence_1compute + (JNIEnv *env, jobject, jlong handle, jdoubleArray x, jdoubleArray cgrad, jlong mini_batch_size, jdouble step_size, jint iter, jdouble lambda, jdoubleArray convergence_info, jint option) +{ + CSystemInfo tm(__FUNCTION__); + + CUpdater* pUpdater = (CUpdater*)handle; + assert(pUpdater); + + double* _x = (double*)env->GetPrimitiveArrayCritical(x,0); + double* _g = (double*)env->GetPrimitiveArrayCritical(cgrad,0); + double* _convergence_info = (double*)env->GetPrimitiveArrayCritical(convergence_info,0); + + tm.timer_check(); + + //cudaHostRegister(_x,pUpdater->m_nx*sizeof(double),0); + //cudaHostRegister(_g,pUpdater->m_nx*sizeof(double),0); + + double regVal = pUpdater->update(_x,_g,mini_batch_size,step_size,iter,lambda,_convergence_info); + + //cudaHostUnregister(_x); + 
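  //(Release-mode note, a minimal sketch of the convention used by every
  // entry point in this file: an array pinned with GetPrimitiveArrayCritical
  // is released exactly once on every path, with no other JNI calls while
  // it is held; mode JNI_ABORT frees without copying back (inputs such as
  // x and cgrad here), mode 0 copies device results back (outputs such as
  // convergence_info). The array name below is illustrative only:
  //
  //   double* _a = (double*)env->GetPrimitiveArrayCritical(a,0);
  //   ...                                             //no JNI calls in here
  //   env->ReleasePrimitiveArrayCritical(a,_a,mode);  //JNI_ABORT or 0
  //)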
//cudaHostUnregister(_g); + + tm.timer_check(); + + env->ReleasePrimitiveArrayCritical(convergence_info, _convergence_info, 0); + env->ReleasePrimitiveArrayCritical(cgrad, _g, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(x, _x, JNI_ABORT); + + return regVal; +} + + +JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_create_1acc_1cluster + (JNIEnv *env, jobject, jstring name, jdoubleArray data, jdoubleArray y, jint option) +{ + CSystemInfo tm(__FUNCTION__); + + unsigned ny = env->GetArrayLength(y); + unsigned nx = env->GetArrayLength(data)/ny; + + assert(ny); + assert(nx); + + char* _name = (char*)env->GetStringUTFChars(name,0); + double* _data = (double*)env->GetPrimitiveArrayCritical(data,0); + double* _y = (double*)env->GetPrimitiveArrayCritical(y,0); + + + tm.timer_check(); + + CMachineLearning::ALGO algo; + if(!strcmp(_name,"LinearRegression")) algo = CMachineLearning::LINEAR_REGRESSION; + else if(!strcmp(_name,"LogisticRegression")) algo = CMachineLearning::LOGISTIC_REGRESSION; + else + { + CSystemInfo::log("ERROR: %s is not supported\n",_name); + assert(false); + } + + cudaHostRegister(_data, ny*nx*sizeof(double), cudaHostRegisterDefault); + cudaHostRegister(_y, ny*sizeof(double), cudaHostRegisterDefault); + + CCuAccCluster* p_cluster = new CCuAccCluster(algo,CCuAcc::get_num_device()*2,ny,nx,_data,_y,false);; + + cudaHostUnregister(_data); + cudaHostUnregister(_y); + + tm.timer_check(); + + CCuMemory::showMemInfo(); + + env->ReleasePrimitiveArrayCritical(y, _y, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(data, _data, JNI_ABORT); + env->ReleaseStringUTFChars(name,_name); + + return (jlong)p_cluster; +} + +JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_create_1sparse_1acc_1cluster + (JNIEnv *env, jobject, jstring name, jdoubleArray data, jdoubleArray y, jintArray csr_ridx, jintArray csr_cidx, jint nx, jboolean intercept, jint option) +{ + CSystemInfo tm(__FUNCTION__); + + unsigned ny = env->GetArrayLength(csr_ridx)-1; + unsigned nnz = env->GetArrayLength(csr_cidx); + + char* _name = (char*)env->GetStringUTFChars(name,0); + double* _data = data? 
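  //(The cudaHostRegister/cudaHostUnregister pair used around the bulk
  // copies pins the Java heap buffer so the host-to-device transfer inside
  // the cluster constructor can use DMA. Minimal sketch of the pattern,
  // assuming a host buffer buf of n doubles and a stream s:
  //
  //   cudaHostRegister(buf, n*sizeof(double), cudaHostRegisterDefault); //pin
  //   cudaMemcpyAsync(dev, buf, n*sizeof(double),
  //                   cudaMemcpyHostToDevice, s);                       //DMA copy
  //   cudaStreamSynchronize(s);                  //copy must finish first
  //   cudaHostUnregister(buf);                   //unpin before releasing buf
  //)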
(double*)env->GetPrimitiveArrayCritical(data,0):NULL; + double* _y = (double*)env->GetPrimitiveArrayCritical(y,0); + int* _csr_ridx = (int*)env->GetPrimitiveArrayCritical(csr_ridx,0); + int* _csr_cidx = (int*)env->GetPrimitiveArrayCritical(csr_cidx,0); + + tm.timer_check(); + + CMachineLearning::ALGO algo; + if(!strcmp(_name,"LinearRegression")) algo = CMachineLearning::LINEAR_REGRESSION; + else if(!strcmp(_name,"LogisticRegression")) algo = CMachineLearning::LOGISTIC_REGRESSION; + else + { + CSystemInfo::log("ERROR: %s is not supported\n",_name); + assert(false); + } + + if(data) cudaHostRegister(_data, ny*nx*sizeof(double), cudaHostRegisterDefault); + cudaHostRegister(_y, ny*sizeof(double), cudaHostRegisterDefault); + + CCuAccCluster* p_cluster = new CCuAccCluster(algo,CCuAcc::get_num_device()*2,ny,nx,_data,_y,_csr_ridx,_csr_cidx,nnz,intercept); + + if(data) cudaHostUnregister(_data); + cudaHostUnregister(_y); + + tm.timer_check(); + + CCuMemory::showMemInfo(); + + if(data) env->ReleasePrimitiveArrayCritical(data, _data, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(csr_ridx, _csr_ridx, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(csr_cidx, _csr_cidx, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(y, _y, JNI_ABORT); + env->ReleaseStringUTFChars(name,_name); + + return (jlong)p_cluster; +} + +JNIEXPORT jstring JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_get_1exec_1id + (JNIEnv *env, jobject) +{ + CSystemInfo::mutex_lock(); + const char* uid = CSystemInfo::get_host_info(); + CSystemInfo::mutex_unlock(); + + return env->NewStringUTF(uid); +} + +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1aug + (JNIEnv *env, jobject, jlong handle, jdoubleArray x, jdouble intercept, jdoubleArray metric) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + double* _x = (double*)env->GetPrimitiveArrayCritical(x,0); + double* _metric = (double*)env->GetPrimitiveArrayCritical(metric,0); + + //tm.timer_check(); + //needs to be incremental + *_metric = p_cluster->aug(_x,intercept); + //tm.timer_check(); + + env->ReleasePrimitiveArrayCritical(x, _x, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(metric, _metric, 0); +} + +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1rmse + (JNIEnv *env, jobject, jlong handle, jdoubleArray x, jdouble intercept, jdoubleArray metric) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + double* _x = (double*)env->GetPrimitiveArrayCritical(x,0); + double* _metric = (double*)env->GetPrimitiveArrayCritical(metric,0); + + //tm.timer_check(); + //needs to be incremental + *_metric = p_cluster->rmse(_x,intercept); + //tm.timer_check(); + + env->ReleasePrimitiveArrayCritical(x, _x, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(metric, _metric, 0); +} + + + + + + +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1evaluate + (JNIEnv *env, jobject, jlong handle, jdouble mini_batch_fraction, jdouble weight_sum, jdoubleArray x, jdoubleArray g, jdoubleArray fx, jint option) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + switch(CCuAccCluster::get_set_cluster_state(p_cluster,CCuAccCluster::ACTVE)){ + case CCuAccCluster::IDLE: + break; + case CCuAccCluster::DELETED: + case CCuAccCluster::INVALID: + assert(false); + case CCuAccCluster::ACTVE: + return 0; + } + + + //CSystemInfo tm(__FUNCTION__); + + double* _x = (double*)env->GetPrimitiveArrayCritical(x,0); + double* _g = (double*)env->GetPrimitiveArrayCritical(g,0); + double* _fx = 
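  //(CSR recap for the sparse entry points above: ny = len(csr_ridx)-1 and
  // nnz = len(csr_cidx). For example the 2x3 matrix
  //   [10  0 20]
  //   [ 0 30  0]
  // is stored as
  //   csr_ridx = {0, 2, 3};     //row i spans entries [ridx[i], ridx[i+1])
  //   csr_cidx = {0, 2, 1};     //column index of each stored entry
  //   data     = {10, 20, 30};  //the nnz stored values
  //)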
(double*)env->GetPrimitiveArrayCritical(fx,0); + + //tm.timer_check(); + //needs to be incremental + *_fx += p_cluster->evaluate(env->GetArrayLength(g),_x,_g,weight_sum); + //tm.timer_check(); + + env->ReleasePrimitiveArrayCritical(x, _x, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(fx, _fx, 0); + env->ReleasePrimitiveArrayCritical(g, _g, 0); + + return 1; +} + +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_destroy_1acc_1cluster + (JNIEnv *env, jobject, jlong handle) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + switch(CCuAccCluster::get_set_cluster_state(p_cluster,CCuAccCluster::DELETED)){ + case CCuAccCluster::ACTVE: + assert(false); + case CCuAccCluster::IDLE: + return 1; + case CCuAccCluster::DELETED: + case CCuAccCluster::INVALID: + return 0; + } + assert(false); + + return 0; +} + +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_reset_1acc_1cluster + (JNIEnv *env, jobject, jlong handle) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + //allow race condition + CCuAccCluster::set_cluster_state(p_cluster,CCuAccCluster::IDLE); +} + +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1weighten + (JNIEnv *env, jobject, jlong handle, jdoubleArray weight, jdoubleArray weight_info) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + switch(CCuAccCluster::get_set_cluster_state(p_cluster,CCuAccCluster::ACTVE)){ + case CCuAccCluster::IDLE: + break; + case CCuAccCluster::DELETED: + case CCuAccCluster::INVALID: + assert(false); + case CCuAccCluster::ACTVE: + return 0; + } + + double* _weight_info = (double*)env->GetPrimitiveArrayCritical(weight_info,0); + + if(weight) + { + double* _weight = (double*)env->GetPrimitiveArrayCritical(weight,0); + p_cluster->weighten(_weight,_weight_info,_weight_info+1); + env->ReleasePrimitiveArrayCritical(weight, _weight, JNI_ABORT); + } + else + { + p_cluster->weighten(NULL,_weight_info,_weight_info+1); + } + + env->ReleasePrimitiveArrayCritical(weight_info, _weight_info, JNI_ABORT); + + return 1; + +} + +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1summarize + (JNIEnv *env, jobject, jlong handle, jdouble weight_sum, jdoubleArray data_sum, jdoubleArray data_sq_sum, jdoubleArray label_info) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + switch(CCuAccCluster::get_set_cluster_state(p_cluster,CCuAccCluster::ACTVE)){ + case CCuAccCluster::IDLE: + break; + case CCuAccCluster::DELETED: + case CCuAccCluster::INVALID: + assert(false); + case CCuAccCluster::ACTVE: + return 0; + } + + double* _sum = (double*)env->GetPrimitiveArrayCritical(data_sum,0); + double* _data_sq_sum = (double*)env->GetPrimitiveArrayCritical(data_sq_sum,0); + double* _label_info = (double*)env->GetPrimitiveArrayCritical(label_info,0); + + p_cluster->summarize(weight_sum,_sum,_data_sq_sum,_label_info); + + env->ReleasePrimitiveArrayCritical(data_sum, _sum, 0); + env->ReleasePrimitiveArrayCritical(data_sq_sum, _data_sq_sum, 0); + env->ReleasePrimitiveArrayCritical(label_info, _label_info, 0); + + return 1; +} + +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1standardize + (JNIEnv *env, jobject, jlong handle, jdoubleArray data_mean, jdoubleArray data_std, jdouble label_mean, jdouble label_std) +{ + CCuAccCluster* p_cluster = (CCuAccCluster*)handle; + + switch(CCuAccCluster::get_set_cluster_state(p_cluster,CCuAccCluster::ACTVE)){ + case CCuAccCluster::IDLE: + break; + case CCuAccCluster::DELETED: + case 
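  //(Same idempotency guard as in evaluate/weighten/summarize above:
  // get_set_cluster_state swaps in the new state under the global mutex and
  // returns the previous one, so only the first caller that finds the
  // cluster IDLE performs the work. Sketch of the contract:
  //
  //   STATE old = get_set_cluster_state(p, ACTVE);  //mutex-protected swap
  //   if(old == ACTVE) return 0;  //another thread already ran this step
  //   if(old == IDLE)  { /*we own the transition, proceed*/ }
  //   //DELETED/INVALID mean a stale handle, hence the assert
  //)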
CCuAccCluster::INVALID: + assert(false); + case CCuAccCluster::ACTVE: + return 0; + } + + double* _data_mean = (double*)env->GetPrimitiveArrayCritical(data_mean,0); + double* _data_std = (double*)env->GetPrimitiveArrayCritical(data_std,0); + + p_cluster->standardize(_data_mean,_data_std,label_mean,label_std); + + env->ReleasePrimitiveArrayCritical(data_mean, _data_mean, JNI_ABORT); + env->ReleasePrimitiveArrayCritical(data_std, _data_std, JNI_ABORT); + + return 1; +} + + +///* +// * Class: org_apache_spark_mllib_optimization_NativeSGD +// * Method: compress +// * Signature: ([D[IDI)[I +// */ +//JNIEXPORT jintArray JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_compress +// (JNIEnv *, jobject, jdoubleArray, jintArray, jdouble, jint); +// +///* +// * Class: org_apache_spark_mllib_optimization_NativeSGD +// * Method: decompress +// * Signature: ([D[IDI)V +// */ +//JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_decompress +// (JNIEnv *, jobject, jdoubleArray, jintArray, jdouble, jint); \ No newline at end of file diff --git a/cuAcc/NativeCuAcc.h b/cuAcc/NativeCuAcc.h new file mode 100755 index 0000000..aedaceb --- /dev/null +++ b/cuAcc/NativeCuAcc.h @@ -0,0 +1,149 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class org_apache_spark_ml_optim_NativeCuAcc */ + +#ifndef _Included_org_apache_spark_ml_optim_NativeCuAcc +#define _Included_org_apache_spark_ml_optim_NativeCuAcc +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: create_updater + * Signature: (Ljava/lang/String;I)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_create_1updater + (JNIEnv *, jobject, jstring, jint); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: destroy_updater + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_destroy_1updater + (JNIEnv *, jobject, jlong); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: updater_initialize + * Signature: (J[DDI)D + */ +JNIEXPORT jdouble JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_updater_1initialize + (JNIEnv *, jobject, jlong, jdoubleArray, jdouble, jint); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: updater_convergence_compute + * Signature: (J[D[DJDID[DI)D + */ +JNIEXPORT jdouble JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_updater_1convergence_1compute + (JNIEnv *, jobject, jlong, jdoubleArray, jdoubleArray, jlong, jdouble, jint, jdouble, jdoubleArray, jint); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: get_exec_id + * Signature: ()Ljava/lang/String; + */ +JNIEXPORT jstring JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_get_1exec_1id + (JNIEnv *, jobject); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: create_acc_cluster + * Signature: (Ljava/lang/String;[D[DI)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_create_1acc_1cluster + (JNIEnv *, jobject, jstring, jdoubleArray, jdoubleArray, jint); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: create_sparse_acc_cluster + * Signature: (Ljava/lang/String;[D[D[I[IIZI)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_create_1sparse_1acc_1cluster + (JNIEnv *, jobject, jstring, jdoubleArray, jdoubleArray, jintArray, jintArray, jint, jboolean, jint); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: reset_acc_cluster + * Signature: (J)V + */ +JNIEXPORT void 
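/* (Naming note: javah escapes an underscore in the Java method name as
   "_1", so the Java method reset_acc_cluster maps to the stub
   Java_org_apache_spark_ml_optim_NativeCuAcc_reset_1acc_1cluster below;
   the same rule produces every "_1" infix in this header.) */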
JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_reset_1acc_1cluster + (JNIEnv *, jobject, jlong); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: destroy_acc_cluster + * Signature: (J)I + */ +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_destroy_1acc_1cluster + (JNIEnv *, jobject, jlong); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: acc_cluster_summarize + * Signature: (JD[D[D[D)I + */ +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1summarize + (JNIEnv *, jobject, jlong, jdouble, jdoubleArray, jdoubleArray, jdoubleArray); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: acc_cluster_weighten + * Signature: (J[D[D)I + */ +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1weighten + (JNIEnv *, jobject, jlong, jdoubleArray, jdoubleArray); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: acc_cluster_standardize + * Signature: (J[D[DDD)I + */ +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1standardize + (JNIEnv *, jobject, jlong, jdoubleArray, jdoubleArray, jdouble, jdouble); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: acc_cluster_evaluate + * Signature: (JDD[D[D[DI)I + */ +JNIEXPORT jint JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1evaluate + (JNIEnv *, jobject, jlong, jdouble, jdouble, jdoubleArray, jdoubleArray, jdoubleArray, jint); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: acc_cluster_aug + * Signature: (J[DD[D)V + */ +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1aug + (JNIEnv *, jobject, jlong, jdoubleArray, jdouble, jdoubleArray); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: acc_cluster_rmse + * Signature: (J[DD[D)V + */ +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_acc_1cluster_1rmse + (JNIEnv *, jobject, jlong, jdoubleArray, jdouble, jdoubleArray); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: compress + * Signature: ([D[IDI)[I + */ +JNIEXPORT jintArray JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_compress + (JNIEnv *, jobject, jdoubleArray, jintArray, jdouble, jint); + +/* + * Class: org_apache_spark_ml_optim_NativeCuAcc + * Method: decompress + * Signature: ([D[IDI)V + */ +JNIEXPORT void JNICALL Java_org_apache_spark_ml_optim_NativeCuAcc_decompress + (JNIEnv *, jobject, jdoubleArray, jintArray, jdouble, jint); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/cuAcc/cuAcc.cpp b/cuAcc/cuAcc.cpp new file mode 100755 index 0000000..375d6de --- /dev/null +++ b/cuAcc/cuAcc.cpp @@ -0,0 +1,368 @@ +#include +#include +#include +#include +#include +#include + +#include "cuAcc_base.h" +#include "cuAcc_function.h" +#include "cuAcc_updater.h" +#include "cuAcc_cluster.h" +#include "LibSVMParser.h" + +//////#include +////// +////// +////// +//////__device__ double atomicAdd(double* address, double val) +//////{ +////// unsigned long long int* address_as_ull = (unsigned long long int*)address; +////// unsigned long long int old = *address_as_ull, assumed; +////// do { +////// assumed = old; +////// old = atomicCAS(address_as_ull, assumed, __double_as_longlong(val +__longlong_as_double(assumed))); +////// } while (assumed != old); +////// return __longlong_as_double(old); +//////} +//////#define WARP_SIZE (32) +//////template +//////__global__ void kernel_trans_csrmv(int numRow, int* ptr, int* idx, T* val, T* y, T* x){ +////// +////// 
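////// (The CAS loop above is the standard pre-sm_60 emulation of a
////// double-precision atomicAdd: reinterpret the 8-byte word as an
////// unsigned long long and retry until no other thread raced in between:
//////
//////   unsigned long long* a = (unsigned long long*)address;
//////   unsigned long long old = *a, assumed;
//////   do {
//////     assumed = old;                      //snapshot the current bits
//////     old = atomicCAS(a, assumed,         //publish only if unchanged
//////           __double_as_longlong(val + __longlong_as_double(assumed)));
//////   } while (assumed != old);             //retry on contention
//////
////// On sm_60+ devices the native atomicAdd(double*, double) replaces it.)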
// global thread index +////// int thread_id = THREAD_BLOCK_SIZE * blockIdx.x + threadIdx.x; +////// +////// // thread index within the warp +////// int thread_lane = threadIdx.x & (WARP_SIZE-1); +////// // global warp index +////// int warp_id = thread_id / WARP_SIZE; +////// // total number of active warps +////// int num_warps = (THREAD_BLOCK_SIZE / WARP_SIZE) * gridDim.x; +////// for(unsigned row=warp_id; row < numRow; row+=num_warps){ +////// int row_start = ptr[row]; +////// int row_end = ptr[row+1]; +////// for (unsigned i=row_start+thread_lane; i < row_end;i+=WARP_SIZE) +////// atomicAdd(x+idx[i], val[i] * y[row]); +////// } +//////} + + +////template +//// __global__ void spmv_csr_vector_kernel (const int num_rows, const int *ptr, const int *indices, const T *data, const T *x , T *y) { +//// +//// __shared__ T vals[THREAD_BLOCK_SIZE]; +//// +//// int thread_id = blockDim.x * blockIdx.x + threadIdx.x; // global thread index +//// +//// int warp_id = thread_id / WARP_SIZE; // global warp index +//// +//// int lane = thread_id & (WARP_SIZE - 1); // thread index within the warp +//// +//// +//// +//// // one warp per row +//// +//// int row = warp_id; +//// +//// +//// +//// if (row < num_rows) { +//// +//// int row_start = ptr[row]; +//// +//// int row_end = ptr[row+1]; +//// +//// +//// +//// // compute running sum per thread +//// +//// vals [threadIdx.x] = 0; +//// +//// for (int jj = row_start + lane; jj < row_end; jj += WARP_SIZE) +//// +//// vals [ threadIdx.x ] += data [ jj ] * x [ indices [ jj ]]; +//// +//// +//// +//// // parallel reduction in shared memory +//// +//// if ( lane < 16) vals[threadIdx.x] += vals[threadIdx.x + 16]; +//// +//// if ( lane < 8) vals[threadIdx.x] += vals[threadIdx.x + 8]; +//// +//// if ( lane < 4) vals[threadIdx.x] += vals[threadIdx.x + 4]; +//// +//// if ( lane < 2) vals[threadIdx.x] += vals[threadIdx.x + 2]; +//// +//// if ( lane < 1) vals[threadIdx.x] += vals[threadIdx.x + 1]; +//// +//// +//// +//// // first thread writes the result +//// if (lane == 0) +//// y[row] += vals[threadIdx.x]; +//// } +////} +//// +// +//int test(int argc, char* argv[]) +//{ +// ///////////////////////////////////////////////// +// //parse the input +// std::map Parm; +// +// printf("cmd: %s ",argv[0]); +// +// char* pLast = NULL; +// for(int iIdx=1;iIdx P; +// P.read_libsvm(Parm["libsvm"].c_str()); +// +// printf("nx=%d\n",P.get_nx()); +// printf("ny=%d\n",P.get_ny()); +// +// //cusparse initialization +// cudaStream_t stream; +// cusparseHandle_t cusparse_handle; +// cusparseMatDescr_t data_descr; +// cudaStreamCreate(&stream); +// +// cusparseCreate(&cusparse_handle); +// cusparseSetStream(cusparse_handle,stream); +// +// cusparseCreateMatDescr(&data_descr); +// cusparseSetMatType(data_descr,CUSPARSE_MATRIX_TYPE_GENERAL); +// cusparseSetMatIndexBase(data_descr,CUSPARSE_INDEX_BASE_ZERO); +// +// double* p_dev_csr_data; +// int* p_dev_csr_ridx; +// int* p_dev_csr_cidx; +// +// double* p_dev_csc_data; +// int* p_dev_csc_ridx; +// int* p_dev_csc_cidx; +// +// cudaMalloc( (void**)&p_dev_csr_data, P.get_nnz()*sizeof(double) ); +// cudaMalloc( (void**)&p_dev_csr_ridx, (P.get_ny()+1)*sizeof(int) ); +// cudaMalloc( (void**)&p_dev_csr_cidx, P.get_nnz()*sizeof(int) ); +// +// cudaMemcpyAsync(p_dev_csr_data,P.get_csr_data(),P.get_nnz()*sizeof(double),cudaMemcpyHostToDevice,stream); +// cudaMemcpyAsync(p_dev_csr_ridx,P.get_csr_ridx(),(P.get_ny()+1)*sizeof(int),cudaMemcpyHostToDevice,stream); +// 
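//  (Setup recap for the benchmark below: one cuSPARSE handle is bound to
//   one stream, and a general zero-based matrix descriptor is reused for
//   every SpMV call:
//
//     cusparseHandle_t h;   cusparseMatDescr_t d;
//     cusparseCreate(&h);
//     cusparseSetStream(h, stream);                  //calls run async on stream
//     cusparseCreateMatDescr(&d);
//     cusparseSetMatType(d, CUSPARSE_MATRIX_TYPE_GENERAL);
//     cusparseSetMatIndexBase(d, CUSPARSE_INDEX_BASE_ZERO);
//   )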
cudaMemcpyAsync(p_dev_csr_cidx,P.get_csr_cidx(),P.get_nnz()*sizeof(int),cudaMemcpyHostToDevice,stream); +// +// CSystemInfo t; +// +// cudaMalloc( (void**)&p_dev_csc_data, P.get_nnz()*sizeof(double) ); +// cudaMalloc( (void**)&p_dev_csc_cidx, (P.get_nx()+1)*sizeof(int) ); +// cudaMalloc( (void**)&p_dev_csc_ridx, P.get_nnz()*sizeof(int) ); +// +// if(CUSPARSE_STATUS_SUCCESS!=cusparseDcsr2csc(cusparse_handle,P.get_ny(),P.get_nx(),P.get_nnz(), +// p_dev_csr_data,p_dev_csr_ridx,p_dev_csr_cidx, +// p_dev_csc_data,p_dev_csc_ridx,p_dev_csc_cidx, +// CUSPARSE_ACTION_NUMERIC,CUSPARSE_INDEX_BASE_ZERO)) +// { +// assert(false); +// } +// +// t.timer_check(); +// +// +// double alpha = 1.0; +// double beta = 0.0; +// +// double* x; +// double* y; +// +// cudaMalloc( (void**)&x, P.get_nx()*sizeof(double) ); +// cudaMalloc( (void**)&y, P.get_ny()*sizeof(double) ); +// +// cudaMemset(x,1, P.get_nx()*sizeof(double)); +// +// for(int i=0;i<1000;++i) +// { +// ////////////////////////////////////////////////////////////////// +// //y = Ax +// if(CUSPARSE_STATUS_SUCCESS!=cusparseDcsrmv(cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,P.get_ny(),P.get_nx(),P.get_nnz(),&alpha, +// data_descr,p_dev_csr_data,p_dev_csr_ridx,p_dev_csr_cidx, +// x, +// &beta,y)) +// { +// assert(false); +// } +// +// CUDA_CHECK(cudaStreamSynchronize(stream)); +// } +// +// +// t.timer_check(); +// +// for(int i=0;i<1000;++i) +// { +// ///////////////////////////////////////////////////////////////// +// //y = tran(A)x +// if(CUSPARSE_STATUS_SUCCESS!=cusparseDcsrmv(cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,P.get_nx(),P.get_ny(),P.get_nnz(),&alpha, +// data_descr,p_dev_csc_data,p_dev_csc_cidx,p_dev_csc_ridx, +// x, +// &beta,y)) +// { +// assert(false); +// } +// +// CUDA_CHECK(cudaStreamSynchronize(stream)); +// } +// +// t.timer_print(); +// } +// +// +// return 0; +//} + +int main(int argc, char* argv[]) +{ + //return test(argc,argv); + + + std::map Parm; + + printf("cmd: %s ",argv[0]); + + char* pLast = NULL; + for(int iIdx=1;iIdx P; + if(!Parm["libsvm"].empty()) + { + + P.read_libsvm(Parm["libsvm"].c_str()); + } + else if(!Parm["rdd"].empty()) + { + P.read_rdd(Parm["rdd"].c_str()); + } + else + { + assert(false); + } + + CSystemInfo tm("main"); + + printf("nx=%d\n",P.get_nx()); + printf("ny=%d\n",P.get_ny()); + + std::vector x(P.get_nx()+intercept,0); + + + cudaDeviceReset(); + + + CCuAccCluster SLGC(CMachineLearning::LOGISTIC_REGRESSION,num_partition,P.get_ny(),P.get_nx(), + NULL,P.get_y(), + P.get_csr_ridx(),P.get_csr_cidx(),P.get_nnz(), + intercept + ); + + double weight_sum; + double weight_nnz; + + SLGC.weighten(P.get_w(),&weight_sum,&weight_nnz); + + std::vector mean(P.get_nx(),0); + std::vector std(P.get_nx(),0); + std::vector label_info(3,0); + + + SLGC.summarize(weight_sum,&mean.front(),&std.front(),&label_info.front()); + + + if(intercept) + x.back() = log(label_info[1]/label_info[0]); + + double n = weight_sum; + + double unbiased_factor = P.get_ny() / (P.get_ny() - 1.0); + + for(unsigned i=0;i=0); + std[i] = sqrt(x); + } + + + double label_mean = label_info[1] / n; + double label_std = sqrt((label_info[2] / (n) - label_mean * label_mean) * unbiased_factor); + + double label_std2 = sqrt( + + double(P.get_ny())/(P.get_ny()-1)* + ( + + label_info[2] / (n) - label_mean * label_mean + + ) + + ); + + + SLGC.standardize(&mean.front(),&std.front(),label_mean,label_std); + + SLGC.solve_sgd(weight_sum,&CL2Updater(),max_iter,lambda,step_size,convergence_tol,&x.front()); + + 
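  //(The standardization above uses the unbiased sample variance, built
  // from the running sums that summarize() returns:
  //
  //   var_j = (sqSum_j/n - mean_j^2) * n/(n-1)   //Bessel's correction
  //   std_j = sqrt(var_j)
  //
  // e.g. values {1,2,3}: n=3, sum=6, sqSum=14, mean=2,
  // var = (14/3 - 4) * 3/2 = 1, std = 1.)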
CCuAccCluster::get_set_cluster_state(&SLGC,CCuAccCluster::DELETED); + + + return 0; +} + diff --git a/cuAcc/cuAcc_base.cu b/cuAcc/cuAcc_base.cu new file mode 100755 index 0000000..c495634 --- /dev/null +++ b/cuAcc/cuAcc_base.cu @@ -0,0 +1,276 @@ +#include "cuAcc_base.h" + + +pthread_mutex_t CSystemInfo::m_mutex = PTHREAD_MUTEX_INITIALIZER; +void CSystemInfo::mutex_lock() +{ + pthread_mutex_lock(&m_mutex); +} + +void CSystemInfo::mutex_unlock() +{ + pthread_mutex_unlock(&m_mutex); +} + +void CSystemInfo::timer_check() +{ + push_back(timer_elapsed_time()); + timer_reset(); +} + +double CSystemInfo::timer_elapsed_time() +{ +#ifdef WIN32 + double elapsed = difftime(clock(), m_start)/CLOCKS_PER_SEC; +#else + struct timespec finish; + clock_gettime(CLOCK_MONOTONIC, &finish); + double elapsed = (finish.tv_sec - m_start.tv_sec)+(finish.tv_nsec - m_start.tv_nsec) / 1000000000.0; +#endif + + return elapsed; +} + +void CSystemInfo::timer_reset() +{ +#ifdef WIN32 + m_start = clock(); +#else + clock_gettime(CLOCK_MONOTONIC, &m_start); +#endif +} + +void CSystemInfo::timer_print() +{ + m_printed = true; + + double tot = 0; + + log("%s : ",m_msg.c_str()); + + for(unsigned i=0;i %e\n",tot); + fflush(stdout); +} + +void CSystemInfo::proc_info() +{ +#ifndef WIN32 +#define MAX_BUFFER (4096) + char cCmd[MAX_BUFFER]; + sprintf(cCmd,"ps -p %d -o cmd h",getpid()); + //ps -p 17826 -o cmd h + //FILE* pFile = fopen("/proc/self/cmdline","rt"); + FILE* pFile = popen(cCmd,"r"); + if(!pFile) + { + printf("cannot open /proc/self/cmdline!\n"); + } + else + { + for(char cBuf[MAX_BUFFER];fgets(cBuf,sizeof(cBuf),pFile);) + { + printf("%s\n",cBuf); + } + + fclose(pFile); + } +#endif +} + +const char* CSystemInfo::get_host_info() +{ + static char host_info[512] = {NULL}; + + if(!host_info[0]) + { + gethostname(host_info,sizeof(host_info)); + sprintf(host_info+strlen(host_info),":%d",getpid()); + } + + return host_info; +} + +void CSystemInfo::print_stack() +{ +#ifndef WIN32 +#include +#include + + /** Print a demangled stack backtrace of the caller function to FILE* out. */ + FILE *out = stderr; + const unsigned int max_frames = 63 ; + fprintf(out, "stack trace:\n"); + + // storage array for stack trace address data + void* addrlist[max_frames+1]; + + // retrieve current stack addresses + int addrlen = backtrace(addrlist, sizeof(addrlist) / sizeof(void*)); + + if (addrlen == 0) { + fprintf(out, " \n"); + return; + } + + // resolve addresses into strings containing "filename(function+address)", + // this array must be free()-ed + char** symbollist = backtrace_symbols(addrlist, addrlen); + + // allocate string which will be filled with the demangled function name + size_t funcnamesize = 256; + char* funcname = (char*)malloc(funcnamesize); + + // iterate over the returned symbol lines. skip the first, it is the + // address of this function. + for (int i = 1; i < addrlen; i++) + { + char *begin_name = 0, *begin_offset = 0, *end_offset = 0; + + // find parentheses and +address offset surrounding the mangled name: + // ./module(function+0x15c) [0x8048a6d] + for (char *p = symbollist[i]; *p; ++p) + { + if (*p == '(') + begin_name = p; + else if (*p == '+') + begin_offset = p; + else if (*p == ')' && begin_offset) { + end_offset = p; + break; + } + } + + if (begin_name && begin_offset && end_offset + && begin_name < begin_offset) + { + *begin_name++ = '\0'; + *begin_offset++ = '\0'; + *end_offset = '\0'; + + // mangled name is now in [begin_name, begin_offset) and caller + // offset in [begin_offset, end_offset). 
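  // (standalone demangling example; the mangled name is illustrative:
  //
  //    int status = 0;
  //    char* s = abi::__cxa_demangle("_ZN6CCuAcc14get_num_deviceEv",
  //                                  NULL, NULL, &status);
  //    if(status == 0) puts(s);  //prints "CCuAcc::get_num_device()"
  //    free(s);                  //caller owns the malloc'd buffer
  // )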
now apply + // __cxa_demangle(): + + int status; + char* ret = abi::__cxa_demangle(begin_name, + funcname, &funcnamesize, &status); + if (status == 0) { + funcname = ret; // use possibly realloc()-ed string + fprintf(out, " %s : %s+%s\n", + symbollist[i], funcname, begin_offset); + } + else { + // demangling failed. Output function name as a C function with + // no arguments. + fprintf(out, " %s : %s()+%s\n", + symbollist[i], begin_name, begin_offset); + } + } + else + { + // couldn't parse the line? print the whole line. + fprintf(out, " %s\n", symbollist[i]); + } + } + + free(funcname); + free(symbollist); +#endif +} + +int CSystemInfo::log(const char * format, ... ) +{ + char buffer[2560]; + va_list args; + va_start (args, format); + vsprintf (buffer,format, args); + va_end (args); + + fprintf(stdout,"%s",buffer); + return fflush(stdout); +} + +// cuBLAS API errors +const char* CCuAcc::cublas_get_error_enum(cublasStatus_t error) +{ + switch (error){ + case CUBLAS_STATUS_SUCCESS: return "CUBLAS_STATUS_SUCCESS"; + case CUBLAS_STATUS_NOT_INITIALIZED: return "CUBLAS_STATUS_NOT_INITIALIZED"; + case CUBLAS_STATUS_ALLOC_FAILED: + { + CCuMemory::showMemInfo(); + return "CUBLAS_STATUS_ALLOC_FAILED"; + } + case CUBLAS_STATUS_INVALID_VALUE: return "CUBLAS_STATUS_INVALID_VALUE"; + case CUBLAS_STATUS_ARCH_MISMATCH: return "CUBLAS_STATUS_ARCH_MISMATCH"; + case CUBLAS_STATUS_MAPPING_ERROR: return "CUBLAS_STATUS_MAPPING_ERROR"; + case CUBLAS_STATUS_EXECUTION_FAILED: return "CUBLAS_STATUS_EXECUTION_FAILED"; + case CUBLAS_STATUS_INTERNAL_ERROR: return "CUBLAS_STATUS_INTERNAL_ERROR"; + } + + return ""; +} + + +const char* CCuAcc::cusparse_get_error_enum(cusparseStatus_t error) +{ + switch (error){ + case CUSPARSE_STATUS_SUCCESS: return "CUSPARSE_STATUS_SUCCESS"; + case CUSPARSE_STATUS_NOT_INITIALIZED: return "CUSPARSE_STATUS_NOT_INITIALIZED"; + case CUSPARSE_STATUS_ALLOC_FAILED: + { + CCuMemory::showMemInfo(); + return "CUSPARSE_STATUS_ALLOC_FAILED"; + } + case CUSPARSE_STATUS_INVALID_VALUE: return "CUSPARSE_STATUS_INVALID_VALUE"; + case CUSPARSE_STATUS_ARCH_MISMATCH: return "CUSPARSE_STATUS_ARCH_MISMATCH"; + case CUSPARSE_STATUS_MAPPING_ERROR: return "CUSPARSE_STATUS_MAPPING_ERROR"; + case CUSPARSE_STATUS_EXECUTION_FAILED: return "CUSPARSE_STATUS_EXECUTION_FAILED"; + case CUSPARSE_STATUS_INTERNAL_ERROR: return "CUSPARSE_STATUS_INTERNAL_ERROR"; + case CUSPARSE_STATUS_MATRIX_TYPE_NOT_SUPPORTED: return "CUSPARSE_STATUS_MATRIX_TYPE_NOT_SUPPORTED"; + case CUSPARSE_STATUS_ZERO_PIVOT: return "CUSPARSE_STATUS_ZERO_PIVOT"; + } + + return ""; +} + +int CCuAcc::get_num_device() +{ + int num_device; + CUDA_CHECK(cudaGetDeviceCount(&num_device)); + + return num_device; +} +int CCuAcc::get_most_idle_device() +{ + //return 0; + int num_device = get_num_device(); + int most_idle_gpu = -1; + double most_idle_level = 0; + + for (int i = 0;i< num_device;i++) + { + CUDA_CHECK(cudaSetDevice(i)); + size_t mem_tot = 0; + size_t mem_free = 0; + CUDA_CHECK(cudaMemGetInfo(&mem_free, & mem_tot)); + + double cur_idle_level = (double)mem_free/mem_tot; + if(most_idle_level=0); + + return most_idle_gpu; +} + diff --git a/cuAcc/cuAcc_base.h b/cuAcc/cuAcc_base.h new file mode 100755 index 0000000..ca638a5 --- /dev/null +++ b/cuAcc/cuAcc_base.h @@ -0,0 +1,367 @@ +#ifndef __CUACC_BASE_H__ +#define __CUACC_BASE_H__ + + + +#include +#include +#include +#include +#include +#include +#include +#ifdef WIN32 +#include +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +#include +#include +#include + + 
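//(Device-selection recap: get_most_idle_device in cuAcc_base.cu picks the
// GPU with the highest free-memory ratio. Minimal sketch of that loop:
//
//   int best = -1;  double best_ratio = 0;
//   for(int i = 0; i < num_device; i++){
//     cudaSetDevice(i);
//     size_t mem_free, mem_tot;
//     cudaMemGetInfo(&mem_free, &mem_tot);   //per-device free/total bytes
//     double r = (double)mem_free/mem_tot;
//     if(best_ratio < r){ best_ratio = r; best = i; }
//   }
//)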
+#include +#include +#include +#include +#include + + +#ifdef WIN32 +#undef __ldg +#define __ldg * +#endif + +#ifdef _DISABLE_CUDA_ +#undef __device__ +#define __device__ +#undef __global__ +#define __global__ +#undef __host__ +#define __host__ +#define __syncthreads __gpu_dummy +__device__ void __syncthreads(); +#undef __shared__ +#define __shared__ +#define __ldg * +#endif + + +#ifndef max +#define max(a,b) (((a) > (b)) ? (a) : (b)) +#endif + +#define THREAD_BLOCK_SIZE (512) +#define CUDA_CHECK(call) \ +{\ + cudaError_t err = (call);\ + if(cudaSuccess != err)\ +{\ + fprintf(stderr,"CUDA Error:\nFile = %s\nLine = %d\nReason = %s\n", __FILE__, __LINE__, cudaGetErrorString(err));fflush(stderr);\ + cudaDeviceReset();\ + exit(EXIT_FAILURE);\ +}\ +} + +#define CUBLAS_CHECK(call) \ +{\ + cublasStatus_t status = (call);\ + if(CUBLAS_STATUS_SUCCESS != status)\ +{\ + fprintf(stderr,"CUBLAS Error:\nFile = %s\nLine = %d\nCode = %d\nReason = %s\n", __FILE__, __LINE__, status,CCuAcc::cublas_get_error_enum(status));fflush(stderr);\ + cudaDeviceReset();\ + exit(EXIT_FAILURE);\ +}\ +} + +#define CUSPARSE_CHECK(call) \ +{\ + cusparseStatus_t status = (call);\ + if(CUSPARSE_STATUS_SUCCESS != status)\ +{\ + fprintf(stderr,"CUSPARSE Error:\nFile = %s\nLine = %d\nCode = %d\nReason = %s\n", __FILE__, __LINE__, status,CCuAcc::cusparse_get_error_enum(status));fflush(stderr);\ + cudaDeviceReset();\ + exit(EXIT_FAILURE);\ +}\ +} + + +const double HALF = 0.5; +const double ONE = 1; +const double MONE = -1; +const double ZERO = 0; + + +class CSystemInfo : private std::vector { +public: + static pthread_mutex_t m_mutex; + + bool m_printed; + std::string m_msg; + +#ifdef WIN32 + time_t m_start; +#else + struct timespec m_start; +#endif + + CSystemInfo(const char* msg="") : m_printed(false), m_msg(msg) + { + timer_reset(); + } + + ~CSystemInfo() + { + timer_check(); + + if(!m_printed) + timer_print(); + } + + void timer_check(); + double timer_elapsed_time(); + void timer_reset(); + void timer_print(); + + static int log(const char * format, ... 
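  //(Usage note for the CHECK macros above: wrap every CUDA/cuBLAS call so
  // a failure reports file and line, then aborts, e.g.
  //
  //   CUDA_CHECK(cudaMalloc((void**)&dev, n*sizeof(double)));
  //   CUBLAS_CHECK(cublasDdot(handle, n, a, 1, b, 1, &result));
  //
  // Each macro evaluates its argument exactly once and calls
  // cudaDeviceReset() before exit(EXIT_FAILURE).)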
); //timer_print log + static void proc_info(); + static void print_stack(); + static const char* get_host_info(); + static void mutex_lock(); //while getting the most idle device, stop others from interverning + static void mutex_unlock(); +}; + +///////////////////////////////////////////////// +//top level class for CUDA accelerator +//-controls device assignment (ie get_most_idle_device) +//-other CUDA utility functions (ie errorcode2string) +//-CUDA lib handles/stremas per accelerator +class CCuAcc { +public: + int m_device; //assigned device to this accelerator (once set, don't change it) + cudaStream_t m_stream; + cusparseHandle_t m_cusparse_handle; + cublasHandle_t m_cublas_handle; + + static const char* cublas_get_error_enum(cublasStatus_t error); // cuBLAS API errors + static const char* cusparse_get_error_enum(cusparseStatus_t error); + + static int get_num_device(); + static int get_most_idle_device(); + + CCuAcc(int device) : m_device(device) + { + CUDA_CHECK(cudaSetDevice(device)); + CUDA_CHECK(cudaStreamCreate(&m_stream)); + + CUBLAS_CHECK(cublasCreate(&m_cublas_handle)) + CUBLAS_CHECK(cublasSetStream(m_cublas_handle,m_stream)); + + CUSPARSE_CHECK(cusparseCreate(&m_cusparse_handle)); + CUSPARSE_CHECK(cusparseSetStream(m_cusparse_handle,m_stream)); + } + + virtual ~CCuAcc() + { + CUDA_CHECK(cudaStreamDestroy(m_stream)); + cublasDestroy(m_cublas_handle); + cusparseDestroy(m_cusparse_handle); + } +}; + +//////////////////////////////////////////////////// +//CUDA memory class +//-automatic cudaFree at the end of scope +//-way to get host copy cleanly +#define _USE_PINNED_MEMORY_ +#ifdef WIN32 +#define _USE_VECTOR_ +#endif + +template +class CCuMemory{ +public: + T* m_dev; + unsigned m_count; +#ifdef _USE_VECTOR_ + std::vector m_host; +#else + T* m_host; +#endif + + static void showMemInfo() + { + size_t mem_tot = 0; + size_t mem_free = 0; + cudaMemGetInfo(&mem_free, & mem_tot); + + printf("mem_free=%.2f/%.2f GB \n",mem_free/1024.0/1024/1024,mem_tot/1024.0/1024/1024); + } + + static T* alloc( unsigned count ) + { + T* dev; + CUDA_CHECK(cudaMalloc( (void**)&dev, count*sizeof(T) )); + + if(!dev) + { + CSystemInfo::log("failed to allocate %.1f KB on GPU\n",count*sizeof(T)/1024.0); + CSystemInfo::print_stack(); + assert(false); + } + + return dev; + } + + //default + CCuMemory() : m_dev(NULL), m_host(NULL), m_count(0) {} + + //initialize with a given data in host memory + CCuMemory(const T* host, unsigned count, cudaStream_t stream) : m_dev(NULL), m_host(NULL) + { + m_count = count; + m_dev = alloc(m_count); + to_dev(host,m_count,stream,0); + } + + //initialize with a given value, cannot be used to set values to non one-byte type + CCuMemory(int v, unsigned count) : m_dev(NULL), m_host(NULL) + { + m_count = count; + m_dev = alloc(m_count); + memset(v); + } + + //just allocate memory on device + CCuMemory(unsigned count) : m_dev(NULL), m_host(NULL) + { + m_count = count; + m_dev = alloc(m_count); + } + + //initialize with a data on the same device + CCuMemory(CCuMemory& o, cudaStream_t stream) : m_dev(NULL), m_host(NULL) + { + m_count = o.m_count; + m_dev = alloc(m_count); + from_dev(o.dev(),m_count,stream,0); + } + + void resize(unsigned count) + { + release(); + m_count = count; + m_dev = alloc(m_count); + } + + void memset(int v) + { + CUDA_CHECK(cudaMemset(m_dev,v,m_count*sizeof(T))); + } + + T* dev() + { + return m_dev; + } + + T* host() + { +#ifdef _USE_VECTOR_ + return &m_host.front(); +#else + assert(m_host); + return m_host; +#endif + } + + //copy to member host memory + T* 
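  //(CCuMemory is a small RAII wrapper: cudaMalloc on construction, cudaFree
  // plus cudaFreeHost for the pinned host mirror on destruction. Typical
  // round trip, assuming a stream s:
  //
  //   CCuMemory<double> buf(host_ptr, n, s);  //alloc + async H2D copy
  //   ...                                     //kernels consume buf.dev()
  //   double* h = buf.copy(s);                //async D2H into pinned mirror
  //   cudaStreamSynchronize(s);               //h is valid only after sync
  //)                                          //scope exit frees both sides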
copy(cudaStream_t stream, unsigned count=0) + { +#ifdef _USE_VECTOR_ + if(m_host.size()!=m_count) + m_host.resize(m_count); +#else + if(!m_host) +#ifdef _USE_PINNED_MEMORY_ + CUDA_CHECK(cudaMallocHost(&m_host,m_count*sizeof(T))); +#else + m_host = new T[m_count]; +#endif +#endif + if(count==0) + to_host(host(),m_count,stream); + else + to_host(host(),count,stream); + +#ifdef WIN32 + if(stream==0) + cudaStreamSynchronize(stream); +#endif + + return host(); + } + + void from_dev(const T* dev, unsigned count, cudaStream_t stream, const unsigned offset) + { + assert(m_dev); + CUDA_CHECK(cudaMemcpyAsync(m_dev+offset,dev,count*sizeof(T),cudaMemcpyDeviceToDevice,stream)); + } + + //read from host and write to device + void to_dev(const T* host, unsigned count, cudaStream_t stream, const unsigned offset) + { + assert(host); + assert(m_dev); + CUDA_CHECK(cudaMemcpyAsync(m_dev+offset,host,count*sizeof(T),cudaMemcpyHostToDevice,stream)); + } + + + //write back to host + void to_host(T* host, unsigned count, cudaStream_t stream) + { + assert(host); + assert(m_dev); + CUDA_CHECK(cudaMemcpyAsync(host,m_dev,count*sizeof(T),cudaMemcpyDeviceToHost,stream)); + } + + void release(T* host, unsigned count, cudaStream_t stream) + { + read(host,count,stream); + release(); + } + + void release() + { +#ifndef _USE_VECTOR_ + if(m_host) + { +#ifdef _USE_PINNED_MEMORY_ + cudaFreeHost(m_host); +#else + delete[] m_host; +#endif + m_host = NULL; + } +#endif + + if(m_dev) + { + cudaFree(m_dev); + m_dev = NULL; + } + } + + ~CCuMemory() + { + release(); + } +}; + +#endif \ No newline at end of file diff --git a/cuAcc/cuAcc_cluster.cu b/cuAcc/cuAcc_cluster.cu new file mode 100755 index 0000000..025b614 --- /dev/null +++ b/cuAcc/cuAcc_cluster.cu @@ -0,0 +1,541 @@ +#include "cuAcc_base.h" +#include "cuAcc_cluster.h" +#include "cuAcc_function.h" +#include + +std::vector CCuAccCluster::m_cluster_list; +int CCuAccCluster::m_num_deleted = 0; +CCuAccCluster::STATE CCuAccCluster::get_set_cluster_state(CCuAccCluster* p_cluster, CCuAccCluster::STATE next) +{ + STATE cur = INVALID; + + CSystemInfo::mutex_lock(); + + if(!m_cluster_list.empty()) + { + cur = p_cluster->m_state; + p_cluster->m_state = next; + + if(next==DELETED&&cur!=DELETED) + { + p_cluster->release(); + m_num_deleted++; + + CSystemInfo::log("%s] clear cluster list (%d/%d)\n",CSystemInfo::get_host_info(),m_num_deleted,m_cluster_list.size()); + + if(m_num_deleted==m_cluster_list.size()) + { + for(unsigned i=0,s=m_cluster_list.size();im_state = next; +} + + +CCuAccCluster::CCuAccCluster(CMachineLearning::ALGO algo, const unsigned num_partition, const unsigned ny, const unsigned nx, + const double* data, const double* y, const int* csr_ridx, const int* csr_cidx, const unsigned nnz, const bool intercept) + :m_nx(nx), m_ny(ny), m_cgrad(NULL), m_state(IDLE) +{ + CSystemInfo::mutex_lock(); + m_cluster_list.push_back(this); + CSystemInfo::mutex_unlock(); + + m_ml_array.resize(num_partition); + + std::vector csr_ridx_array(num_partition+1,csr_ridx); + std::vector ny_array(num_partition,0); + std::vector y_array(num_partition,y); + + //make earlier partitions larger + unsigned partition_size = ny/num_partition+1; + + for(unsigned i=1;i ny_array(num_partition,0); + std::vector y_array(num_partition,y); + + unsigned partition_size = ny/num_partition+1; + + CSystemInfo::log("partition_size = %d\n",partition_size); + + for(unsigned i=1;im_stream)); +} + +void CCuAccCluster::synchronize() +{ + const unsigned num_partition = m_ml_array.size(); + for(unsigned i=0;im_device)); + + 
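    //(Partitioning recap: the constructors above split the ny rows so that
    // earlier partitions are one row larger. E.g. ny=10, num_partition=4
    // gives partition_size = 10/4+1 = 3 and row ranges {0-2, 3-5, 6-8, 9};
    // each sparse shard i just offsets the parent arrays via
    // csr_ridx_array[i], so no row data is copied on the host side.)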
m_ml_array[i]->m_p_matrix->summarize(weight_sum, + m_ml_array[i]->m_p_dev_tmp_data_sum, + m_ml_array[i]->m_p_dev_tmp_data_sq_sum, + m_ml_array[i]->m_p_dev_tmp_label_info, + m_ml_array[i]); + } + + //honor existing values + double label_sum; + for(unsigned i=0;im_p_dev_tmp_data_sum->host(),1,data_sum,1); + cblas_daxpy(m_nx,1,m_ml_array[i]->m_p_dev_tmp_data_sq_sum->host(),1,data_sq_sum,1); + + + label_sum = *m_ml_array[i]->m_p_dev_tmp_label_info->host(); + + //squared sum of label + label_info[2] += *(m_ml_array[i]->m_p_dev_tmp_label_info->host()+1); + //this is the number of ones for binary classification, otherwise, just sum (to get mean) + label_info[1] += label_sum; + //this is the number of zeros for binary classification + label_info[0] += m_ml_array[i]->m_p_matrix->m_dev_y.m_count-label_sum; + } +} + + +void CCuAccCluster::weighten(double* weight, double* weight_sum, double* weight_nnz) +{ + const unsigned num_partition = m_ml_array.size(); + + if(weight) + { + std::vector w_array(num_partition,NULL); + + unsigned offset = 0; + for(unsigned i=0;im_p_matrix->m_nrow; + } + + synchronize(); + + for(unsigned i=0;im_device)); + m_ml_array[i]->m_p_matrix->weighten(w_array[i],&m_ml_array[i]->m_dev_buf_xy1,m_ml_array[i]); + } + + *weight_nnz = 0; + for(unsigned i=0;im_p_matrix->m_nrow; + + *weight_nnz = *weight_sum; + } +} + +double CCuAccCluster::aug(double* weight, double intercept) +{ + const unsigned num_partition = m_ml_array.size(); + + double* fx = new double[m_ny]; + double* label = new double[m_ny]; + + unsigned offset = 0; + + synchronize(); + + for(unsigned i=0;im_device)); + m_ml_array[i]->predict(weight,intercept,fx+offset,label+offset); + + offset += m_ml_array[i]->m_p_matrix->m_nrow; + } + + synchronize(); + + double ret = m_ml_array.front()->aug(label,fx,m_ny); + + delete[] fx; + delete[] label; + + //printf("aug =%.16f\n",ret); + + return ret; +} + +double CCuAccCluster::rmse(double* weight, double intercept) +{ + const unsigned num_partition = m_ml_array.size(); + + unsigned max_ny = m_ml_array[0]->m_p_matrix->m_nrow; + for(unsigned i=1;im_p_matrix->m_nrow); + + double* fx = new double[max_ny+num_partition]; + + synchronize(); + + for(unsigned i=0;im_device)); + m_ml_array[i]->sq_err(weight,intercept,fx+i,NULL); + } + + synchronize(); + + double sum = fx[0]; + for(unsigned i=1;im_device)); + m_ml_array[i]->standardize(data_mean,data_std,label_mean,label_std); + } +} + +double CCuAccCluster::evaluate(unsigned size, double* x, double* cgrad, double weight_sum) +{ + assert(x); + assert(cgrad); + + const unsigned num_partition = m_ml_array.size(); + + synchronize(); + + for(unsigned i=0;im_device)); + m_ml_array[i]->evaluate(x,weight_sum); + } + + //honor existing cgrad + double cfx = 0; + for(unsigned i=0;iget_g(),1,cgrad,1); + cfx += m_ml_array[i]->get_fx(); + } + + //CSystemInfo::log("cfx=%f cgrad[0]=%f size=%d\n",cfx,cgrad[0],size); + return cfx; +} + +void CCuAccCluster::solve_sgd(double weight_sum, CUpdater* updater, unsigned int max_iteration, double lambda, double step_size, double convergence_tol, double* x) +{ + CSystemInfo tm(__FUNCTION__); + + if(!m_cgrad) + m_cgrad = new double[m_nx+m_ml_array.front()->m_intercept]; + + updater->initialize(m_nx,x,lambda); + + for(unsigned i=1;i<=max_iteration;++i) + { + memset(m_cgrad,0,(m_nx+m_ml_array.front()->m_intercept)*sizeof(double)); + + { + double nzx = 0; + for(unsigned i=0;im_intercept,x,m_cgrad,weight_sum); + + std::vector convergence_info(2,1); + + double regVal = 
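      //(Convergence test used right after the update below, mirroring
      // Spark's GradientDescent: the updater fills
      // convergence_info = { ||x_new - x_old||, ||x_new|| } and iteration
      // stops when
      //
      //   ||x_new - x_old|| < convergence_tol * max(||x_new||, 1.0)
      //
      // a relative tolerance that falls back to an absolute one while the
      // solution norm is still below 1.)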
updater->update(x,m_cgrad,m_ny,step_size,i,lambda,&convergence_info.front()); + + bool converged = convergence_info[0] < convergence_tol * max(convergence_info[1], 1.0); + + if(m_ml_array.front()->m_intercept) + printf("%s i=%d fx=%e converged=%d loss=%e cgrad(0)=%e x(0)=%e regVal=%e intercept=%e\n", + "GPU", i, fx, converged,fx/weight_sum, m_cgrad[0], x[0], regVal,m_cgrad[m_nx]); + else + printf("%s i=%d fx=%e converged=%d loss=%e cgrad(0)=%e x(0)=%e regVal=%e\n", + "GPU", i, fx, converged,fx/weight_sum, m_cgrad[0], x[0], regVal); + +// double test_X[] = { +//-0.006838634518180606,-0.04457094511877472,0.0,0.07773612956391085,0.020533723356241724,0.0,0.0,0.022182386253426847,0.09301926482022636,0.015388873442756919,-0.0055706686315458625,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.2144022892015011,0.0,0.07630180403429483,-0.041239465967306106,0.0,0.0,0.054897525742876915,0.0,0.0,0.3234479467804168,-0.09985496661607271,0.0,-0.05912154214988861,0.0,0.0,0.0,0.15645030060625612,0.27932668102340386,0.0,0.0,0.0,0.0,0.0,0.015353470498619775,0.03626368315145928,-4.705486802913351E-4,-0.004311330923063878,0.0,0.15268269392225883,0.0,0.0052617150790345985,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.01576293865701875,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.11983673271431113,0.0,0.0,0.0,0.0,-0.1211005227086926,0.1211005227087023,-0.0646176630433938,0.06461766304339668,-0.02892466364285392,0.0,-0.002802605387570024,0.026113589272167094,0.02955739713306733,0.00209330814976037,0.0,-0.03139374015546418,-0.03535609886258558,0.0,0.0,0.0,0.0,0.5200839381282015,0.0,0.0,-0.04422023506672689,0.0,0.0,0.0,0.049975552753353254,0.15381322000562847,0.0,0.0,0.0,0.0,0.0,-0.11695772237442635,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.1230812559815254,0.0,0.0,0.0,0.0,0.38527907409563156,0.0 +// }; +// +// this->rmse(test_X,0.19676101393228124); + + if(converged) break; + } +} + +#ifdef _ENABLE_LBFGS_ + +#include "lbfgs\lbfgs.h" + +static lbfgsfloatval_t evaluate( + void *instance, + const lbfgsfloatval_t *x, + lbfgsfloatval_t *g, + const int n, + const lbfgsfloatval_t step + ) +{ + int i; + lbfgsfloatval_t fx = 0.0; + + CCuMatrix* func = (CCuMatrix*)instance; + + func->evaluate(x); + CUDA_CHECK(cudaStreamSynchronize(func->m_stream)); + + memcpy(g,func->get_g(),n*sizeof(double)); + + return func->get_fx(); + + //for (i = 0;i < n;i += 2) { + // lbfgsfloatval_t t1 = 1.0 - x[i]; + // lbfgsfloatval_t t2 = 10.0 * (x[i+1] - x[i] * x[i]); + // g[i+1] = 20.0 * t2; + // g[i] = -2.0 * (x[i] * g[i+1] + t1); + // fx += t1 * t1 + t2 * t2; + //} + //return fx; +} + +static int progress( + void *instance, + const lbfgsfloatval_t *x, + const lbfgsfloatval_t *g, + const lbfgsfloatval_t fx, + const lbfgsfloatval_t xnorm, + const lbfgsfloatval_t gnorm, + const lbfgsfloatval_t step, + int n, + int k, + int ls + ) +{ + printf("Iteration %d:\n", k); + printf(" fx = %f, x[0] = %f, x[1] = %f\n", fx, x[0], x[1]); + printf(" xnorm = %f, gnorm = %f, step = %f\n", xnorm, gnorm, step); + printf("\n"); + return 0; +} + +void CCuAccCluster::solve_lbfgs(CUpdater* updater, unsigned int max_iteration, double lambda, double step_size, double convergence_tol, double* x) +{ + assert(false); + + CSystemInfo tm(__FUNCTION__); + + lbfgs_parameter_t param; + + lbfgs_parameter_init(¶m); + param.max_iterations = 20; + + if(!m_cgrad) + m_cgrad = new double[m_nx]; + + + + + + double* new_x = new double[m_nx*m_ml_array.size()]; + + + for(unsigned i=1;i<=max_iteration;++i) + { + if(i==1) + { + double fx_check = 0; + for(unsigned j=0,s=m_ml_array.size();jinitialize(m_nx,x,lambda); + 
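    //(libLBFGS contract recap for the evaluate() callback defined above:
    // it must write the gradient into g[0..n) and return the objective at
    // x. Schematically the driver call would be
    //
    //   lbfgsfloatval_t fx;
    //   int ret = lbfgs(n, x, &fx, evaluate, progress,
    //                   this /*instance*/, &param);
    //
    // here evaluate() forwards to the GPU and memcpy's the gradient back.)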
//memset(m_cgrad,0,m_nx*sizeof(double)); + + //double fx = evaluate(x,m_cgrad); + + //std::vector convergence_info(2,1); + + //double regVal = updater->update(x,m_cgrad,m_ny,step_size,i,lambda,&convergence_info.front()); + + //bool converged = convergence_info[0] < convergence_tol * max(convergence_info[1], 1.0); + + //printf("%s i=%d fx=%e converged=%d cgrad(0)=%e x(0)=%e regVal=%e\n", + // "GPU", i, fx, converged, m_cgrad[0], x[0], regVal); + + //if(converged) break; + } + else + { + memset(m_cgrad,0,m_nx*sizeof(double)); + + double fx = evaluate(x,m_cgrad); + + std::vector convergence_info(2,1); + + double regVal = updater->update(x,m_cgrad,m_ny,step_size,i,lambda,&convergence_info.front()); + + bool converged = convergence_info[0] < convergence_tol * max(convergence_info[1], 1.0); + + printf("%s i=%d fx=%e converged=%d cgrad(0)=%e x(0)=%e regVal=%e\n", + "GPU", i, fx, converged, m_cgrad[0], x[0], regVal); + + if(converged) break; + } + } + +} + +#endif \ No newline at end of file diff --git a/cuAcc/cuAcc_cluster.h b/cuAcc/cuAcc_cluster.h new file mode 100755 index 0000000..14f7bf0 --- /dev/null +++ b/cuAcc/cuAcc_cluster.h @@ -0,0 +1,75 @@ +#ifndef __CUACCCLUSTER_H__ +#define __CUACCCLUSTER_H__ + + +#include "cuAcc_function.h" +#include "cuAcc_updater.h" + + +class CCuAccCluster +{ +public: + enum STATE{ + INVALID, + DELETED, + IDLE, + ACTVE + }; + + unsigned m_nx; + unsigned m_ny; + double* m_cgrad; + + STATE m_state; + std::vector m_ml_array; + + static int m_num_deleted; + static std::vector m_cluster_list; + static void set_cluster_state(CCuAccCluster* p_cluster, STATE next); + static STATE get_set_cluster_state(CCuAccCluster* p_cluster, CCuAccCluster::STATE next); + + //sparse + CCuAccCluster(CMachineLearning::ALGO algo, const unsigned num_partition, const unsigned ny, const unsigned nx, + const double* data, const double* y, const int* csr_ridx, const int* csr_cidx, const unsigned nnz, const bool intercept); + + //dense + CCuAccCluster(CMachineLearning::ALGO algo, const unsigned num_partition, const unsigned ny, const unsigned nx, + const double* data, const double* y, const bool intercept); + + virtual ~CCuAccCluster(void) + { + release(); + } + + void release() + { + for(unsigned i=0,s=m_ml_array.size();i +#include + + +template +__global__ void kernel_set(unsigned size, T* arr, T v) +{ + const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; + if(idx>=size) return; + + arr[idx] = v; +} + +//template +//__global__ void kernel_seq_array(unsigned size, T* arr) +//{ +// const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; +// if(idx>=size) return; +// +// arr[idx] = idx; +//} + +//template +//__global__ void kernel_sq_arry(unsigned size, T* dst, T* src) +//{ +// const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; +// if(idx>=size) return; +// +// dst[idx] = src[idx]*src[idx]; +//} +// +//template +//__global__ void kernel_mult_array(unsigned size, T* dst, T* src) +//{ +// const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; +// if(idx>=size) return; +// +// dst[idx] *= src[idx]; +//} +// +template +__global__ void kernel_mult(unsigned size, T* dst, const T* __restrict__ a, const T* __restrict__ b) +{ + const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; + if(idx>=size) return; + + dst[idx] = a[idx]*b[idx]; +} + +template +__global__ void kernel_inv(unsigned size, T* dst, T* src) +{ + const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; + if(idx>=size) return; + + if(src[idx]) dst[idx] = 1.0/src[idx]; + else dst[idx] = 0; +} + + + +//template +//__global__ void 
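//(Launch convention for the element-wise kernels in this file: one thread
// per element plus an explicit bounds check, launched as e.g.
//
//   kernel_set<<<n/THREAD_BLOCK_SIZE+1, THREAD_BLOCK_SIZE, 0, stream>>>(n, arr, v);
//
// the guard `if(idx>=size) return;` makes the surplus threads of the last
// block harmless.)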
kernel_div_array(unsigned size, T* dst, T* src) +//{ +// const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; +// if(idx>=size) return; +// +// if(src[idx]) dst[idx] /= src[idx]; +// else dst[idx] = 0; +//} + +// +//template +//__global__ void kernel_sub_array(unsigned size, T* dst, T* src) +//{ +// const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; +// if(idx>=size) return; +// +// dst[idx] -= src[idx]; +//} + + +//template +//__global__ void kernel_dec(unsigned size, T* arr, T v) +//{ +// const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; +// if(idx>=size) return; +// +// arr[idx] -= v; +//} + +template +__global__ void kernel_dec(unsigned size, T* __restrict__ arr, T* __restrict__ v) +{ + const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; + if(idx>=size) return; + + arr[idx] -= *v; +} + + +template +__global__ void kernel_inc(unsigned size, T* __restrict__ arr, T* __restrict__ v) +{ + const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; + if(idx>=size) return; + + arr[idx] += *v; +} + +template +__global__ void kernel_inc(unsigned size, T* arr, T v) +{ + const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x; + if(idx>=size) return; + + arr[idx] += v; +} + + +__global__ void kernel(float *x, int n) +{ + int tid = threadIdx.x + blockIdx.x * blockDim.x; + for (int i = tid; i < n; i += blockDim.x * gridDim.x) { + x[i] = sqrt(pow(3.14159,i)); + } +} + + +/* +This version adds multiple elements per thread sequentially. This reduces the overall +cost of the algorithm while keeping the work complexity O(n) and the step complexity O(log n). +(Brent's Theorem optimization) + +Note, this kernel needs a minimum of 64*sizeof(T) bytes of shared memory. +In other words if block_size <= 32, allocate 64*sizeof(T) bytes. +If block_size > 32, allocate block_size*sizeof(T) bytes. +*/ +template +__global__ void kernel_reduce(T* __restrict__ idata, T* __restrict__ odata, unsigned int n) +{ + extern __shared__ T __smem_d[]; + + T *sdata = __smem_d; + + // perform first level of reduction, + // reading from global memory, writing to shared memory + const unsigned tid = threadIdx.x; + const unsigned grid_size = block_size*2*gridDim.x; + const bool is_pow2 = (n&(n-1))==0; + + T sum = 0; + + // we reduce multiple elements per thread. The number is determined by the + // number of active thread blocks (via gridDim). 
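// (Grid-stride recap, the same idiom kernel() uses above: each thread
//  starts at its global index and hops by the total thread count, so any
//  grid size covers any n:
//
//    for (int i = blockIdx.x*blockDim.x + threadIdx.x;
//         i < n;
//         i += blockDim.x * gridDim.x)
//      x[i] = f(x[i]);
//  )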
More blocks will result + // in a larger grid_size and therefore fewer elements per thread + if(is_pow2) + { + for(unsigned i= blockIdx.x*block_size*2+threadIdx.x;i= 512) && (tid < 256)) sdata[tid] = sum = sum + sdata[tid + 256]; __syncthreads(); + if ((block_size >= 256) && (tid < 128)) sdata[tid] = sum = sum + sdata[tid + 128]; __syncthreads(); + if ((block_size >= 128) && (tid < 64)) sdata[tid] = sum = sum + sdata[tid + 64]; __syncthreads(); + +#if (__CUDA_ARCH__ >= 300 ) + if ( tid < 32 ) + { + // Fetch final intermediate sum from 2nd warp + if (block_size>=64) sum += sdata[tid + 32]; + // Reduce final warp using shuffle + for (int offset = warpSize/2; offset > 0; offset /= 2) + { + sum += __shfl_down(sum, offset); + } + } +#else + // fully unroll reduction within a single warp + if ((block_size>=64) && (tid < 32)) sdata[tid] = sum = sum + sdata[tid + 32]; __syncthreads(); + if ((block_size>=32) && (tid < 16)) sdata[tid] = sum = sum + sdata[tid + 16]; __syncthreads(); + if ((block_size>=16) && (tid < 8)) sdata[tid] = sum = sum + sdata[tid + 8]; __syncthreads(); + if ((block_size>= 8) && (tid < 4)) sdata[tid] = sum = sum + sdata[tid + 4]; __syncthreads(); + if ((block_size>= 4) && (tid < 2)) sdata[tid] = sum = sum + sdata[tid + 2]; __syncthreads(); + if ((block_size>= 2) && (tid < 1)) sdata[tid] = sum = sum + sdata[tid + 1]; __syncthreads(); +#endif + + // write result for this block to global mem + if (tid == 0) odata[blockIdx.x] = sum; +} + +void kernel_reduce_array(double* __restrict__ idata, double* __restrict__ mdata, double* __restrict__ odata, double* __restrict__ one, unsigned n, cudaStream_t stream, cublasHandle_t cbulas_handle, double scale=ONE) +{ + const unsigned fx_size = n/THREAD_BLOCK_SIZE+1; + + if(fx_size>1) + { + kernel_reduce <<>> + (idata, mdata, n); + + CUBLAS_CHECK(cublasDgemv(cbulas_handle,CUBLAS_OP_N,1,fx_size,&scale,mdata,1,one,1, + &ZERO,odata,1)); + } + else + { + CUBLAS_CHECK(cublasDgemv(cbulas_handle,CUBLAS_OP_N,1,n,&scale,idata,1,one,1, + &ZERO,odata,1)); + } +} + +void kernel_mult_array(const double* __restrict__ a, const double* __restrict__ b, double* dst, unsigned n, cudaStream_t stream, cublasHandle_t cbulas_handle) +{ + kernel_mult<<>>(n,dst,a,b); + + //CUBLAS_CHECK(cublasDsbmv(cbulas_handle,CUBLAS_FILL_MODE_LOWER,n,0,&ONE,a,1,b,1,&ZERO,dst,1)); +} + +std::vector > CCuMatrix::m_cache_dev_one_per_device; + +CCuMatrix::CCuMatrix(const int device, const unsigned ncol, const unsigned nrow, const double* y, cudaStream_t stream) + :m_ncol(ncol),m_nrow(nrow),m_data_one_only(false),m_label_weighted(false) +{ + m_dev_y.resize(nrow); + m_dev_y.to_dev(y,nrow,stream,0); + m_dev_x_std_inv.resize(ncol); + m_dev_x_mean.resize(ncol); + + //MUST BE protected by mutex outside + if(m_cache_dev_one_per_device.empty()) + m_cache_dev_one_per_device.resize(CCuAcc::get_num_device()); + + m_p_dev_one = &m_cache_dev_one_per_device[device]; + if(m_p_dev_one->m_countresize(unsigned(nrow*1.30)); + + kernel_set<<m_count/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,stream>>> + (m_p_dev_one->m_count,m_p_dev_one->dev(),1); + } +} + + +void CCuMatrix::weighten(double* weight, CCuMemory* p_dev_tmp_weight_sum, CCuAcc* p_acc) +{ + assert(weight); + m_label_weighted = weight; + + m_dev_w.resize(m_nrow); + m_dev_w.to_dev(weight,m_nrow,p_acc->m_stream,0); + + kernel_reduce_array(m_dev_w.dev(),p_dev_tmp_weight_sum->dev(),p_dev_tmp_weight_sum->dev(),m_p_dev_one->dev(),m_nrow,p_acc->m_stream,p_acc->m_cublas_handle); + + p_dev_tmp_weight_sum->to_host(weight,1,p_acc->m_stream); +} + + +void 
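//(Reduction note for kernel_reduce_array above: block-level partial sums
// are finished with a cuBLAS GEMV against the cached ones vector, since
// sum(v) == ones^T v. Sketch of that second stage, with partials[] holding
// the fx_size block results:
//
//   //odata[0] = scale * (1 x fx_size matrix) * ones
//   cublasDgemv(handle, CUBLAS_OP_N, 1, fx_size, &scale,
//               partials, 1, ones, 1, &ZERO, odata, 1);
//
// keeping the final reduction on-device instead of reading partials back.)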
+void CCuDenseMatrix::write(const char* filename, const unsigned ny, const unsigned nx, const double* data, const double* y, const double* weight)
+{
+  FILE* pFile = fopen(filename,"wb");
+  if(!pFile)
+  {
+    CSystemInfo::log("cannot write to [%s]\n",filename);
+    return;
+  }
+
+  fwrite(&ny,sizeof(unsigned),1,pFile);
+  fwrite(&nx,sizeof(unsigned),1,pFile);
+
+  fwrite(data,sizeof(double),ny*nx,pFile);
+  fwrite(y,sizeof(double),ny,pFile);
+  fwrite(weight,sizeof(double),nx,pFile);
+
+  fclose(pFile);
+}
+
+void CCuDenseMatrix::read(const char* filename, unsigned& ny, unsigned& nx, double*& data, double*& y, double*& weight)
+{
+  FILE* pFile = fopen(filename,"rb");
+  if(!pFile)
+  {
+    CSystemInfo::log("cannot read [%s]\n",filename);
+    return;
+  }
+
+  fread(&ny,sizeof(unsigned),1,pFile);
+  fread(&nx,sizeof(unsigned),1,pFile);
+
+  data   = new double[ny*nx];
+  y      = new double[ny];
+  weight = new double[nx];
+
+  fread(data,sizeof(double),ny*nx,pFile);
+  fread(y,sizeof(double),ny,pFile);
+  fread(weight,sizeof(double),nx,pFile);
+
+  fclose(pFile);
+}
+
+void CCuSparseMatrix::write(const char* filename, const unsigned ny, const unsigned nx, const double* data, const double* y,
+                            const int* csr_ridx, const int* csr_cidx, const unsigned nnz, const double* weight)
+{
+  FILE* pFile = fopen(filename,"wb");
+  if(!pFile)
+  {
+    CSystemInfo::log("cannot write to [%s]\n",filename);
+    return;
+  }
+
+  fwrite(&ny,sizeof(unsigned),1,pFile);
+  fwrite(&nx,sizeof(unsigned),1,pFile);
+  fwrite(&nnz,sizeof(unsigned),1,pFile);
+
+  fwrite(data,sizeof(double),nnz,pFile);
+  fwrite(y,sizeof(double),ny,pFile);
+  fwrite(csr_ridx,sizeof(int),ny+1,pFile);
+  fwrite(csr_cidx,sizeof(int),nnz,pFile);
+
+  fwrite(weight,sizeof(double),nx,pFile);
+
+  fclose(pFile);
+}
+
+void CCuSparseMatrix::read(const char* filename, unsigned& ny, unsigned& nx, double*& data, double*& y,
+                           int*& csr_ridx, int*& csr_cidx, unsigned& nnz, double*& weight)
+{
+  FILE* pFile = fopen(filename,"rb");
+  if(!pFile)
+  {
+    CSystemInfo::log("cannot read [%s]\n",filename);
+    return;
+  }
+
+  fread(&ny,sizeof(unsigned),1,pFile);
+  fread(&nx,sizeof(unsigned),1,pFile);
+  fread(&nnz,sizeof(unsigned),1,pFile);
+
+  data     = new double[nnz];
+  y        = new double[ny];
+  weight   = new double[nx];
+  csr_ridx = new int[ny+1];  //ny+1 entries are read below; ny alone was one short
+  csr_cidx = new int[nnz];
+
+  fread(data,sizeof(double),nnz,pFile);
+  fread(y,sizeof(double),ny,pFile);
+  fread(csr_ridx,sizeof(int),ny+1,pFile);
+  fread(csr_cidx,sizeof(int),nnz,pFile);
+
+  fread(weight,sizeof(double),nx,pFile);
+
+  fclose(pFile);
+}
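+/*
+Binary layouts implied by the writers above (native byte order, fields written
+back-to-back with no padding):
+
+  dense : [ny:u32][nx:u32][data:f64 x ny*nx][y:f64 x ny][weight:f64 x nx]
+  sparse: [ny:u32][nx:u32][nnz:u32][data:f64 x nnz][y:f64 x ny]
+          [csr_ridx:i32 x (ny+1)][csr_cidx:i32 x nnz][weight:f64 x nx]
+*/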
+//element-wise logistic function: y[i] = 1/(1+exp(m[i])), where m holds the
+//already-negated margin
+__global__ void kernel_logit(unsigned ny, double* __restrict__ m, double* __restrict__ y)
+{
+  const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x;
+  if(idx>=ny) return;
+
+  const double v = m[idx];
+
+  if(v==0)
+    y[idx] = 0.5;
+  else
+  {
+    const double exp_v = exp(v);
+    y[idx] = 1.0/(1+exp_v);
+  }
+}
+
+
+//per-sample logistic cost and multiplier, evaluated piecewise for numerical
+//safety (see the notes after this kernel)
+#define EXP_THRESHOLD (36)
+__global__ void kernel_logistic_fx(unsigned ny, double* __restrict__ m2fx, double* __restrict__ y, double* __restrict__ t)
+{
+  const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x;
+  if(idx>=ny) return;
+
+  const double v = m2fx[idx];  //this is the margin, will be overwritten with the cost
+
+  if(y[idx]>0) m2fx[idx] = 0;
+  else         m2fx[idx] = -v;
+
+  if(v==0)
+  {
+    m2fx[idx] += 0.69314718055994529;  //log(2)
+    t[idx]     = 0.5-y[idx];
+  }
+  else if(v>EXP_THRESHOLD)
+  {
+    m2fx[idx] += v;
+    t[idx]     = -y[idx];
+  }
+  else if(v>EXP_THRESHOLD/2)
+  {
+    const double exp_v = exp(v);
+
+    m2fx[idx] += v+1/exp_v;
+    t[idx]     = 1.0/(1+exp_v)-y[idx];
+  }
+  else if(v<-EXP_THRESHOLD)
+  {
+    t[idx] = 1.0-y[idx];
+  }
+  else
+  {
+    const double exp_v = exp(v);
+
+    m2fx[idx] += log1p(exp_v);
+    t[idx]     = 1.0/(1+exp_v)-y[idx];
+  }
+}
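+/*
+Numerical notes on kernel_logistic_fx (summary, not original text): the cost
+per sample is log(1+exp(v)) (plus -v for label 0), but exp(v) loses precision
+long before it overflows double near v~709, so the kernel goes piecewise:
+
+  v >  36 : log(1+exp(v)) ~ v              (exp(-v) < 2.3e-16, below double eps)
+  v >  18 : log(1+exp(v)) ~ v + exp(-v)    (first-order expansion)
+  |v|<= 18: log1p(exp(v))                  (exact; exp(v) cannot overflow here)
+  v < -36 : log(1+exp(v)) ~ 0
+
+with the matching multiplier t = 1/(1+exp(v)) - y saturating to -y or 1-y.
+*/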
+CCuSparseMatrix::CCuSparseMatrix(const int device,const unsigned ny, const unsigned nx, const double* csr_data, const double* y, const int* csr_ridx, const int* csr_cidx, const unsigned nnz, cudaStream_t stream, cusparseHandle_t cusparse_handle)
+  :CCuMatrix(device,nx,ny,y,stream),
+  m_nnz(nnz)
+{
+  m_data_one_only = !csr_data;  //a NULL data array means an implicit all-one matrix
+
+  m_dev_csr_data.resize(nnz);
+  m_dev_csr_ridx.resize(ny+1);
+  m_dev_csr_cidx.resize(nnz);
+
+  if(csr_data) m_dev_csr_data.to_dev(csr_data,nnz,stream,0);
+  m_dev_csr_ridx.to_dev(csr_ridx,ny+1,stream,0);
+  m_dev_csr_cidx.to_dev(csr_cidx,nnz,stream,0);
+
+  CUSPARSE_CHECK(cusparseCreateMatDescr(&m_data_descr));
+  CUSPARSE_CHECK(cusparseSetMatType(m_data_descr,CUSPARSE_MATRIX_TYPE_GENERAL));
+  CUSPARSE_CHECK(cusparseSetMatIndexBase(m_data_descr,CUSPARSE_INDEX_BASE_ZERO));
+
+  assert(y);
+
+  //rebase the row index if this partition's csr_ridx does not start at 0
+  if(csr_ridx[0]!=0) kernel_inc<<<(ny+1)/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,stream>>>(ny+1,m_dev_csr_ridx.dev(),-csr_ridx[0]);
+
+  //when the matrix is an implicit all-one matrix, the data arrays are filled
+  //with ONE later, in summarize()
+  m_dev_csc_ridx.resize(nnz);
+  m_dev_csc_cidx.resize(nx+1);
+  m_dev_csc_data.resize(nnz);
+
+  CUSPARSE_CHECK(cusparseCreateHybMat(&m_hybA));
+  CUSPARSE_CHECK(cusparseCreateHybMat(&m_hybT));
+
+  CUSPARSE_CHECK(cusparseDcsr2csc(cusparse_handle,ny,nx,nnz,
+    m_dev_csr_data.dev(),m_dev_csr_ridx.dev(),m_dev_csr_cidx.dev(),
+    m_dev_csc_data.dev(),m_dev_csc_ridx.dev(),m_dev_csc_cidx.dev(),
+    CUSPARSE_ACTION_NUMERIC,CUSPARSE_INDEX_BASE_ZERO));
+}
+
+template <typename T>
+__global__ void kernel_standardize_csr_matrix(unsigned size, T* __restrict__ m, int* __restrict__ cidx, T* __restrict__ f)
+{
+  const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x;
+
+  if(idx>=size) return;
+
+  m[idx] *= f[cidx[idx]];  //scale each nonzero by 1/std of its column
+}
+
+template <typename T>
+__global__ void kernel_standardize_csc_matrix(unsigned size, T* __restrict__ m,int* __restrict__ cidx, T* __restrict__ f)
+{
+  const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x;
+
+  if(idx>=size) return;
+
+  const double den = f[idx];  //one thread per column in the CSC layout
+
+  for(unsigned i=cidx[idx],s=cidx[idx+1];i<s;i++)
+    m[i] *= den;
+}
+
+template <typename T>
+__global__ void kernel_count_column_csc_matrix(unsigned size,int* __restrict__ cidx, T* __restrict__ s)
+{
+  const unsigned idx = threadIdx.x+blockDim.x*blockIdx.x;
+
+  if(idx>=size) return;
+
+  s[idx] = cidx[idx+1]-cidx[idx];  //nonzero count per column
+}
+
+#define _USE_HYB_
+
+void CCuSparseMatrix::transform(CCuAcc* p_acc)
+{
+#ifdef _USE_HYB_
+  CUSPARSE_CHECK(cusparseDcsr2hyb(p_acc->m_cusparse_handle,m_nrow,m_ncol,m_data_descr,m_dev_csr_data.dev(),m_dev_csr_ridx.dev(),m_dev_csr_cidx.dev(),m_hybA,0,CUSPARSE_HYB_PARTITION_AUTO));
+  CUSPARSE_CHECK(cusparseDcsr2hyb(p_acc->m_cusparse_handle,m_ncol,m_nrow,m_data_descr,m_dev_csc_data.dev(),m_dev_csc_cidx.dev(),m_dev_csc_ridx.dev(),m_hybT,0,CUSPARSE_HYB_PARTITION_AUTO));
+#endif
+}
+
+void CCuSparseMatrix::standardize_orig(CCuAcc* p_acc)
+{
+  kernel_standardize_csr_matrix<<<m_nnz/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,p_acc->m_stream>>>
+    (m_nnz,m_dev_csr_data.dev(),m_dev_csr_cidx.dev(),m_dev_x_std_inv.dev());
+}
+
+void CCuSparseMatrix::standardize_trans(CCuAcc* p_acc)
+{
+  kernel_standardize_csc_matrix<<<m_ncol/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,p_acc->m_stream>>>
+    (m_ncol,m_dev_csc_data.dev(),m_dev_csc_cidx.dev(),m_dev_x_std_inv.dev());
+}
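+/*
+Design note (not original text): the matrix is stored twice, as CSR (m_hybA)
+for the row product margin = A*x, and as CSC -- which is exactly the CSR form
+of the transpose (m_hybT) -- for the column product gradient = A'*t.  This
+avoids cusparse's slow CUSPARSE_OPERATION_TRANSPOSE path entirely.  With
+_USE_HYB_ both copies are additionally repacked into the HYB (ELL+COO) format,
+which this generation of cusparse favored for repeated SpMV on irregular rows.
+*/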
+void CCuSparseMatrix::summarize(double weight_sum, CCuMemory<double>* p_dev_tmp_data_sum, CCuMemory<double>* p_dev_tmp_data_sq_sum, CCuMemory<double>* p_dev_tmp_label_info, CCuAcc* p_acc)
+{
+  /////////////////////////////////////////////
+  //      <------------ x ------------->
+  //  ^
+  //  |
+  //  |
+  //  y    need to add up per column
+  //  |    => use the transposed matrix,
+  //  |       multiply the all-one vector
+  //  L
+  //      ==========================
+  //       sum   sum   sum   sum
+
+  assert(p_dev_tmp_data_sum&&p_dev_tmp_data_sum->m_dev);
+  assert(p_dev_tmp_data_sq_sum&&p_dev_tmp_data_sq_sum->m_dev);
+  assert(p_dev_tmp_label_info&&p_dev_tmp_label_info->m_dev);
+  assert(p_dev_tmp_data_sum->m_count>=m_ncol);
+  assert(m_p_dev_one);
+  assert(m_p_dev_one->m_count>=m_nrow);
+
+  if(m_data_one_only)
+  {
+    //set the data now
+    //if we did it in the constructor, it would be "synchronous" due to the csr->csc conversion
+    kernel_set<<<m_nnz/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,p_acc->m_stream>>>(m_nnz,m_dev_csr_data.dev(),1);
+    kernel_set<<<m_nnz/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,p_acc->m_stream>>>(m_nnz,m_dev_csc_data.dev(),1);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////
+  //data summary
+  //scale down data by the weight sum (= the sample count when unweighted)
+  double scale = 1/weight_sum;
+
+  if(m_data_one_only&&!m_label_weighted)
+  {
+    //all-one data: the column sum is simply the nonzero count per column
+    kernel_count_column_csc_matrix<<<m_ncol/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,p_acc->m_stream>>>
+      (m_ncol,m_dev_csc_cidx.dev(),p_dev_tmp_data_sum->dev());
+
+    CUBLAS_CHECK(cublasDscal(p_acc->m_cublas_handle,m_ncol,&scale,p_dev_tmp_data_sum->dev(),1));
+
+    p_dev_tmp_data_sq_sum->from_dev(p_dev_tmp_data_sum->dev(),m_ncol,p_acc->m_stream,0);  //if all ones, then the square sum is identical
+  }
+  else if(m_data_one_only&&m_label_weighted)
+  {
+    CUSPARSE_CHECK(cusparseDcsrmv(p_acc->m_cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,m_ncol,m_nrow,m_nnz,&ONE,
+      m_data_descr,m_dev_csc_data.dev(),m_dev_csc_cidx.dev(),m_dev_csc_ridx.dev(),
+      m_dev_w.dev(),&ZERO,p_dev_tmp_data_sum->dev()));
+
+    CUBLAS_CHECK(cublasDscal(p_acc->m_cublas_handle,m_ncol,&scale,p_dev_tmp_data_sum->dev(),1));
+
+    p_dev_tmp_data_sq_sum->from_dev(p_dev_tmp_data_sum->dev(),m_ncol,p_acc->m_stream,0);  //if all ones, then the square sum is identical
+  }
+  else
+  {
+    //general case; NOTE: this path assumes weighten() has populated m_dev_w
+    CUSPARSE_CHECK(cusparseDcsrmv(p_acc->m_cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,m_ncol,m_nrow,m_nnz,&ONE,
+      m_data_descr,m_dev_csc_data.dev(),m_dev_csc_cidx.dev(),m_dev_csc_ridx.dev(),
+      m_dev_w.dev(),&ZERO,p_dev_tmp_data_sum->dev()));
+
+    //TODO: not optimized yet
+    CCuMemory<double> data_square(m_nnz);
+
+    kernel_mult_array(m_dev_csc_data.dev(),m_dev_csc_data.dev(),data_square.dev(),m_nnz,p_acc->m_stream,p_acc->m_cublas_handle);
+
+    CUSPARSE_CHECK(cusparseDcsrmv(p_acc->m_cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,m_ncol,m_nrow,m_nnz,&ONE,
+      m_data_descr,data_square.dev(),m_dev_csc_cidx.dev(),m_dev_csc_ridx.dev(),
+      m_p_dev_one->dev(),&ZERO,p_dev_tmp_data_sq_sum->dev()));
+
+    CUBLAS_CHECK(cublasDscal(p_acc->m_cublas_handle,m_ncol,&scale,p_dev_tmp_data_sum->dev(),1));
+    CUBLAS_CHECK(cublasDscal(p_acc->m_cublas_handle,m_ncol,&scale,p_dev_tmp_data_sq_sum->dev(),1));
+  }
+
+  //////////////////////////////////////////////////////////////////////
+  //label summary
+
+  //label sum at p_dev_tmp_label_info[0]
+  double* p_label_sum = p_dev_tmp_label_info->dev();
+
+  if(m_label_weighted)
+  {
+    kernel_mult_array(m_dev_y.dev(),m_dev_w.dev(),p_label_sum,m_nrow,p_acc->m_stream,p_acc->m_cublas_handle);
+
+    //weight*label sum at p_dev_tmp_label_info[0]
+    kernel_reduce_array(p_label_sum,p_label_sum,p_label_sum,m_p_dev_one->dev(),m_nrow,p_acc->m_stream,p_acc->m_cublas_handle);
+  }
+  else
+  {
+    //label sum at p_dev_tmp_label_info[0]
+    kernel_reduce_array(m_dev_y.dev(),p_label_sum,p_label_sum,m_p_dev_one->dev(),m_nrow,p_acc->m_stream,p_acc->m_cublas_handle);
+  }
+
+  //label sq_sum at p_dev_tmp_label_info[1]
+  double* p_label_sq_sum = p_dev_tmp_label_info->dev()+1;
+
+  //make a square first
+  kernel_mult_array(m_dev_y.dev(),m_dev_y.dev(),p_label_sq_sum,m_nrow,p_acc->m_stream,p_acc->m_cublas_handle);
+
+  if(m_label_weighted) //then multiply the weight
+    kernel_mult_array(m_dev_w.dev(),p_label_sq_sum,p_label_sq_sum,m_nrow,p_acc->m_stream,p_acc->m_cublas_handle);
+
+  //add up
+  kernel_reduce_array(p_label_sq_sum,p_label_sq_sum,p_label_sq_sum,m_p_dev_one->dev(),m_nrow,p_acc->m_stream,p_acc->m_cublas_handle);
+
+  //don't scale down the label sums; they are scalar values
+  p_dev_tmp_label_info->copy(p_acc->m_stream,2);  //label sum and sq_sum
+  p_dev_tmp_data_sum->copy(p_acc->m_stream);      //data sum
+  p_dev_tmp_data_sq_sum->copy(p_acc->m_stream);   //data sq_sum
+}
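+/*
+What summarize() produces, in formulas (summary note; W = weight_sum):
+
+  data_sum[j]    = (1/W) * sum_i w_i x_ij   (column means)
+  data_sq_sum[j] = (1/W) * sum_i x_ij^2     (column second moments; note the
+                                             general path does not apply w_i here)
+  label_info[0]  =         sum_i w_i y_i
+  label_info[1]  =         sum_i w_i y_i^2
+
+so a caller can form column variances as data_sq_sum[j] - data_sum[j]^2 on the host.
+*/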
+void CCuSparseMatrix::gemv(unsigned trans, const double* alpha, CCuMemory<double>* x, const double* beta, CCuMemory<double>* y, CCuAcc* p_acc)
+{
+  if(trans==CUSPARSE_OPERATION_NON_TRANSPOSE)
+  {
+#ifdef _USE_HYB_
+    CUSPARSE_CHECK(cusparseDhybmv(p_acc->m_cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,alpha,m_data_descr,m_hybA,x->dev(),beta,y->dev()));
+#else
+    CUSPARSE_CHECK(cusparseDcsrmv(p_acc->m_cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,m_nrow,m_ncol,m_nnz,alpha,
+      m_data_descr,m_dev_csr_data.dev(),m_dev_csr_ridx.dev(),m_dev_csr_cidx.dev(),
+      x->dev(),
+      beta,y->dev()));
+#endif
+  }
+  else
+  {
+    //the transposed product is computed as a non-transposed product over the CSC arrays
+#ifdef _USE_HYB_
+    CUSPARSE_CHECK(cusparseDhybmv(p_acc->m_cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,alpha,m_data_descr,m_hybT,x->dev(),beta,y->dev()));
+#else
+    CUSPARSE_CHECK(cusparseDcsrmv(p_acc->m_cusparse_handle,CUSPARSE_OPERATION_NON_TRANSPOSE,m_ncol,m_nrow,m_nnz,alpha,
+      m_data_descr,m_dev_csc_data.dev(),m_dev_csc_cidx.dev(),m_dev_csc_ridx.dev(),
+      x->dev(),
+      beta,y->dev()));
+#endif
+  }
+}
+
+
+void CLinearRegression::standardize(double* data_mean, double* data_std, double label_mean, double label_std)
+{
+  //keep the stats in memory
+  m_p_matrix->m_label_mean = label_mean;
+  m_p_matrix->m_label_std  = label_std;
+
+  m_p_matrix->m_dev_x_mean.to_dev(data_mean,m_p_matrix->m_ncol,m_stream,0);
+  m_p_matrix->m_dev_x_std_inv.to_dev(data_std,m_p_matrix->m_ncol,m_stream,0);
+
+  kernel_inv<<<m_p_matrix->m_ncol/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,m_stream>>>
+    (m_p_matrix->m_ncol,m_p_matrix->m_dev_x_std_inv.dev(),m_p_matrix->m_dev_x_std_inv.dev());
+
+  //////////////////////////////////////////////////////////
+  //standardize data
+  m_p_matrix->standardize_trans(this);
+  m_p_matrix->transform(this);
+
+  /////////////////////////////////////////////////////////
+  //standardize label
+  assert(label_std!=0);
+  if(label_std!=1)
+  {
+    double scale = 1.0/label_std;
+    CUBLAS_CHECK(cublasDscal(m_cublas_handle,m_p_matrix->m_nrow,&scale,m_p_matrix->m_dev_y.dev(),1));
+  }
+}
+
+
+void CLinearRegression::evaluate(const double* x, const double weight_sum)
+{
+  const unsigned nx      = m_p_matrix->m_ncol;
+  const unsigned ny      = m_p_matrix->m_nrow;
+  const unsigned fx_size = ny/THREAD_BLOCK_SIZE+1;
+
+  m_p_dev_tmp_x->to_dev(x,nx,m_stream,0);
+  m_p_dev_tmp_m->from_dev(m_p_matrix->m_dev_y.dev(),ny,m_stream,0);
+
+  //effective X
+  kernel_mult_array(m_p_dev_tmp_x->dev(),m_p_matrix->m_dev_x_std_inv.dev(),m_p_dev_tmp_x->dev(),nx,m_stream,m_cublas_handle);
+
+  double* sum    = m_p_dev_tmp_x->dev()+nx;
+  double* offset = sum+1;
+
+  //sum = dot(x_eff, x_mean), the correction term for centered columns
+  CUBLAS_CHECK(cublasDgemv(m_cublas_handle,CUBLAS_OP_N,1,nx,&ONE,m_p_dev_tmp_x->dev(),1,m_p_matrix->m_dev_x_mean.dev(),1,&ZERO,sum,1));
+
+  if(m_intercept)
+  {
+    kernel_set<<<1,1,0,m_stream>>>(1,offset,m_p_matrix->m_label_mean/m_p_matrix->m_label_std);
+    CUBLAS_CHECK(cublasDaxpy(m_cublas_handle,1,&MONE,sum,1,offset,1));
+  }
+  else
+    kernel_set<<<1,1,0,m_stream>>>(1,offset,0);
+
+  ///////////main///////////
+  //Dw-l => margin
+  m_p_matrix->gemv(CUSPARSE_OPERATION_NON_TRANSPOSE,&ONE,m_p_dev_tmp_x,&MONE,m_p_dev_tmp_m,this);
+
+  if(m_intercept)
+    kernel_inc<<<fx_size,THREAD_BLOCK_SIZE,0,m_stream>>>(ny,m_p_dev_tmp_m->dev(),offset);
+
+  CCuMemory<double>* p_dev_tmp_wm = NULL;
+
+  //weight*margin
+  if(m_p_matrix->m_label_weighted)
+  {
+    assert(m_p_matrix->m_dev_w.m_count==ny);
+
+    p_dev_tmp_wm = &m_dev_buf_xy1;
+
+    kernel_mult_array(m_p_dev_tmp_m->dev(),m_p_matrix->m_dev_w.dev(),p_dev_tmp_wm->dev(),ny,m_stream,m_cublas_handle);
+  }
+  else
+  {
+    p_dev_tmp_wm = m_p_dev_tmp_m;
+  }
+
+  //(Dw-l)D => gradient, scaled down by the weight sum
+  double scale = 1/weight_sum;
+  m_p_matrix->gemv(CUSPARSE_OPERATION_TRANSPOSE,&scale,p_dev_tmp_wm,&ZERO,m_p_dev_tmp_g,this);
+
+  //(Dw-l)^2 => cost
+  CUBLAS_CHECK(cublasDgemv(m_cublas_handle,CUBLAS_OP_N,1,ny,&HALF,p_dev_tmp_wm->dev(),1,
+    m_p_dev_tmp_m->dev(),1,&ZERO,m_p_dev_tmp_fx->dev(),1));
+
+  //debug();
+  //CSystemInfo::log("weight_sum =%f,m_p_dev_tmp_g[0]=%f\n",weight_sum,m_p_dev_tmp_g->m_host[0]);
+  //CSystemInfo::log("m_p_dev_tmp_fx[0]=%f\n",m_p_dev_tmp_fx->m_host[0]);
+
+  m_p_dev_tmp_g->copy(m_stream);
+  m_p_dev_tmp_fx->copy(m_stream,1);
+}
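+/*
+For reference (summary note), evaluate() above computes, on the standardized
+matrix D and standardized labels l:
+
+  margin    m = D*x_eff - l + offset
+  cost        = (1/2) * sum_i w_i * m_i^2    (via m_p_dev_tmp_fx)
+  gradient    = (1/W) * D' * (w .* m)        (via m_p_dev_tmp_g)
+
+i.e. plain (weighted) least squares; the scalar offset folds the column means
+and the intercept back in, so the sparse matrix itself never has to be centered.
+*/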
+void CLinearRegression::sq_err(const double* w, const double intercept, double* fx, double* label)
+{
+  //compute the squared error on the original (de-standardized) label scale
+  const unsigned nx      = m_p_matrix->m_ncol;
+  const unsigned ny      = m_p_matrix->m_nrow;
+  const unsigned fx_size = ny/THREAD_BLOCK_SIZE+1;
+
+  m_p_dev_tmp_x->to_dev(w,nx,m_stream,0);
+  m_p_dev_tmp_m->from_dev(m_p_matrix->m_dev_y.dev(),ny,m_stream,0);
+
+  if(m_p_matrix->m_label_std!=1)
+  {
+    //undo the label scaling applied in standardize()
+    double scale = m_p_matrix->m_label_std;
+    CUBLAS_CHECK(cublasDscal(m_cublas_handle,m_p_matrix->m_nrow,&scale,m_p_dev_tmp_m->dev(),1));
+  }
+
+  m_p_matrix->gemv(CUSPARSE_OPERATION_NON_TRANSPOSE,&ONE,m_p_dev_tmp_x,&MONE,m_p_dev_tmp_m,this);
+
+  kernel_inc<<<fx_size,THREAD_BLOCK_SIZE,0,m_stream>>>(ny,m_p_dev_tmp_m->dev(),intercept);
+
+  kernel_mult_array(m_p_dev_tmp_m->dev(),m_p_dev_tmp_m->dev(),m_p_dev_tmp_m->dev(),ny,m_stream,m_cublas_handle);
+
+  kernel_reduce_array(m_p_dev_tmp_m->dev(),m_p_dev_tmp_m->dev(),m_p_dev_tmp_m->dev(),m_p_matrix->m_p_dev_one->dev(),ny,m_stream,m_cublas_handle);
+
+  m_p_dev_tmp_m->to_host(fx,1,m_stream);
+}
+
+void CLogisticRegression::standardize(double* data_mean, double* data_std, double label_mean, double label_std)
+{
+  //(leftover stream experiment, kept for reference)
+  //float *data;
+  //cudaMalloc(&data, 1048576 * sizeof(float));
+  //// launch one worker kernel per stream
+  //kernel<<<1, 64, 0, stream>>>(data, 1048576);
+
+  //logistic regression only scales the columns; it never centers them,
+  //so data_mean and the label stats are unused here
+  m_p_matrix->m_dev_x_std_inv.to_dev(data_std,m_p_matrix->m_ncol,m_stream,0);
+
+  kernel_inv<<<m_p_matrix->m_ncol/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,m_stream>>>
+    (m_p_matrix->m_ncol,m_p_matrix->m_dev_x_std_inv.dev(),m_p_matrix->m_dev_x_std_inv.dev());
+
+  //////////////////////////////////////////////////////////
+  //standardize data
+  m_p_matrix->standardize_orig(this);
+  m_p_matrix->standardize_trans(this);
+  m_p_matrix->transform(this);
+}
+
+void CLogisticRegression::evaluate(const double* x, const double weight_sum)
+{
+  const unsigned nx      = m_p_matrix->m_ncol;
+  const unsigned ny      = m_p_matrix->m_nrow;
+  const unsigned fx_size = ny/THREAD_BLOCK_SIZE+1;
+
+  //if m_intercept is set, x is 1 longer than nx
+  m_p_dev_tmp_x->to_dev(x,nx+m_intercept,m_stream,0);
+
+  //-Dw => margin to be used in the logistic cost function
+  m_p_matrix->gemv(CUSPARSE_OPERATION_NON_TRANSPOSE,&MONE,m_p_dev_tmp_x,&ZERO,m_p_dev_tmp_m,this);
+
+  if(m_intercept) //decrease the margin by the last entry in x
+    kernel_dec<<<(ny+1)/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,m_stream>>>(ny,m_p_dev_tmp_m->dev(),m_p_dev_tmp_x->dev()+nx);
+
+  //for every sample, evaluate gradient and cost using the logistic cost function
+  //model: h(v) = 1/(1+exp(v))
+  //cost : H(Dw,l) = -log (h(Dw))   if l==1
+  //                 -log (1-h(Dw)) if l==0
+  //input : m_p_dev_buf_xy1 => y      l
+  //        m_p_dev_buf_y1  => margin Dw
+  //output: m_p_dev_buf_xy1 <= h(Dw)-l for gradient computation
+  //        m_p_dev_buf_y1  <= H(Dw,l) for cost estimation
+  kernel_logistic_fx<<<fx_size,THREAD_BLOCK_SIZE,0,m_stream>>>
+    (ny,m_p_dev_tmp_m->dev(),m_p_matrix->m_dev_y.dev(),m_p_dev_tmp_t->dev());
+
+  //(h(Dw)-l)D => gradient, scaled down by the weight sum
+  double scale = 1/weight_sum;
+  m_p_matrix->gemv(CUSPARSE_OPERATION_TRANSPOSE,&scale,m_p_dev_tmp_t,&ZERO,m_p_dev_tmp_g,this);
+
+  if(m_intercept) //add up m_p_dev_tmp_t into the last entry of m_p_dev_tmp_g
+    kernel_reduce_array(m_p_dev_tmp_t->dev(),m_p_dev_tmp_t->dev(),m_p_dev_tmp_g->dev()+nx,m_p_matrix->m_p_dev_one->dev(),ny,m_stream,m_cublas_handle,scale);
+
+  //add up H(Dw,l) for the total cost
+  kernel_reduce_array(m_p_dev_tmp_fx->dev(),m_p_dev_tmp_fx->dev(),m_p_dev_tmp_fx->dev(),m_p_matrix->m_p_dev_one->dev(),ny,m_stream,m_cublas_handle);
+
+  m_p_dev_tmp_g->copy(m_stream);
+  m_p_dev_tmp_fx->copy(m_stream,1);
+}
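+/*
+Intercept handling (summary note): the bias never enters the sparse matrix.
+Its contribution is subtracted from the margin up front (kernel_dec with
+x[nx]), and its gradient is just the scaled sum of the multipliers,
+
+  df/db = (1/W) * sum_i t_i,   t_i = h(m_i) - y_i,
+
+which is why a kernel_reduce_array over m_p_dev_tmp_t with scale = 1/W lands
+in g[nx] above.
+*/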
+void CLogisticRegression::predict(const double* w, const double intercept, double* fx, double* label)
+{
+  const unsigned nx      = m_p_matrix->m_ncol;
+  const unsigned ny      = m_p_matrix->m_nrow;
+  const unsigned fx_size = ny/THREAD_BLOCK_SIZE+1;
+
+  m_p_dev_tmp_x->to_dev(w,nx,m_stream,0);
+
+  if(m_p_matrix->m_dev_x_std_inv.m_count)
+  {
+    //the matrix is standardized, so the weights must be scaled the same way;
+    //invert 1/std back first
+    kernel_inv<<<nx/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,m_stream>>>
+      (nx,m_p_dev_tmp_t->dev(),m_p_matrix->m_dev_x_std_inv.dev());
+
+    kernel_mult_array(m_p_dev_tmp_x->dev(),m_p_dev_tmp_t->dev(),m_p_dev_tmp_x->dev(),nx,m_stream,m_cublas_handle);
+  }
+
+  m_p_matrix->gemv(CUSPARSE_OPERATION_NON_TRANSPOSE,&MONE,m_p_dev_tmp_x,&ZERO,m_p_dev_tmp_m,this);
+
+  //fold the intercept into the (negated) margin before applying the logit
+  kernel_inc<<<fx_size,THREAD_BLOCK_SIZE,0,m_stream>>>(ny,m_p_dev_tmp_m->dev(),-intercept);
+
+  kernel_logit<<<fx_size,THREAD_BLOCK_SIZE,0,m_stream>>>
+    (ny,m_p_dev_tmp_m->dev(),m_p_dev_tmp_t->dev());
+
+  m_p_matrix->m_dev_y.to_host(label,ny,m_stream);
+  m_p_dev_tmp_t->to_host(fx,ny,m_stream);
+}
+
+double CLogisticRegression::aug(double* label, double * fx, unsigned n)
+{
+  CSystemInfo t(__FUNCTION__);
+
+  radix_sort(fx,label,n);
+  t.timer_check();
+
+  //descending by score
+  std::reverse(fx,fx+n);
+  std::reverse(label,label+n);
+
+  t.timer_check();
+  /* Count number of positive and negative examples first */
+  unsigned N=0,P=0;
+  for(unsigned i = 0; i < n ; i++)
+  {
+    if(label[i] == 1) P++;
+  }
+
+  N = n-P;
+
+  /* Then calculate the actual area under the ROC curve */
+  double fprev = INT_MIN;
+  double A     = 0;
+  unsigned FP = 0,
+    TP     = 0,
+    FPprev = 0,
+    TPprev = 0;
+
+  double _fx;
+  double _label;
+
+  for(unsigned i = 0 ; i < n; i++)
+  {
+    _fx    = fx[i];
+    _label = label[i];
+    if(_fx != fprev)
+    {
+      /* Divide area here already : a bit slower, but gains in precision and avoids overflows */
+      assert(FP>=FPprev);
+
+      A += double(FP-FPprev)/N*double(TP+TPprev)/P;
+      fprev  = _fx;
+      FPprev = FP;
+      TPprev = TP;
+    }
+
+    int label_one = _label==1;
+    TP += label_one;
+    FP += 1-label_one;
+  }
+
+  A += double(N-FPprev)/N*double(P+TPprev)/P;
+  /* Not the same as Fawcett's original pseudocode though I think his contains a typo */
+  t.timer_check();
+
+  return A*0.5;
+}
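+/*
+AUC computation (summary note): aug() is Fawcett's single-pass ROC algorithm.
+Samples are visited in decreasing score order and each distinct score closes
+one trapezoid,
+
+  A += (FP-FPprev) * (TP+TPprev) / 2,
+
+kept pre-divided by N and P (with the final *0.5) so the running sum stays in
+[0,1] instead of growing toward N*P.
+*/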
+
+//
+//void CFactorizationMachine::evaluate(const double* x)
+//{
+//
+//}
diff --git a/cuAcc/cuAcc_function.h b/cuAcc/cuAcc_function.h
new file mode 100755
index 0000000..27a17a7
--- /dev/null
+++ b/cuAcc/cuAcc_function.h
@@ -0,0 +1,269 @@
+#ifndef __CUACC_H__
+#define __CUACC_H__
+
+#include "cuAcc_base.h"
+
+class CCuMatrix {
+public:
+  unsigned  m_ncol;
+  unsigned  m_nrow;  //sample size
+
+  bool  m_data_one_only;
+  bool  m_label_weighted;
+
+  static std::vector<CCuMemory<double> > m_cache_dev_one_per_device;
+
+  CCuMemory<double>*  m_p_dev_one;  //let's keep this in memory (don't delete)
+  CCuMemory<double>   m_dev_y;          //output vector
+  CCuMemory<double>   m_dev_x_std_inv;
+  CCuMemory<double>   m_dev_x_mean;
+  CCuMemory<double>   m_dev_w;          //weight vector
+  double  m_label_mean;
+  double  m_label_std;
+
+  CCuMatrix(const int device, const unsigned ncol, const unsigned nrow,const double* y, cudaStream_t stream);
+
+  virtual ~CCuMatrix()
+  {
+  }
+
+  virtual void summarize(double weight_sum, CCuMemory<double>* p_dev_tmp_data_sum, CCuMemory<double>* p_dev_tmp_data_sq_sum, CCuMemory<double>* p_dev_tmp_label_info, CCuAcc* p_acc) = 0;
+  virtual void gemv(unsigned trans, const double* alpha, CCuMemory<double>* x, const double* beta, CCuMemory<double>* y, CCuAcc* p_acc) = 0;
+  virtual void standardize_orig(CCuAcc* p_acc)  = 0;
+  virtual void standardize_trans(CCuAcc* p_acc) = 0;
+  virtual void transform(CCuAcc* p_acc) = 0;
+
+  void weighten(double* weight, CCuMemory<double>* p_dev_tmp_weight_sum, CCuAcc* p_acc);
+};
+
+//////////////////////////////////////////////
+//dense matrix
+class CCuDenseMatrix : public CCuMatrix{
+public:
+  CCuMemory<double>  m_dev_data;  //matrix storage (on host and device, partitioned way)
+
+  CCuDenseMatrix(const int device,const unsigned ny, const unsigned nx, const double* data, const double* y, cudaStream_t stream)
+    :CCuMatrix(device,nx,ny,y,stream)
+  {
+    m_dev_data.resize(ny*nx);
+    m_dev_data.to_dev(data,ny*nx,stream,0);
+  }
+
+  virtual ~CCuDenseMatrix(){}
+
+  virtual void debug(){}
+  virtual void summarize(double weight_sum, CCuMemory<double>* p_dev_tmp_data_sum, CCuMemory<double>* p_dev_tmp_data_sq_sum, CCuMemory<double>* p_dev_tmp_label_info, CCuAcc* p_acc){}
+  virtual void gemv(unsigned trans, const double* alpha, CCuMemory<double>* x, const double* beta, CCuMemory<double>* y, CCuAcc* p_acc){}
+  virtual void standardize_orig(CCuAcc* p_acc){}
+  virtual void standardize_trans(CCuAcc* p_acc){}
+  virtual void transform(CCuAcc* p_acc){}
+
+  static void write(const char* filename, const unsigned ny, const unsigned nx, const double* data, const double* y, const double* x);
+  static void read(const char* filename, unsigned& ny, unsigned& nx, double*& data, double*& y, double*& x);
+};
+//////////////////////////////////////////////
+//sparse matrix
+class CCuSparseMatrix : public CCuMatrix{
+public:
+  unsigned  m_nnz;
+
+  cusparseMatDescr_t  m_data_descr;
+  cusparseHybMat_t    m_hybA;
+  cusparseHybMat_t    m_hybT;
+
+  CCuMemory<double>  m_dev_csr_data;  //matrix storage in CSR format for cusparse, only on device
+  CCuMemory<int>     m_dev_csr_ridx;
+  CCuMemory<int>     m_dev_csr_cidx;
+
+  CCuMemory<double>  m_dev_csc_data;  //matrix storage in CSC format for cusparse, only on device
+  CCuMemory<int>     m_dev_csc_ridx;
+  CCuMemory<int>     m_dev_csc_cidx;
+
+  CCuSparseMatrix(const int device,const unsigned ny, const unsigned nx, const double* csr_data, const double* y, const int* csr_ridx, const int* csr_cidx, const unsigned nnz, cudaStream_t stream, cusparseHandle_t cusparse_handle);
+
+  virtual ~CCuSparseMatrix()
+  {
+    cusparseDestroyMatDescr(m_data_descr);
+    cusparseDestroyHybMat(m_hybA);
+    cusparseDestroyHybMat(m_hybT);
+  }
+
+  virtual void summarize(double weight_sum,
+    CCuMemory<double>* p_dev_tmp_data_sum, CCuMemory<double>* p_dev_tmp_data_sq_sum,
+    CCuMemory<double>* p_dev_tmp_label_info, CCuAcc* p_acc);
+  virtual void gemv(unsigned trans, const double* alpha, CCuMemory<double>* x, const double* beta, CCuMemory<double>* y, CCuAcc* p_acc);
+  virtual void standardize_orig(CCuAcc* p_acc);
+  virtual void standardize_trans(CCuAcc* p_acc);
+  virtual void transform(CCuAcc* p_acc);
+
+  static void write(const char* filename, const unsigned ny, const unsigned nx, const double* csr_data, const double* y, const int* csr_ridx, const int* csr_cidx, const unsigned nnz, const double* weight);
+  static void read(const char* filename, unsigned& ny, unsigned& nx, double*& csr_data, double*& y, int*& csr_ridx, int*& csr_cidx, unsigned& nnz, double*& weight);
+};
+
+class CMachineLearning : public CCuAcc{
+public:
+
+  enum ALGO {
+    LINEAR_REGRESSION,
+    LOGISTIC_REGRESSION,
+    FACTORIZATION_MACHINE
+  };
+
+  bool  m_intercept;  //intercept term
+
+  CCuMatrix*         m_p_matrix;
+  CCuMemory<double>  m_dev_buf_xy1;  //buffer at the size of max(x,y)
+  CCuMemory<double>  m_dev_buf_x2;   //buffer at the size of x
+  CCuMemory<double>  m_dev_buf_y1;   //buffer at the size of y
+
+  CCuMemory<double>* m_p_dev_tmp_data_sum;
+  CCuMemory<double>* m_p_dev_tmp_data_sq_sum;
+  CCuMemory<double>* m_p_dev_tmp_label_info;
+
+  CMachineLearning(int device, const bool intercept)
+    : CCuAcc(device), m_intercept(intercept){}
+
+  virtual ~CMachineLearning()
+  {
+    delete m_p_matrix;
+  }
+
+  virtual void    standardize(double* data_mean, double* data_std, double label_mean, double label_std) = 0;
+  virtual void    evaluate(const double* x, const double weight_sum) = 0;
+  virtual double  get_fx()  { return *(m_dev_buf_y1.host());}
+  virtual double* get_g()   { return m_dev_buf_x2.host();}
+  virtual double  aug(double* label, double * fx,unsigned n) = 0;
+  virtual void    predict(const double* w, const double intercept, double* fx, double* label) = 0;
+  virtual void    sq_err(const double* w, const double intercept, double* fx, double* label) = 0;
+
+  void initialize(CCuMatrix* p_matrix)
+  {
+    m_p_matrix = p_matrix;
+
+    m_dev_buf_xy1.resize(max(p_matrix->m_ncol,p_matrix->m_nrow)+1);
+    m_dev_buf_x2.resize(p_matrix->m_ncol+2);
+    m_dev_buf_y1.resize(p_matrix->m_nrow+1);
+
+    rename_memory(&m_dev_buf_x2,&m_dev_buf_xy1,&m_dev_buf_y1);
+  }
+
+  void rename_memory(CCuMemory<double>* p_dev_tmp_data_sum,CCuMemory<double>* p_dev_tmp_data_sq_sum,CCuMemory<double>* p_dev_tmp_label_info)
+  {
+    m_p_dev_tmp_data_sum    = p_dev_tmp_data_sum;
+    m_p_dev_tmp_data_sq_sum = p_dev_tmp_data_sq_sum;
+    m_p_dev_tmp_label_info  = p_dev_tmp_label_info;
+  }
+
+  virtual void debug()
+  {
+    m_dev_buf_xy1.copy(0);
+    m_dev_buf_x2.copy(0);
+    m_dev_buf_y1.copy(0);
+
+    m_p_matrix->m_dev_x_std_inv.copy(0);
+    m_p_matrix->m_dev_x_mean.copy(0);
+    m_p_matrix->m_dev_y.copy(0);
+
+    if(m_p_matrix->m_dev_w.m_count)
+      m_p_matrix->m_dev_w.copy(0);
+  }
+};
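+/*
+Buffer "renaming" (summary note): only three device scratch buffers are ever
+allocated (x2, y1, xy1); rename_memory() just binds them to whatever role a
+phase needs.  During summarize() they act as data_sum/data_sq_sum/label_info;
+during optimization the subclasses below rebind them so that, e.g., the weight
+vector x and the gradient g can share m_dev_buf_x2 -- x has already been
+consumed into the margin by the time the gradient is written.
+*/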
+class CLinearRegression : public CMachineLearning{
+public:
+  //renamed device memory
+  CCuMemory<double>*  m_p_dev_tmp_x;
+  CCuMemory<double>*  m_p_dev_tmp_g;
+  CCuMemory<double>*  m_p_dev_tmp_fx;
+  CCuMemory<double>*  m_p_dev_tmp_m;
+
+  CLinearRegression(const int device, const unsigned ny, const unsigned nx, const double* data, const double* y, const bool intercept)
+    :CMachineLearning(device, intercept)
+  {
+    initialize(new CCuDenseMatrix(device,ny,nx,data,y,m_stream));
+    rename_memory(&m_dev_buf_x2,&m_dev_buf_x2,&m_dev_buf_y1,&m_dev_buf_y1);
+  }
+
+  CLinearRegression(const int device, const unsigned ny, const unsigned nx, const double* data, const double* y, const int* csr_ridx, const int* csr_cidx, const unsigned nnz, const bool intercept)
+    :CMachineLearning(device, intercept)
+  {
+    initialize(new CCuSparseMatrix(device,ny,nx,data,y,csr_ridx,csr_cidx,nnz,m_stream,m_cusparse_handle));
+    rename_memory(&m_dev_buf_x2,&m_dev_buf_x2,&m_dev_buf_y1,&m_dev_buf_y1);
+  }
+
+  virtual ~CLinearRegression(){}
+
+  virtual void   evaluate(const double* x, const double weight_sum);
+  virtual void   standardize(double* data_mean, double* data_std, double label_mean, double label_std);
+  virtual void   sq_err(const double* w, const double intercept, double* fx, double* label);
+  virtual void   predict(const double* w, const double intercept, double* fx, double* label){assert(false);}
+  virtual double aug(double* label, double * fx, unsigned n){assert(false);return -1;}
+
+  void rename_memory(CCuMemory<double>* p_dev_tmp_x, CCuMemory<double>* p_dev_tmp_g, CCuMemory<double>* p_dev_tmp_m, CCuMemory<double>* p_dev_tmp_fx)
+  {
+    //rename device memory
+    m_p_dev_tmp_x  = p_dev_tmp_x;
+    m_p_dev_tmp_g  = p_dev_tmp_g;
+    m_p_dev_tmp_m  = p_dev_tmp_m;   //margin
+    m_p_dev_tmp_fx = p_dev_tmp_fx;
+  }
+};
+
+class CLogisticRegression : public CMachineLearning{
+public:
+  CCuMemory<double>*  m_p_dev_tmp_x;
+  CCuMemory<double>*  m_p_dev_tmp_g;
+  CCuMemory<double>*  m_p_dev_tmp_m;
+  CCuMemory<double>*  m_p_dev_tmp_fx;
+  CCuMemory<double>*  m_p_dev_tmp_t;
+
+  CLogisticRegression(const int device, const unsigned ny, const unsigned nx, const double* data, const double* y, const bool intercept)
+    :CMachineLearning(device ,intercept)
+  {
+    initialize(new CCuDenseMatrix(device,ny,nx,data,y,m_stream));
+    rename_memory(&m_dev_buf_x2,&m_dev_buf_x2,&m_dev_buf_y1,&m_dev_buf_y1,&m_dev_buf_xy1);
+  }
+
+  CLogisticRegression(const int device, const unsigned ny, const unsigned nx, const double* data, const double* y, const int* csr_ridx, const int* csr_cidx, const unsigned nnz, const bool intercept)
+    :CMachineLearning(device, intercept)
+  {
+    initialize(new CCuSparseMatrix(device,ny,nx,data,y,csr_ridx,csr_cidx,nnz,m_stream,m_cusparse_handle));
+    rename_memory(&m_dev_buf_x2,&m_dev_buf_x2,&m_dev_buf_y1,&m_dev_buf_y1,&m_dev_buf_xy1);
+  }
+
+  virtual ~CLogisticRegression(){}
+
+  virtual void   evaluate(const double* x, const double weight_sum);
+  virtual void   standardize(double* data_mean, double* data_std, double label_mean, double label_std);
+  virtual void   sq_err(const double* w, const double intercept, double* fx, double* label){assert(false);}
+  virtual void   predict(const double* w, const double intercept, double* fx, double* label);
+  virtual double aug(double* label, double * fx, unsigned n);
+
+  void rename_memory(CCuMemory<double>* p_dev_tmp_x, CCuMemory<double>* p_dev_tmp_g, CCuMemory<double>* p_dev_tmp_m, CCuMemory<double>* p_dev_tmp_fx, CCuMemory<double>* p_dev_tmp_t)
+  {
+    //rename device memory
+    m_p_dev_tmp_x  = p_dev_tmp_x;
+    m_p_dev_tmp_g  = p_dev_tmp_g;
+    m_p_dev_tmp_m  = p_dev_tmp_m;   //margin
+    m_p_dev_tmp_fx = p_dev_tmp_fx;
+    m_p_dev_tmp_t  = p_dev_tmp_t;   //multiplier
+  }
+};
+
+//class CFactorizationMachine : public CCuDenseMatrix {
+//public:
+//  CFactorizationMachine(const int device,const unsigned ny, const unsigned nx, const double* data, const double* y)
+//    :CCuDenseMatrix(device,ny,nx,data,y){}
+//
+//  virtual ~CFactorizationMachine(){}
+//
+//  virtual void standardize(double* std){ CCuDenseMatrix::standardize(std);}
+//  virtual void summarize()            { CCuDenseMatrix::summarize();}
+//  virtual void evaluate(const double* x) ;
+//};
+
+#endif
\ No newline at end of file
diff --git a/cuAcc/cuAcc_updater.cu b/cuAcc/cuAcc_updater.cu
new file mode 100755
index 0000000..83839fe
--- /dev/null
+++ b/cuAcc/cuAcc_updater.cu
@@ -0,0 +1,386 @@
+#include "cuAcc_updater.h"
+
+#include <stdio.h>
+#include <math.h>
+
+
+void CUpdater::write(const char* filename, const unsigned m_nx, const double* previousWeights, const double* x, const double* cgrad, long mini_batch_size, double step_size, int iter, double lambda)
+{
+  FILE* pFile = fopen(filename,"wb");
+  if(!pFile)
+  {
+    CSystemInfo::log("cannot write to [%s]\n",filename);
+    return;
+  }
+
+  fwrite(&m_nx,sizeof(unsigned),1,pFile);
+
+  fwrite(previousWeights,sizeof(double),m_nx,pFile);
+  fwrite(x,sizeof(double),m_nx,pFile);
+  fwrite(cgrad,sizeof(double),m_nx,pFile);
+
+  fwrite(&mini_batch_size,sizeof(long),1,pFile);
+  fwrite(&step_size,sizeof(double),1,pFile);
+
+  fwrite(&iter,sizeof(int),1,pFile);
+  fwrite(&lambda,sizeof(double),1,pFile);
+
+  fclose(pFile);
+}
+
+void CUpdater::read(const char* filename, unsigned& m_nx, double*&previousWeights, double*&x, double*&cgrad, long& mini_batch_size, double& step_size, int& iter, double& lambda)
+{
+  FILE* pFile = fopen(filename,"rb");
+  if(!pFile)
+  {
+    CSystemInfo::log("cannot read [%s]\n",filename);
+    return;
+  }
+
+  fread(&m_nx,sizeof(unsigned),1,pFile);
+
+  previousWeights = new double[m_nx];
+  x               = new double[m_nx];
+  cgrad           = new double[m_nx];
+
+  fread(previousWeights,sizeof(double),m_nx,pFile);
+  fread(x,sizeof(double),m_nx,pFile);
+  fread(cgrad,sizeof(double),m_nx,pFile);
+
+  fread(&mini_batch_size,sizeof(long),1,pFile);
+  fread(&step_size,sizeof(double),1,pFile);
+
+  fread(&iter,sizeof(int),1,pFile);
+  fread(&lambda,sizeof(double),1,pFile);
+
+  fclose(pFile);
+}
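+/*
+write()/read() dump one updater step so it can be replayed off-line.  A minimal
+replay sketch (not original code; "dump.bin" and the updater choice are
+illustrative assumptions only):
+
+  unsigned nx;  double *prev,*x,*g,step,lambda;  long batch;  int iter;
+  CUpdater::read("dump.bin",nx,prev,x,g,batch,step,iter,lambda);
+  CL2Updater u;  double convergence[2];
+  u.initialize(nx,prev,lambda);
+  u.update(x,g,batch,step,iter,lambda,convergence);
+*/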
+void CUpdater::convergence_check(CCuMemory<double>& dev_x, double* convergence_info)
+{
+  //////////////////////////////////////////////////////////
+  //convergence check
+  double* solutionVecDiff = convergence_info;
+  double* norm_currentBDV = convergence_info+1;
+
+  //m_dev_prev_x still holds the previous weights; overwrite it with (prev-x)
+  CUBLAS_CHECK(cublasDaxpy(m_cublas_handle,m_nx,&MONE,dev_x.dev(),1,m_dev_prev_x.dev(),1));
+  CUBLAS_CHECK(cublasDnrm2(m_cublas_handle,m_nx,m_dev_prev_x.dev(),1,solutionVecDiff));
+  CUBLAS_CHECK(cublasDnrm2(m_cublas_handle,m_nx,dev_x.dev(),1,norm_currentBDV));
+}
+
+void CUpdater::convergence_check(double* x, double* convergence_info)
+{
+  //////////////////////////////////////////////////////////
+  //convergence check (CPU fallback via cblas)
+  double* solutionVecDiff = convergence_info;
+  double* norm_currentBDV = convergence_info+1;
+
+  cblas_daxpy(m_nx,-1,x,1,m_dev_prev_x.host(),1);
+
+  *solutionVecDiff = cblas_dnrm2(m_nx,m_dev_prev_x.host(),1);
+  *norm_currentBDV = cblas_dnrm2(m_nx,x,1);
+}
+
+double CSimpleUpdater::initialize(unsigned nx, double* x, double lambda)
+{
+  //no regularization, nothing to set up
+  return 0;
+}
+
+double CSimpleUpdater::update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info)
+{
+  CSystemInfo tm(__FUNCTION__);
+
+  //no-op updater; the driver applies the plain gradient step itself
+  return 0;
+}
+
+double CL1Updater::initialize(unsigned nx,double* x,double lambda)
+{
+  m_nx = nx;
+  m_dev_prev_x.resize(m_nx);
+  m_dev_prev_x.to_dev(x,m_nx,m_stream,0);
+
+  double norm;
+  CUBLAS_CHECK(cublasDasum(m_cublas_handle,m_nx,m_dev_prev_x.dev(),1,&norm));
+
+  CUDA_CHECK(cudaStreamSynchronize(m_stream));
+
+  return lambda*norm;  //L1 regularization term lambda*|x|_1
+}
+
+__global__ void kernel_L1Update_compute(unsigned nx,double* x, const double soft_threshold)
+{
+  const unsigned i = threadIdx.x+blockDim.x*blockIdx.x;
+
+  if(i>=nx) return;
+
+  const double v = x[i];
+
+  if(v>soft_threshold)       x[i] = v-soft_threshold;
+  else if(v<-soft_threshold) x[i] = v+soft_threshold;
+  else                       x[i] = 0;
+}
+
+double CL1Updater::update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info)
+{
+  CSystemInfo tm(__FUNCTION__);
+
+  CUDA_CHECK(cudaSetDevice(m_device));
+
+  CCuMemory<double> dev_x(x,m_nx,m_stream);
+  CCuMemory<double> dev_g(cgrad,m_nx,m_stream);
+
+  tm.timer_check();
+
+  //////////////////////////////////////////////////
+  //L1 update
+  //proximal operator to capture L1 regularization
+  //  w = prox ( w-alpha/batch_size*gradient_sum )
+
+  double alpha = step_size/sqrt(double(iter));
+  double scale = -alpha/mini_batch_size;
+
+  CUBLAS_CHECK(cublasDaxpy(m_cublas_handle,m_nx,&scale,dev_g.dev(),1,dev_x.dev(),1));
+
+  //for lasso, prox(x) is known to be
+  //  prox (x) = x-k  if x> k
+  //             x+k  if x<-k
+  //              0   otherwise
+  kernel_L1Update_compute<<<m_nx/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,m_stream>>>
+    (m_nx,dev_x.dev(),lambda * alpha);
+
+  //now, dev_x is read-only, copy back to host
+  //TODO: is this synchronous?
+  dev_x.to_host(x,m_nx,m_stream);
+
+  return compute_convergence_regularization(dev_x,iter,lambda,convergence_info);
+}
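+/*
+The proximal step, spelled out (summary note): with k = lambda*alpha the update is
+
+  w = prox_k( w - (alpha/b) * g ),   prox_k(x) = sign(x) * max(|x|-k, 0)
+
+e.g. with k=0.3:  0.5 -> 0.2,  -0.5 -> -0.2,  0.2 -> 0.  Small coefficients are
+driven exactly to zero, which is how the lasso produces sparse models.
+*/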
+double CL1Updater::compute_convergence_regularization(CCuMemory<double>& dev_x, int iter, double lambda, double*& convergence_info)
+{
+  /////////////////////////////////////////////////////////////
+  //evaluate the actual regularization value
+  double regVal;
+  CUBLAS_CHECK(cublasDasum(m_cublas_handle,m_nx,dev_x.dev(),1,&regVal));
+  if(iter>1)
+    convergence_check(dev_x,convergence_info);
+
+  CUDA_CHECK(cudaStreamSynchronize(m_stream));
+
+  /////////////////////////////////////////////////////////////
+  //keep the last weight for the next iteration
+  m_dev_prev_x.from_dev(dev_x.dev(),dev_x.m_count,m_stream,0);
+
+  return lambda*regVal;
+}
+
+
+double CL1AdaDeltaUpdater::initialize(unsigned nx,double* x,double lambda)
+{
+  CAdaDelta::initialize(nx);
+
+  return CL1Updater::initialize(nx,x,lambda);
+}
+
+__global__ void kernel_L1AdaDeltaUpdate_compute(unsigned nx, double* x, double* g,
+  double* avg_squared_g, double* avg_squared_delta_x, const double step_size, const double lambda, const double rho, const double eps)
+{
+  const unsigned i = threadIdx.x+blockDim.x*blockIdx.x;
+
+  if(i>=nx) return;
+
+  const double cur_x = x[i];
+
+  //running average of squared gradients
+  avg_squared_g[i] = rho*avg_squared_g[i]+(1-rho)*pow(g[i],2);
+
+  //per-coordinate step size from the RMS ratio
+  double alpha = step_size*sqrt(avg_squared_delta_x[i]+eps)/sqrt(avg_squared_g[i]+eps);
+
+  x[i] = cur_x-alpha*g[i];
+
+  double soft_threshold = lambda * alpha;
+
+  //now apply prox(x)
+  const double v = x[i];
+
+  if(v>soft_threshold)       x[i] = v-soft_threshold;
+  else if(v<-soft_threshold) x[i] = v+soft_threshold;
+  else                       x[i] = 0;
+
+  //running average of squared updates
+  avg_squared_delta_x[i] = rho*avg_squared_delta_x[i]+(1-rho)*pow(x[i]-cur_x,2);
+}
+
+
+double CL1AdaDeltaUpdater::update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info)
+{
+  CSystemInfo tm(__FUNCTION__);
+
+  CUDA_CHECK(cudaSetDevice(m_device));
+
+  CCuMemory<double> dev_x(x,m_nx,m_stream);
+  CCuMemory<double> dev_g(cgrad,m_nx,m_stream);
+
+  tm.timer_check();
+
+  //////////////////////////////////////////////////
+  //L1 AdaDelta update
+  //proximal operator to capture L1 regularization
+  //  w = prox ( w-alpha/batch_size*gradient_sum )
+  //for lasso, prox(x) is known to be
+  //  prox (x) = x-k  if x> k
+  //             x+k  if x<-k
+  //              0   otherwise
+
+  double scale1 = 1.0/mini_batch_size;
+  double rho    = 0.1;
+  double eps    = 1e-2;
+
+  CUBLAS_CHECK(cublasDscal(m_cublas_handle,m_nx,&scale1,dev_g.dev(),1));
+
+  kernel_L1AdaDeltaUpdate_compute<<<m_nx/THREAD_BLOCK_SIZE+1,THREAD_BLOCK_SIZE,0,m_stream>>>
+    (m_nx,dev_x.dev(),dev_g.dev(),m_dev_avg_squared_g.dev(),m_dev_avg_squared_delta_x.dev(),
+    step_size,lambda,rho,eps);
+
+  dev_x.to_host(x,m_nx,m_stream);
+
+  return compute_convergence_regularization(dev_x,iter,lambda,convergence_info);
+}
+
+
+double CL2Updater::initialize(unsigned nx,double* x,double lambda)
+{
+  CUDA_CHECK(cudaSetDevice(m_device));
+  m_nx = nx;
+  m_dev_prev_x.resize(m_nx);
+  m_dev_prev_x.to_dev(x,m_nx,m_stream,0);
+
+  double norm;
+  CUBLAS_CHECK(cublasDnrm2(m_cublas_handle,m_nx,m_dev_prev_x.dev(),1,&norm));
+
+  return 0.5*lambda*norm*norm;  //L2 regularization term (lambda/2)*|x|^2
+}
+
+double CL2Updater::update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info)
+{
+  CSystemInfo tm(__FUNCTION__);
+
+  CUDA_CHECK(cudaSetDevice(m_device));
+
+  CCuMemory<double> dev_x(x,m_nx,m_stream);
+  CCuMemory<double> dev_g(cgrad,m_nx,m_stream);
+
+  tm.timer_check();
+
+  ////////////////////////////////////////////////////
+  //L2 update
+  //  w = w - alpha * gradient
+  //where
+  //  gradient = gradient_sum/batch_size+lambda/batch_size*w
+  //in SPARK,
+  //  gradient = gradient_sum/batch_size+lambda*w
+  //perhaps it is just a weighting factor, so it won't matter much with a proper lambda
+  //now,
+  //  w = w - alpha [ (gradient_sum/batch_size) + lambda*w ]
+  //    = (1-alpha*lambda)w - (alpha/batch_size)*gradient_sum
+  //    = scale1*w + (scale2)*gradient_sum
+  double alpha  = step_size/sqrt(double(iter));
+  double scale1 = 1.0-alpha*lambda;
+  double scale2 = -alpha/mini_batch_size;
+
+  CUBLAS_CHECK(cublasDscal(m_cublas_handle,m_nx,&scale1,dev_x.dev(),1));
+  CUBLAS_CHECK(cublasDaxpy(m_cublas_handle,m_nx,&scale2,dev_g.dev(),1,dev_x.dev(),1));
+
+  //now, dev_x is read-only, copy back to host
+  //TODO: is this "A"synchronous?
+  dev_x.to_host(x,m_nx,m_stream);
+
+  return compute_convergence_regularization(dev_x,iter,lambda,convergence_info);
+}
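+/*
+AdaDelta, in equations (summary note; cf. Zeiler 2012), as implemented by the
+two AdaDelta kernels in this file:
+
+  E[g^2]_t  = rho*E[g^2]_{t-1}  + (1-rho)*g_t^2
+  dx_t      = -(RMS[dx]_{t-1} / RMS[g]_t) * g_t,   RMS[v] = sqrt(E[v^2]+eps)
+  E[dx^2]_t = rho*E[dx^2]_{t-1} + (1-rho)*dx_t^2
+
+The kernels additionally multiply by step_size and, for L1, re-use the
+per-coordinate alpha as the soft-threshold scale.  Note the rho/eps used here
+(0.1 or 0.25, 1e-2) differ widely from the paper's suggested 0.95/1e-6.
+*/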
(m_nx,dev_x.dev(),dev_g.dev(),m_dev_avg_squared_g.dev(),m_dev_avg_squared_delta_x.dev(), + step_size,rho,eps); + + dev_x.to_host(x,m_nx,m_stream); + + return compute_convergence_regularization(dev_x,iter,lambda,convergence_info); +} + + \ No newline at end of file diff --git a/cuAcc/cuAcc_updater.h b/cuAcc/cuAcc_updater.h new file mode 100755 index 0000000..58ca1db --- /dev/null +++ b/cuAcc/cuAcc_updater.h @@ -0,0 +1,105 @@ +#ifndef __CUACC_UPDATER_H__ +#define __CUACC_UPDATER_H__ + +#include "cuAcc_base.h" + +class CAdaDelta { +public: + CCuMemory m_dev_avg_squared_g; + CCuMemory m_dev_avg_squared_delta_x; + + //delta history + void initialize(unsigned nx) + { + m_dev_avg_squared_g.resize(nx); + m_dev_avg_squared_delta_x.resize(nx); + + m_dev_avg_squared_g.memset(0); + m_dev_avg_squared_delta_x.memset(0); + } + + virtual ~CAdaDelta(){} +}; + +class CUpdater : public CCuAcc { +public: + enum ALGO { + L1_UPDATER, + L1_ADADELTA_UPDATER, + L2_UPDATER, + L2_ADADELTA_UPDATER + }; + + CCuMemory m_dev_prev_x; + unsigned m_nx; + + virtual double initialize(unsigned nx,double* x,double lambda) = 0; + virtual double update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info) = 0; + + void convergence_check(double* dev_x, double* convergence_info); + void convergence_check(CCuMemory& dev_x, double* convergence_info); + + static void write(const char* filename, const unsigned nx, const double* previousWeights, const double* x, const double* cgrad, long mini_batch_size, double step_size, int iter, double lambda); + static void read(const char* filename, unsigned& nx, double*&previousWeights, double*&x, double*&cgrad, long& mini_batch_size, double& step_size, int& iter, double& lambda); + + CUpdater() : CCuAcc(0){} + + virtual ~CUpdater(){} + +}; + +class CSimpleUpdater : public CUpdater { +public: + CSimpleUpdater() : CUpdater(){} + + virtual ~CSimpleUpdater(){} + + virtual double initialize(unsigned nx,double* x,double lambda); + virtual double update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info); +}; + +class CL1Updater : public CUpdater { +public: + CL1Updater() : CUpdater(){} + + virtual ~CL1Updater(){} + + virtual double initialize(unsigned nx,double* x,double lambda); + virtual double update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info); + + double compute_convergence_regularization(CCuMemory& dev_x, int iter, double lambda, double*& convergence_info); +}; + +class CL1AdaDeltaUpdater : public CL1Updater, public CAdaDelta { +public: + CL1AdaDeltaUpdater() : CL1Updater(){} + + virtual ~CL1AdaDeltaUpdater(){} + + virtual double initialize(unsigned nx,double* x,double lambda); + virtual double update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info); +}; + +class CL2Updater : public CUpdater { +public: + CL2Updater() : CUpdater(){} + + virtual ~CL2Updater(){} + + virtual double initialize(unsigned nx,double* x,double lambda); + virtual double update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info); + + double compute_convergence_regularization(CCuMemory& dev_weight, int iter, double lambda, double*& convergence_info); +}; + +class CL2AdaDeltaUpdater : public CL2Updater, public CAdaDelta { +public: + CL2AdaDeltaUpdater() : CL2Updater(){} + + virtual 
+class CL2AdaDeltaUpdater : public CL2Updater, public CAdaDelta {
+public:
+  CL2AdaDeltaUpdater() : CL2Updater(){}
+
+  virtual ~CL2AdaDeltaUpdater(){}
+
+  virtual double initialize(unsigned nx,double* x,double lambda);
+  virtual double update(double* x, double* cgrad, long mini_batch_size, double step_size, int iter, double lambda, double* convergence_info);
+};
+
+#endif
\ No newline at end of file
diff --git a/cuAcc/radix_sort.h b/cuAcc/radix_sort.h
new file mode 100755
index 0000000..906d53d
--- /dev/null
+++ b/cuAcc/radix_sort.h
@@ -0,0 +1,268 @@
+#ifndef __RADIX_SORT_H__
+#define __RADIX_SORT_H__
+
+#include <string.h>
+#include <assert.h>
+#include <algorithm>
+
+
+#define _ENABLE_PREFETCH_
+#ifdef _ENABLE_PREFETCH_
+#ifdef _MSC_VER
+#include <xmmintrin.h>
+#define PREFETCH(addr)  _mm_prefetch((char*)(addr),_MM_HINT_T0)
+#else
+#define PREFETCH(addr)  __builtin_prefetch((addr),1,1)
+#endif
+#else
+#define PREFETCH(addr)
+#endif
+
+
+#define BUCKET_BIT  (8)
+#define BUCKET_SIZE (1<<BUCKET_BIT)
+#define BUCKET_MASK (BUCKET_SIZE-1)
+
+//extract the BUCKET_BIT-wide digit at `shift` from the raw bits of the key
+template <typename KT>
+unsigned get_key_udp(const KT* key, const int shift)
+{
+  if(sizeof(KT)==4)
+  {
+    unsigned _key = *(unsigned*)key;  //4-byte read for a 4-byte key
+    return (_key>>shift)&BUCKET_MASK;
+  }
+  else
+  {
+    unsigned long long _key = *(unsigned long long*)key;
+    return (_key>>shift)&BUCKET_MASK;
+  }
+}
+
+//branchy sorting network for up to five elements, carrying the data array along
+template <typename KT, typename DT>
+void sort5(KT* key, DT* data, const size_t size, const int e0=0, const int e1=1, const int e2=2, const int e3=3, const int e4=4)
+{
+  if (key[e1] < key[e0])
+  {
+    std::swap(key[e1],key[e0]);
+    std::swap(data[e1],data[e0]);
+  }
+
+  if(size==2) return;
+
+  KT k;
+  DT d;
+
+  k = key[e2];
+  d = data[e2];
+  if (k < key[e1])
+  {
+    key[e2]  = key[e1];
+    data[e2] = data[e1];
+    if (k < key[e0])
+    {
+      key[e1]  = key[e0];
+      key[e0]  = k;
+      data[e1] = data[e0];
+      data[e0] = d;
+    }
+    else
+    {
+      key[e1]  = k;
+      data[e1] = d;
+    }
+  }
+
+  if(size==3) return;
+
+  k = key[e3];
+  d = data[e3];
+  if (k < key[e2])
+  {
+    key[e3]  = key[e2];
+    data[e3] = data[e2];
+    if (k < key[e1])
+    {
+      key[e2]  = key[e1];
+      data[e2] = data[e1];
+      if (k < key[e0])
+      {
+        key[e1]  = key[e0];
+        key[e0]  = k;
+        data[e1] = data[e0];
+        data[e0] = d;
+      }
+      else
+      {
+        key[e1]  = k;
+        data[e1] = d;
+      }
+    }
+    else
+    {
+      key[e2]  = k;
+      data[e2] = d;
+    }
+  }
+
+  if(size==4) return;
+
+  k = key[e4];
+  d = data[e4];
+  if (k < key[e3])
+  {
+    key[e4]  = key[e3];
+    data[e4] = data[e3];
+    if (k < key[e2])
+    {
+      key[e3]  = key[e2];
+      data[e3] = data[e2];
+      if (k < key[e1])
+      {
+        key[e2]  = key[e1];
+        data[e2] = data[e1];
+        if (k < key[e0])
+        {
+          key[e1]  = key[e0];
+          key[e0]  = k;
+          data[e1] = data[e0];
+          data[e0] = d;
+        }
+        else
+        {
+          key[e1]  = k;
+          data[e1] = d;
+        }
+      }
+      else
+      {
+        key[e2]  = k;
+        data[e2] = d;
+      }
+    }
+    else
+    {
+      key[e3]  = k;
+      data[e3] = d;
+    }
+  }
+}
+
+template <typename KT, typename DT>
+void insertion_sort(KT* key, DT* data, const size_t size)
+{
+  KT k;
+  DT d;
+
+  unsigned src,dst;
+
+  for(src = 1; src < size; src++)
+  {
+    k = key[src];
+    d = data[src];
+
+    for(dst=src; dst-- > 0 && k < key[dst]; );
+    /* when we get out of the loop,
+    ** table[dst] is the element BEFORE our insertion point,
+    ** so re-increment it to point at the insertion point */
+    if (++dst == src) continue;
+
+    memmove(key+dst+1,key+dst,   (src-dst) * sizeof(KT));
+    memmove(data+dst+1,data+dst, (src-dst) * sizeof(DT));
+
+    key[dst]  = k;
+    data[dst] = d;
+  }
+}
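+/*
+How the sort below works (summary note): an in-place MSD radix sort over the
+raw key bits, one 8-bit digit per recursion level.  Each pass histograms the
+current digit, computes bucket start offsets, then walks permutation cycles,
+swapping each key toward its destination bucket until the cycle closes.  Small
+buckets fall back to sort5/insertion_sort.  Interpreting double bits as an
+unsigned integer orders correctly only for non-negative values -- fine for
+aug(), whose keys are probabilities in [0,1].
+*/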
+template <typename KT, typename DT>
+void radix_sort(KT* key, DT* data, const size_t size, int shift=(sizeof(KT)-1)*BUCKET_BIT)
+{
+  size_t  head[BUCKET_SIZE];
+  size_t  first[BUCKET_SIZE+1];
+  size_t  count[BUCKET_SIZE] = {0,};
+  size_t* last = first+1;
+  size_t  i,id;
+  KT  k;
+  DT  d;
+
+  //histogram of the current digit
+  for (i=0;i<size;i++)
+    ++count[get_key_udp<KT>(key+i,shift)];
+
+  head[0] = 0;
+  memcpy(head+1,count,sizeof(count)-sizeof(size_t));  //head -> has the start address
+
+  for (i=1;i<BUCKET_SIZE;i++)
+  {
+    head[i] += head[i-1];  //prefix sum; last -> has the last address
+
+    if(head[i]==size)
+      break;               //all remaining buckets are empty
+
+    PREFETCH(key+head[i]);
+  }
+
+  const unsigned last_bucket = i;
+  memcpy(first,head,sizeof(size_t)*last_bucket);
+  first[last_bucket] = size;  //sentinel end for the last non-empty bucket
+
+  size_t begin,end;
+  for (i=0;i<last_bucket;i++)
+  {
+    begin = head[i];
+    end   = last[i];
+
+    while(begin<end)
+    {
+      k = key[begin];
+      d = data[begin];
+
+      id = get_key_udp<KT>(&k,shift);
+      assert(id>=i);  //buckets below i are already fully placed
+
+      do
+      {
+        //keep moving until you hit the right bucket
+        std::swap(d,data[head[id]]);
+        std::swap(k,key[head[id]]);
+
+        head[id]++;
+
+        PREFETCH(key+head[id]);
+        PREFETCH(data+head[id]);
+
+        id = get_key_udp<KT>(&k,shift);
+      }
+      while(i!=id);
+
+      //done with this key, and proceed to the next
+      //cannot be id as it might break previously
+      key[begin]  = k;
+      data[begin] = d;
+      begin++;
+    }
+  }
+
+  if(shift>=BUCKET_BIT)
+  {
+    shift -= BUCKET_BIT;
+
+    //recurse into each bucket on the next digit, using simpler sorts for
+    //small buckets
+    for (i=0;i<last_bucket;i++)
+    {
+      const size_t cur_size = last[i]-first[i];
+
+      if(cur_size<2)
+        continue;
+
+      if(cur_size<=5)
+        sort5<KT,DT>(key+first[i],data+first[i],cur_size);
+      else if(cur_size<=64)
+        insertion_sort<KT,DT>(key+first[i],data+first[i],cur_size);
+      else
+        radix_sort<KT,DT>(key+first[i],data+first[i],cur_size,shift);
+    }
+  }
+}
+
+#endif
\ No newline at end of file
diff --git a/cuAcc/run.bat b/cuAcc/run.bat
new file mode 100755
index 0000000..fd61e2a
--- /dev/null
+++ b/cuAcc/run.bat
@@ -0,0 +1,19 @@
+cd ../../../../../
+
+$SCALA_HOME/bin/scalac org/apache/spark/ml/optim/NativeCuAcc.scala
+
+javah -o org/apache/spark/ml/optim/NativeCuAcc.h -cp $SCALA_CP:. org.apache.spark.ml.optim.NativeCuAcc
+
+cd -
+
+rm -f *.so
+
+echo "**** nvcc compile"
+nvcc -shared -I/usr/include -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -I/opt/share/OpenBLAS-0.2.14/include *.cpp *.cu -o libCuAcc_nvcc.so -Xcompiler "-fPIC -g" -g -lcublas -lcusparse -m64 --use_fast_math --ptxas-options=-v -rdc=true -gencode arch=compute_35,code=sm_35 -gencode arch=compute_35,code=compute_35 /opt/share/OpenBLAS-0.2.14/lib/libopenblas.a -lpthread -lrt
+
+ln -sf libCuAcc_nvcc.so libCuAcc.so
+
+nvcc -I/usr/include -I$JAVA_HOME/include -I$JAVA_HOME/include/linux -I/opt/share/OpenBLAS-0.2.14/include *.cpp *.cu -o cuAcc -Xcompiler "-fPIC" -lcublas -lcusparse -m64 --use_fast_math --ptxas-options=-v -rdc=true -gencode arch=compute_35,code=sm_35 -gencode arch=compute_35,code=compute_35 /opt/share/OpenBLAS-0.2.14/lib/libopenblas.a -lpthread -lrt