diff --git a/HugeCTR/embedding/all2all_embedding_collection.cu b/HugeCTR/embedding/all2all_embedding_collection.cu
index b585221852..8eba46bd6d 100644
--- a/HugeCTR/embedding/all2all_embedding_collection.cu
+++ b/HugeCTR/embedding/all2all_embedding_collection.cu
@@ -206,7 +206,7 @@ void weighted_sparse_forward_per_gpu(
     const core23::Tensor &sp_weights_all_gather_recv_buffer, ILookup *emb_storage,
     std::vector<core23::Tensor> &emb_vec_model_buffer, int64_t *num_model_key,
     int64_t *num_model_offsets, core23::Tensor &ret_model_key, core23::Tensor &ret_model_offset,
-    core23::Tensor &ret_sp_weight) {
+    core23::Tensor &ret_sp_weight, bool use_filter) {
   HugeCTR::CudaDeviceContext context(core->get_device_id());
   int tensor_device_id = core->get_device_id();
 
@@ -369,6 +369,79 @@ void weighted_sparse_forward_per_gpu(
   *num_model_offsets = model_offsets.num_elements();
 }
 
+template <typename offset_t>
+__global__ void cal_lookup_idx(size_t lookup_num, offset_t *bucket_after_filter, size_t batch_size,
+                               offset_t *lookup_offset, size_t bucket_num) {
+  int32_t i = blockIdx.x * blockDim.x + threadIdx.x;
+  int32_t step = blockDim.x * gridDim.x;
+  for (; i < (lookup_num); i += step) {
+    lookup_offset[i] = bucket_after_filter[i * batch_size];
+  }
+}
+
+template <typename offset_t>
+__global__ void count_ratio_filter(size_t bucket_num, char *filterd, const offset_t *bucket_range,
+                                   offset_t *bucket_after_filter) {
+  int32_t i = blockIdx.x * blockDim.x + threadIdx.x;
+  int32_t step = blockDim.x * gridDim.x;
+  for (; i < (bucket_num); i += step) {
+    offset_t start = bucket_range[i];
+    offset_t end = bucket_range[i + 1];
+    bucket_after_filter[i + 1] = 0;
+    for (offset_t idx = start; idx < end; idx++) {
+      if (filterd[idx] == 1) {
+        bucket_after_filter[i + 1]++;
+      }
+    }
+    if (i == 0) {
+      bucket_after_filter[i] = 0;
+    }
+  }
+}
+
+void filter(std::shared_ptr<CoreResourceManager> core,
+            const UniformModelParallelEmbeddingMeta &meta, const core23::Tensor &filterd,
+            core23::Tensor &bucket_range, core23::Tensor &bucket_after_filter,
+            core23::TensorParams &params, EmbeddingInput &emb_input, core23::Tensor &lookup_offset,
+            core23::Tensor &temp_scan_storage, core23::Tensor &temp_select_storage,
+            size_t temp_scan_bytes, size_t temp_select_bytes, core23::Tensor &keys_after_filter) {
+  auto stream = core->get_local_gpu()->get_stream();
+  // bucket_range holds bucket_num + 1 elements, so subtract 1 to get the bucket count.
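+  // Filtering pipeline: count_ratio_filter counts the surviving keys per bucket, an inclusive
+  // scan turns those counts into the new bucket_range, DeviceSelect::Flagged compacts the kept
+  // keys, and cal_lookup_idx rebuilds the per-lookup offsets from the new bucket boundaries.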
+  int bucket_num = bucket_range.num_elements() - 1;
+  const int block_size = 256;
+  const int grid_size =
+      core->get_kernel_param().num_sms * core->get_kernel_param().max_thread_per_block / block_size;
+
+  DISPATCH_INTEGRAL_FUNCTION_CORE23(bucket_range.data_type().type(), offset_t, [&] {
+    DISPATCH_INTEGRAL_FUNCTION_CORE23(keys_after_filter.data_type().type(), key_t, [&] {
+      offset_t *bucket_after_filter_ptr = bucket_after_filter.data<offset_t>();
+      const offset_t *bucket_range_ptr = bucket_range.data<offset_t>();
+      char *filterd_ptr = filterd.data<char>();
+      count_ratio_filter<<<grid_size, block_size, 0, stream>>>(
+          bucket_num, filterd_ptr, bucket_range_ptr, bucket_after_filter_ptr);
+      cub::DeviceScan::InclusiveSum(
+          temp_scan_storage.data(), temp_scan_bytes, bucket_after_filter.data<offset_t>(),
+          bucket_after_filter.data<offset_t>(), bucket_after_filter.num_elements(), stream);
+
+      key_t *keys_ptr = emb_input.keys.data<key_t>();
+
+      cub::DeviceSelect::Flagged(temp_select_storage.data(), temp_select_bytes, keys_ptr,
+                                 filterd_ptr, keys_after_filter.data<key_t>(),
+                                 emb_input.num_keys.data<uint64_t>(), emb_input.h_num_keys, stream);
+
+      size_t batch_size = (bucket_num) / meta.num_lookup_;
+
+      cal_lookup_idx<<<1, block_size, 0, stream>>>(meta.num_lookup_ + 1,
+                                                   bucket_after_filter.data<offset_t>(), batch_size,
+                                                   lookup_offset.data<offset_t>(), bucket_num);
+      HCTR_LIB_THROW(cudaStreamSynchronize(stream));
+      emb_input.h_num_keys = static_cast<size_t>(emb_input.num_keys.data<uint64_t>()[0]);
+      emb_input.keys = keys_after_filter;
+      emb_input.bucket_range = bucket_after_filter;
+    });
+  });
+}
+
 void sparse_forward_per_gpu(std::shared_ptr<CoreResourceManager> core,
                             const EmbeddingCollectionParam &ebc_param,
                             const UniformModelParallelEmbeddingMeta &meta,
@@ -376,7 +449,8 @@ void sparse_forward_per_gpu(std::shared_ptr<CoreResourceManager> core,
                             const core23::Tensor &row_lengths_all_gather_recv_buffer,
                             ILookup *emb_storage, std::vector<core23::Tensor> &emb_vec_model_buffer,
                             int64_t *num_model_key, int64_t *num_model_offsets,
-                            core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset) {
+                            core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset,
+                            bool use_filter) {
   /*
   There are some steps in this function:
   1.reorder key to feature major
@@ -500,8 +574,56 @@ void sparse_forward_per_gpu(std::shared_ptr<CoreResourceManager> core,
   compress_offset_.compute(embedding_input.bucket_range, batch_size, &num_key_per_lookup_offset);
   HCTR_LIB_THROW(cudaStreamSynchronize(stream));
+  if (use_filter) {
+    core23::Tensor bucket_range_after_filter;
+    core23::Tensor keys_after_filter;
+    core23::Tensor filtered;
+
+    filtered = core23::Tensor(
+        params.shape({(int64_t)embedding_input.h_num_keys}).data_type(core23::ScalarType::Char));
+    bucket_range_after_filter =
+        core23::Tensor(params.shape({embedding_input.bucket_range.num_elements()})
+                           .data_type(embedding_input.bucket_range.data_type().type()));
+    keys_after_filter = core23::Tensor(params.shape({(int64_t)embedding_input.h_num_keys + 1})
+                                           .data_type(embedding_input.keys.data_type().type()));
+
+    core23::Tensor temp_scan_storage;
+    core23::Tensor temp_select_storage;
+
+    size_t temp_scan_bytes = 0;
+    size_t temp_select_bytes = 0;
+
+    DISPATCH_INTEGRAL_FUNCTION_CORE23(
+        embedding_input.bucket_range.data_type().type(), offset_t, [&] {
+          DISPATCH_INTEGRAL_FUNCTION_CORE23(embedding_input.keys.data_type().type(), key_t, [&] {
+            cub::DeviceScan::InclusiveSum(nullptr, temp_scan_bytes, (offset_t *)nullptr,
+                                          (offset_t *)nullptr,
+                                          bucket_range_after_filter.num_elements());
+
+            temp_scan_storage = core23::Tensor(params.shape({static_cast<int64_t>(temp_scan_bytes)})
+                                                   .data_type(core23::ScalarType::Char));
+
+            cub::DeviceSelect::Flagged(nullptr, temp_select_bytes, (key_t *)nullptr,
+                                       (char *)nullptr, (key_t *)nullptr, (uint64_t *)nullptr,
+                                       embedding_input.h_num_keys);
+
+            temp_select_storage =
+                core23::Tensor(params.shape({static_cast<int64_t>(temp_select_bytes)})
+                                   .data_type(core23::ScalarType::Char));
+          });
+        });
+
+    emb_storage->ratio_filter(embedding_input.keys, embedding_input.h_num_keys,
+                              num_key_per_lookup_offset, meta.num_local_lookup_ + 1,
+                              meta.d_local_table_id_list_, filtered);
+
+    filter(core, meta, filtered, embedding_input.bucket_range, bucket_range_after_filter, params,
+           embedding_input, num_key_per_lookup_offset, temp_scan_storage, temp_select_storage,
+           temp_scan_bytes, temp_select_bytes, keys_after_filter);
+  }
   core23::Tensor embedding_vec = core23::init_tensor_list<float>(
       key_all_gather_recv_buffer.num_elements(), params.device().index());
+
   emb_storage->lookup(embedding_input.keys, embedding_input.h_num_keys, num_key_per_lookup_offset,
                       meta.num_local_lookup_ + 1, meta.d_local_table_id_list_, embedding_vec);
diff --git a/HugeCTR/embedding/all2all_embedding_collection.hpp b/HugeCTR/embedding/all2all_embedding_collection.hpp
index 98021c7606..6c97567bc1 100644
--- a/HugeCTR/embedding/all2all_embedding_collection.hpp
+++ b/HugeCTR/embedding/all2all_embedding_collection.hpp
@@ -57,7 +57,7 @@ void weighted_sparse_forward_per_gpu(
     const core23::Tensor &sp_weights_all_gather_recv_buffer, ILookup *emb_storage,
     std::vector<core23::Tensor> &emb_vec_model_buffer, int64_t *num_model_key,
     int64_t *num_model_offsets, core23::Tensor &ret_model_key, core23::Tensor &ret_model_offset,
-    core23::Tensor &ret_sp_weight);
+    core23::Tensor &ret_sp_weight, bool use_filter);
 
 void weighted_copy_model_keys_and_offsets(
     std::shared_ptr<CoreResourceManager> core, const core23::Tensor &model_key,
@@ -71,7 +71,8 @@ void sparse_forward_per_gpu(std::shared_ptr<CoreResourceManager> core,
                             const core23::Tensor &row_lengths_all_gather_recv_buffer,
                             ILookup *emb_storage, std::vector<core23::Tensor> &emb_vec_model_buffer,
                             int64_t *num_model_key, int64_t *num_model_offsets,
-                            core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset);
+                            core23::Tensor *ret_model_key, core23::Tensor *ret_model_offset,
+                            bool use_filter);
 
 void copy_model_keys_and_offsets(std::shared_ptr<CoreResourceManager> core,
                                  const core23::Tensor &model_key,
diff --git a/HugeCTR/embedding/embedding_table.hpp b/HugeCTR/embedding/embedding_table.hpp
index 3cae66986c..9a28186ed1 100644
--- a/HugeCTR/embedding/embedding_table.hpp
+++ b/HugeCTR/embedding/embedding_table.hpp
@@ -23,9 +23,13 @@ class ILookup {
  public:
   virtual ~ILookup() = default;
 
-  virtual void lookup(const core23::Tensor &keys, size_t num_keys,
-                      const core23::Tensor &num_keys_per_table_offset, size_t num_table_offset,
-                      const core23::Tensor &table_id_list, core23::Tensor &embedding_vec) = 0;
+  virtual void lookup(const core23::Tensor& keys, size_t num_keys,
+                      const core23::Tensor& num_keys_per_table_offset, size_t num_table_offset,
+                      const core23::Tensor& table_id_list, core23::Tensor& embedding_vec) = 0;
+
+  virtual void ratio_filter(const core23::Tensor& keys, size_t num_keys,
+                            const core23::Tensor& id_space_offset, size_t num_id_space_offset,
+                            const core23::Tensor& id_space, core23::Tensor& filtered){};
 };
 
 }  // namespace embedding
diff --git a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu
index 4886e85b83..95a02307db 100644
--- a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu
+++ b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.cu
@@ -380,6 +380,56 @@ void DummyVarAdapter<KeyType, OffsetType, DType>::lookup(
   }
 }
 
+template <typename KeyType, typename OffsetType, typename DType>
+void DummyVarAdapter<KeyType, OffsetType, DType>::ratio_filter(
+    const core23::Tensor& keys, size_t num_keys, const core23::Tensor& id_space_offset,
+    size_t num_id_space_offset, const core23::Tensor& id_space, core23::Tensor& filtered) {
+  // clang-format off
+  id_space_offset_.clear();
+  id_space_.clear();
+  id_space_offset_.resize(num_id_space_offset);
+  CUDACHECK(cudaMemcpyAsync(id_space_offset_.data(),
+                            id_space_offset.data<OffsetType>(),
+                            sizeof(OffsetType) * (num_id_space_offset),
+                            cudaMemcpyDeviceToHost, stream_));
+  id_space_.resize(num_id_space_offset - 1);
+  CUDACHECK(cudaMemcpyAsync(id_space_.data(),
+                            id_space.data<int>(),
+                            sizeof(int) * (num_id_space_offset - 1),
+                            cudaMemcpyDeviceToHost, stream_));
+  // clang-format on
+  CUDACHECK(cudaStreamSynchronize(stream_));
+  const KeyType* input = keys.data<KeyType>();
+  bool* output_filtered = filtered.data<bool>();
+  int start_index = 0;
+  size_t num = 0;
+  bool is_lookup = false;
+
+  for (int i = 0; i < num_id_space_offset - 1; ++i) {
+    if (i == num_id_space_offset - 2) {
+      num += id_space_offset_[i + 1] - id_space_offset_[i];
+      is_lookup = true;
+    } else {
+      if (same_table_[i + 1] != same_table_[i]) {
+        num += id_space_offset_[i + 1] - id_space_offset_[i];
+        is_lookup = true;
+      } else {
+        num += id_space_offset_[i + 1] - id_space_offset_[i];
+      }
+    }
+    if (num != 0 && is_lookup) {
+      auto var = vars_[id_space_[start_index]];
+      var->ratio_filter(input, output_filtered, num, stream_);
+      CUDACHECK(cudaStreamSynchronize(stream_));
+      input += num;
+      output_filtered += num;
+      num = 0;
+      is_lookup = false;
+      start_index = i + 1;
+    }
+  }
+}
+
 template class DummyVarAdapter;
 template class DummyVarAdapter;
 // template class DummyVarAdapter;
diff --git a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h
index cf0d69e3a0..1acd7cfcab 100644
--- a/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h
+++ b/sparse_operation_kit/kit_src/lookup/impl/embedding_collection_adapter.h
@@ -88,6 +88,10 @@ class DummyVarAdapter : public ::embedding::ILookup {
               size_t num_id_space_offset, const core23::Tensor& id_space,
               core23::Tensor& embedding_vec) override;
 
+  void ratio_filter(const core23::Tensor& keys, size_t num_keys,
+                    const core23::Tensor& id_space_offset, size_t num_id_space_offset,
+                    const core23::Tensor& id_space, core23::Tensor& filtered) override;
+
  private:
   std::shared_ptr tf_backend_;
   int sm_count_;
diff --git a/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc b/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc
index 63a4cad2c9..83dbef22df 100644
--- a/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc
+++ b/sparse_operation_kit/kit_src/lookup/kernels/embedding_collection.cc
@@ -72,6 +72,7 @@ class EmbeddingCollectionBase : public OpKernel {
   int global_gpu_id_;
   int num_local_lookups_;
   bool use_sp_weight_;
+  bool use_filter_;
 
   HugeCTR::core23::KernelParams kernel_params_;
   std::unique_ptr ebc_param_;
@@ -143,6 +144,7 @@ class EmbeddingCollectionBase : public OpKernel {
     OP_REQUIRES_OK(ctx, ctx->GetAttr("id_in_local_rank", &id_in_local_rank_));
     OP_REQUIRES_OK(ctx, ctx->GetAttr("num_gpus", &num_gpus_));
     OP_REQUIRES_OK(ctx, ctx->GetAttr("use_sp_weight", &use_sp_weight_));
+    OP_REQUIRES_OK(ctx, ctx->GetAttr("use_filter", &use_filter_));
 
     // check rank/num_ranks/id_in_local_rank/num_gpus
     OP_REQUIRES(ctx, rank_ >= 0 && rank_ < num_ranks_, errors::InvalidArgument("Invalid rank."));
@@ -477,13 +479,13 @@ class LookupFowardBase : public EmbeddingCollectionBase
        ::embedding::tf::model_forward::weighted_sparse_forward_per_gpu(
            tf_backend, *this->ebc_param_, *this->meta_, this->global_gpu_id_, key_recv_buffer_tensor,
            row_length_recv_buffer_tensor, sp_weight_recv_buffer_tensor, &adapter_,
            emb_vec_model_buffer, &num_model_key, &num_model_offsets, ret_model_key, ret_model_offset,
-           ret_sp_weight);
+           ret_sp_weight, this->use_filter_);
      } else {
        ::embedding::tf::model_forward::sparse_forward_per_gpu(
            tf_backend, *this->ebc_param_, *this->meta_, key_recv_buffer_tensor,
            row_length_recv_buffer_tensor, &adapter_, emb_vec_model_buffer, &num_model_key,
-           &num_model_offsets, &ret_model_key, &ret_model_offset);
+           &num_model_offsets, &ret_model_key, &ret_model_offset, this->use_filter_);
      }
 
      // Prepare model_key & model_offsets
diff --git a/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc b/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc
index c3344c1f4f..6c3cd8d2c1 100644
--- a/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc
+++ b/sparse_operation_kit/kit_src/lookup/ops/embedding_collection.cc
@@ -52,6 +52,7 @@ REGISTER_OP("PreprocessingForward")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -80,6 +81,7 @@ REGISTER_OP("PreprocessingForwardWithWeight")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -112,6 +114,7 @@ REGISTER_OP("LookupForward")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -165,6 +168,7 @@ REGISTER_OP("LookupForwardVariable")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -218,6 +222,7 @@ REGISTER_OP("LookupForwardDynamic")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -273,6 +278,7 @@ REGISTER_OP("LookupForwardEmbeddingVarGPU")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -324,6 +330,7 @@ REGISTER_OP("LookupBackward")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -362,6 +369,7 @@ REGISTER_OP("PostprocessingForward")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
    .Attr("use_sp_weight: bool")
+    .Attr("use_filter: bool")
    .Attr("Tindices: {int32, int64} = DT_INT64")
    .Attr("Toffsets: {int32, int64} = DT_INT64")
    .Attr("dtype: {float32, float16} = DT_FLOAT")
@@ -403,6 +411,7 @@ REGISTER_OP("PostprocessingBackward")
    .Attr("id_in_local_rank: int")
    .Attr("num_gpus: int")
.Attr("use_sp_weight: bool") + .Attr("use_filter: bool") .Attr("Tindices: {int32, int64} = DT_INT64") .Attr("Toffsets: {int32, int64} = DT_INT64") .Attr("dtype: {float32, float16} = DT_FLOAT") diff --git a/sparse_operation_kit/kit_src/variable/impl/det_variable.cu b/sparse_operation_kit/kit_src/variable/impl/det_variable.cu index a9c1a84def..cb324aed0f 100644 --- a/sparse_operation_kit/kit_src/variable/impl/det_variable.cu +++ b/sparse_operation_kit/kit_src/variable/impl/det_variable.cu @@ -248,6 +248,12 @@ void DETVariable::scatter_update(const KeyType* keys, const map_->scatter_update(keys, values, num_keys, stream); } +template +void DETVariable::ratio_filter(const KeyType* keys, bool* filtered, + size_t num_keys, cudaStream_t stream) { + throw std::runtime_error("SOK dynamic variable with DET backend don't support ratio_filter!"); +} + template class DETVariable; template class DETVariable; diff --git a/sparse_operation_kit/kit_src/variable/impl/det_variable.h b/sparse_operation_kit/kit_src/variable/impl/det_variable.h index 243308bd7b..065fcaaf13 100644 --- a/sparse_operation_kit/kit_src/variable/impl/det_variable.h +++ b/sparse_operation_kit/kit_src/variable/impl/det_variable.h @@ -56,10 +56,13 @@ class DETVariable : public VariableBase { cudaStream_t stream = 0) override; void scatter_update(const KeyType *keys, const ValueType *values, size_t num_keys, cudaStream_t stream = 0) override; + void ratio_filter(const KeyType *keys, bool *filtered, size_t num_keys, + cudaStream_t stream = 0) override; private: std::unique_ptr> map_; + float filter_ratio_; size_t dimension_; size_t initial_capacity_; std::string initializer_; diff --git a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu index 9a9fa2ab22..919ea561fd 100644 --- a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu +++ b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.cu @@ -276,12 +276,13 @@ HKVVariable::HKVVariable(int64_t dimension, int64_t initial_ size_t max_hbm_for_vectors, size_t max_bucket_size, float max_load_factor, int block_size, int device_id, bool io_by_cpu, const std::string& evict_strategy, - cudaStream_t stream) + cudaStream_t stream, float filter_ratio) : dimension_(dimension), initial_capacity_(initial_capacity), initializer_(initializer), stream_(stream), - curand_states_(nullptr) { + curand_states_(nullptr), + filter_ratio_(filter_ratio) { if (dimension_ <= 0) { throw std::invalid_argument("dimension must > 0 but got " + std::to_string(dimension)); } @@ -559,5 +560,34 @@ void HKVVariable::scatter_update(const KeyType* keys, const CUDACHECK(cudaStreamSynchronize(stream)); } +template +__global__ void ratio_filter_flag(curandState* state, const KeyType *keys, bool *filtered, size_t num_keys, float filter_ratio) { + int idx = blockIdx.x * blockDim.x + threadIdx.x; + curandState localState; + localState = state[GlobalThreadId()]; + for (int i = idx; i < num_keys; i += blockDim.x * gridDim.x) { + if (!filtered[i]) { + auto ratio = curand_uniform(&localState); + if (ratio < filter_ratio) { + filtered[i] = true; + } + } + } + state[GlobalThreadId()] = localState; +} +template +void HKVVariable::ratio_filter(const KeyType *keys, bool *filtered, + size_t num_keys, cudaStream_t stream) { + // TODO: update hkv, use exist; + ValueType** p_values; + CUDACHECK(cudaMallocAsync(&p_values, num_keys * sizeof(ValueType*),stream)); + hkv_table_->find(num_keys, keys, p_values, filtered, nullptr, stream); + uint32_t grid_dim = SM_NUM * 
(NTHREAD_PER_SM/256); + // filter + ratio_filter_flag<<>>(curand_states_, keys, filtered, num_keys, filter_ratio_); + CUDACHECK(cudaFreeAsync(p_values,stream)); + //CUDACHECK(cudaStreamSynchronize(stream)); +} + template class HKVVariable; } // namespace sok diff --git a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h index 631d57b3be..a831a04950 100644 --- a/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h +++ b/sparse_operation_kit/kit_src/variable/impl/hkv_variable.h @@ -33,7 +33,7 @@ class HKVVariable : public VariableBase { size_t max_capacity = 0, size_t max_hbm_for_vectors = 0, size_t max_bucket_size = 128, float max_load_factor = 0.5f, int block_size = 128, int device_id = 0, bool io_by_cpu = false, const std::string &evict_strategy = "kLru", - cudaStream_t stream = 0); + cudaStream_t stream = 0, float filter_ratio = 1.0); ~HKVVariable() override; int64_t rows() override; @@ -61,12 +61,15 @@ class HKVVariable : public VariableBase { cudaStream_t stream = 0) override; void scatter_update(const KeyType *keys, const ValueType *values, size_t num_keys, cudaStream_t stream = 0) override; + void ratio_filter(const KeyType *keys, bool *filtered, size_t num_keys, + cudaStream_t stream = 0) override; private: using HKVTable = nv::merlin::HashTable; std::unique_ptr hkv_table_ = std::make_unique(); nv::merlin::HashTableOptions hkv_table_option_; + float filter_ratio_; size_t dimension_; size_t initial_capacity_; std::string initializer_; diff --git a/sparse_operation_kit/kit_src/variable/impl/variable_base.cu b/sparse_operation_kit/kit_src/variable/impl/variable_base.cu index 7c89ea13e3..1b2eb5574c 100644 --- a/sparse_operation_kit/kit_src/variable/impl/variable_base.cu +++ b/sparse_operation_kit/kit_src/variable/impl/variable_base.cu @@ -87,9 +87,16 @@ std::shared_ptr> VariableFactory::create( if (evict_strategy_it != config_json.end()) { evict_strategy = io_by_cpu_it->get(); } + // When we encounter a feature that is not already in our model, we only add it to + // the model with probability p. + float filter_ratio = 1.0f; ///< low_frequency_filter probability p. 
default 100% + auto filter_ratio_it = config_json.find("filter_ratio"); + if (filter_ratio_it != config_json.end()) { + filter_ratio = filter_ratio_it->get(); + } return std::make_shared>( cols, init_capacity, initializer, max_capacity, max_hbm_for_vectors, max_bucket_size, - max_load_factor, block_size, device_id, io_by_cpu, evict_strategy, stream); + max_load_factor, block_size, device_id, io_by_cpu, evict_strategy, stream, filter_ratio); } } template <> diff --git a/sparse_operation_kit/kit_src/variable/impl/variable_base.h b/sparse_operation_kit/kit_src/variable/impl/variable_base.h index 544ecd0571..63994105f7 100644 --- a/sparse_operation_kit/kit_src/variable/impl/variable_base.h +++ b/sparse_operation_kit/kit_src/variable/impl/variable_base.h @@ -57,6 +57,8 @@ class VariableBase { cudaStream_t stream = 0) = 0; virtual void scatter_update(const KeyType *keys, const ValueType *values, size_t num_keys, cudaStream_t stream = 0) = 0; + virtual void ratio_filter(const KeyType *keys, bool *filtered, size_t num_keys, + cudaStream_t stream = 0) = 0; }; class VariableFactory { diff --git a/sparse_operation_kit/sparse_operation_kit/lookup.py b/sparse_operation_kit/sparse_operation_kit/lookup.py index 6831fd919e..965eccc049 100644 --- a/sparse_operation_kit/sparse_operation_kit/lookup.py +++ b/sparse_operation_kit/sparse_operation_kit/lookup.py @@ -232,6 +232,7 @@ def _LookupBackward(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -268,6 +269,7 @@ def _LookupBackward(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -304,6 +306,7 @@ def _LookupDynamicBackward(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -342,6 +345,7 @@ def _LookupBackwardEmbeddingVarGPU(op, *top_grads): "id_in_local_rank", # "Toffsets", "use_sp_weight", + "use_filter", ] kwargs = {} for attr in attr_list: @@ -385,7 +389,8 @@ def _PostprocessingBackward(op, *top_grads): "id_in_local_rank", "num_gpus", "Tindices", - "use_sp_weight" + "use_sp_weight", + "use_filter", # "Toffsets", ] kwargs = {} @@ -412,7 +417,7 @@ def to_list(any_obj): return any_obj -def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None): +def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None, use_filter=False): shard, dimensions = [], [] for param in params: shard.append(param.target_gpu) @@ -467,6 +472,7 @@ def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None): "num_ranks": num_ranks(), "id_in_local_rank": id_in_rank(), "use_sp_weight": use_sp_weight, + "use_filter": use_filter, } # Step1 @@ -529,7 +535,7 @@ def lookup_sparse_impl(params, sp_ids, sp_weights=None, combiners=None): return emb_vec -def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None): +def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None, use_low_frequency_filter=False): """ Abbreviated as ``sok.lookup_sparse``. @@ -552,6 +558,8 @@ def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None): combiners: list, tuple,optional a list or tuple of string to specify the combiner of each lookup,for now only suupport "mean" "sum". if don't specify , indicate all elements(numbero of elements is same with number of sok.Variables) in combiners will should be set to be mean. 
+    use_low_frequency_filter: bool, optional
+           if True, indices that are not yet present in the embedding table are admitted only with the configured probability (low-frequency filtering) instead of always being inserted.
 
    Returns
    -------
@@ -655,7 +663,11 @@ def lookup_sparse(params, sp_ids, sp_weights=None, combiners=None):
    selected_combiners = [combiners[i] for i in selected_idx]
 
    selected_emb_vec = lookup_sparse_impl(
-        selected_params, selected_sp_ids, selected_sp_weights, selected_combiners
+        selected_params,
+        selected_sp_ids,
+        selected_sp_weights,
+        selected_combiners,
+        use_low_frequency_filter,
    )
    for ii, i in enumerate(selected_idx):
        emb_vec[i] = selected_emb_vec[ii]
diff --git a/sparse_operation_kit/sparse_operation_kit/test/function_test/tf2/lookup/lookup_sparse_hkv_low_frequency_test.py b/sparse_operation_kit/sparse_operation_kit/test/function_test/tf2/lookup/lookup_sparse_hkv_low_frequency_test.py
new file mode 100644
index 0000000000..7715576bed
--- /dev/null
+++ b/sparse_operation_kit/sparse_operation_kit/test/function_test/tf2/lookup/lookup_sparse_hkv_low_frequency_test.py
@@ -0,0 +1,163 @@
+"""
+ Copyright (c) 2022, NVIDIA CORPORATION.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+"""
+
+import time
+import numpy as np
+import tensorflow as tf
+import horovod.tensorflow as hvd
+import sparse_operation_kit as sok
+
+np.set_printoptions(threshold=np.inf)
+
+if __name__ == "__main__":
+    hvd.init()
+    gpus = tf.config.experimental.list_physical_devices("GPU")
+    for gpu in gpus:
+        tf.config.experimental.set_memory_growth(gpu, True)
+    if gpus:
+        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU")
+    sok.init()
+
+    rows = [8192 * 2048, 8192 * 8192]
+    cols = [128, 4]
+    hotness = [1, 1]
+    combiners = ["sum", "mean"]
+    batch_size = 8192
+    iters = 1
+    filter_iters = 5
+    initial_vals = [13, 17]
+
+    # sok variables
+    sok_vars = [
+        sok.DynamicVariable(
+            dimension=cols[i],
+            var_type="hybrid",
+            initializer=str(initial_vals[i]),
+            init_capacity=1024 * 1024,
+            max_capacity=1024 * 1024,
+        )
+        for i in range(len(cols))
+    ]
+    print("HKV var created")
+
+    # indices
+    total_indices = []
+    total_indices_np = []
+    for i in range(len(rows)):
+        offsets = np.random.randint(1, hotness[i] + 1, iters * batch_size)
+        offsets = tf.convert_to_tensor(offsets, dtype=tf.int64)
+        offsets = hvd.broadcast(offsets, root_rank=0)
+        values = np.random.randint(0, rows[i], tf.reduce_sum(offsets))
+        values = tf.convert_to_tensor(values, dtype=tf.int64)
+        values = hvd.broadcast(values, root_rank=0)
+        total_indices_np.append(values)
+        total_indices.append(tf.RaggedTensor.from_row_lengths(values, offsets))
+    left = batch_size // hvd.size() * hvd.rank()
+    right = batch_size // hvd.size() * (hvd.rank() + 1)
+
+    unique_indices = []
+    for i in range(len(total_indices_np)):
+        unique_indices.append(np.unique(total_indices_np[i]))
+
+    # initialize optimizer
+    optimizer = tf.optimizers.SGD(learning_rate=1.0, momentum=0.9)
+    sok_optimizer = sok.OptimizerWrapper(optimizer)
+
+    def step(params, indices, use_filter=False):
+        with tf.GradientTape() as tape:
+            embeddings = sok.lookup_sparse(
+                params, indices, combiners=combiners, use_low_frequency_filter=use_filter
+            )
+            loss = 0
+            for i in range(len(embeddings)):
+                loss = loss + tf.reduce_sum(embeddings[i])
+        grads = tape.gradient(loss, params)
+        sok_optimizer.apply_gradients(zip(grads, params))
+        loss = hvd.allreduce(loss, op=hvd.Sum)
+        return loss, embeddings
+
+    indices_records = []
+    for i in range(iters):
+        loss, embeddings = step(sok_vars, total_indices)
+        print("____________pre lookup {} is done!________________".format(i))
+
+    # indices
+    total_indices_filter = []
+    total_indices_filter_np = []
+    for i in range(len(rows)):
+        offsets = np.random.randint(1, hotness[i] + 1, filter_iters * batch_size)
+        offsets = tf.convert_to_tensor(offsets, dtype=tf.int64)
+        offsets = hvd.broadcast(offsets, root_rank=0)
+        values = np.random.randint(0, rows[i], tf.reduce_sum(offsets))
+        values = tf.convert_to_tensor(values, dtype=tf.int64)
+        values = hvd.broadcast(values, root_rank=0)
+        total_indices_filter_np.append(values)
+        total_indices_filter.append(tf.RaggedTensor.from_row_lengths(values, offsets))
+
+    left = batch_size // hvd.size() * hvd.rank()
+    right = batch_size // hvd.size() * (hvd.rank() + 1)
+
+    def check_zero_line(arr):
+        zero_rows = [idx for idx, row in enumerate(arr) if np.all(row == 0)]
+        rows = arr.shape[0]
+        if rows == 0:
+            return 0, 0, 0
+        zero_count = 0
+        for i in range(rows):
+            tmp_line = arr[i, :]
+            if np.all(tmp_line == 0):
+                zero_count += 1
+        return zero_count, rows, zero_count / rows
+
+    for i in range(iters):
+        indices = []
+        indices_np = []
+        indices_new_np = []
+        masks = []
+        for j in range(len(total_indices_filter)):
+            tmp_indices_tensor = total_indices_filter[j][
+                i * batch_size + left : i * batch_size + right
+            ]
+            indices.append(tmp_indices_tensor)
+            indices_np.append(np.squeeze(tmp_indices_tensor.numpy()))
+            mask = np.isin(indices_np[j], unique_indices[j])
+            masks.append(mask)
+        loss, embeddings = step(sok_vars, indices, use_filter=True)
+        for k, embedding in enumerate(embeddings):
+            embedding_np = embedding.numpy()
+            mask_no_filter_index = np.where(
+                masks[k] == True,
+            )[0]
+            mask_filter_index = np.where(
+                masks[k] == False,
+            )[0]
+
+            print("mask_no_filter_index = ", mask_no_filter_index)
+            print("mask_filter_index = ", mask_filter_index)
+            embedding_no_filter_np = embedding_np[mask_no_filter_index, :]
+            embedding_filter_np = embedding_np[mask_filter_index, :]
+
+            print("embedding_no_filter_np = ", embedding_no_filter_np)
+            print("embedding_filter_np = ", embedding_filter_np)
+            print("embedding_np = ", embedding_np.shape)
+            print("mask = ", mask.shape)
+            zero_count_no_filter, _, zero_rate_no_filter = check_zero_line(embedding_no_filter_np)
+            zero_count_filter, _, zero_rate_filter = check_zero_line(embedding_filter_np)
+            print("zero_rate_no_filter = ", zero_rate_no_filter)
+            print("zero_rate_filter = ", zero_rate_filter)
+            assert zero_count_filter >= 0
+            assert zero_count_no_filter == 0
+    print("low frequency filter test passed")
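+    # Expected behaviour checked above: keys already inserted during the warm-up lookups
+    # (mask == True) must return non-zero, trained embedding rows, while some of the brand-new
+    # keys can be rejected by the low-frequency filter and therefore come back as all-zero rows.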