
Commit

Merge branch 'fea-decouple_collective_with_resource_manager-aleliu' into 'main'

[READY] Deprecate AsyncDataReader, LocalizedOneHot and Hybrid Embedding

See merge request dl/hugectr/hugectr!1507
minseokl committed Jan 19, 2024
2 parents 483f52b + 8092533 commit 2f28608
Showing 149 changed files with 3,214 additions and 20,266 deletions.
55 changes: 0 additions & 55 deletions .gitlab-ci.yml
@@ -294,47 +294,6 @@ criteo_multi_node:
DGXNNODES: 2
TEST_CMD: ./ci/integration_test/criteo/criteo_multi_node.sub

dlrm_benchmark_14node:
extends: .cluster_test_job_daily
needs:
- build_train_multi_node
variables:
GPFSFOLDER: $LOGDIR/dlrm_benchmark_14node
GIT_CLONE_PATH: ${GIT_CLONE_PATH_SELENE}
CONT: $TRAIN_IMAGE_MULTINODE_VERSIONED
MOUNTS: /raid:/raid
WALLTIME: "00:15:00"
SBATCH_OTHER_PARAMS: --network sharp
DGXNNODES: 14
TEST_CMD: ./ci/integration_test/dlrm/benchmark_14node.sub

dlrm_ib_nvlink_1node:
extends: .cluster_test_job_daily
needs:
- build_train_multi_node
variables:
GPFSFOLDER: $LOGDIR/dlrm_ib_nvlink_1node
GIT_CLONE_PATH: ${GIT_CLONE_PATH_SELENE}
CONT: $TRAIN_IMAGE_MULTINODE_VERSIONED
MOUNTS: /raid/datasets/criteo/mlperf/40m.limit_preshuffled:/data
WALLTIME: "00:10:00"
DGXNNODES: 1
TEST_CMD: ./ci/integration_test/dlrm/ib_nvlink_1node.sub

dlrm_ib_nvlink_8node:
extends: .cluster_test_job_daily
needs:
- build_train_multi_node
variables:
GPFSFOLDER: $LOGDIR/dlrm_ib_nvlink_8node
GIT_CLONE_PATH: ${GIT_CLONE_PATH_SELENE}
CONT: $TRAIN_IMAGE_MULTINODE_VERSIONED
MOUNTS: /raid/datasets/criteo/mlperf/40m.limit_preshuffled:/data
WALLTIME: "00:10:00"
SBATCH_OTHER_PARAMS: --comment=metrics
DGXNNODES: 8
TEST_CMD: ./ci/integration_test/dlrm/ib_nvlink_8node.sub

dlrm_dcnv2_benchmark_8node:
extends: .cluster_test_job_daily
needs:
@@ -576,20 +535,6 @@ inference_CPU_Memory_check:
DGXNNODES: 1
TEST_CMD: ./ci/post_test/check_cpu_usage.sub

dlrm_14node_check:
# Push logs to gitlab
extends: .cluster_post_test_job_daily
needs:
- dlrm_benchmark_14node
variables:
GPFSFOLDER: $LOGDIR/dlrm_14node_check
GIT_CLONE_PATH: ${GIT_CLONE_PATH_SELENE}
CONT: $TRAIN_IMAGE_VERSIONED
MOUNTS: $LOGDIR/dlrm_benchmark_14node:/logs
WALLTIME: "00:15:00"
DGXNNODES: 1
TEST_CMD: ./ci/post_test/check_dlrm_14node.sub

dlrm_dcnv2_8node_check:
# Push logs to gitlab
extends: .cluster_post_test_job_daily
53 changes: 53 additions & 0 deletions HugeCTR/include/collectives/collective.hpp
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <collectives/all_reduce_comm.hpp>
#include <collectives/ib_comm.hpp>
#include <resource_manager.hpp>

namespace HugeCTR {

/**
* @brief GPU resources manager which holds all the resources required by training
*
* An extended GPU Resource manager
*/
class CollectiveManager {
std::shared_ptr<ResourceManager> core_;

#ifdef ENABLE_MPI
std::unique_ptr<IbComm> ib_comm_ = NULL;
#endif
std::shared_ptr<AllReduceInPlaceComm> ar_comm_ = NULL;

public:
CollectiveManager() = default;
CollectiveManager(const std::shared_ptr<ResourceManager>& core) : core_(core) {}

HCTR_DISALLOW_COPY_AND_MOVE(CollectiveManager);

#ifdef ENABLE_MPI
void init_ib_comm();
IbComm* get_ib_comm() const { return ib_comm_.get(); }
void set_ready_to_transfer() {
if (ib_comm_) ib_comm_->set_ready_to_transfer();
}
#endif
void set_ar_comm(AllReduceAlgo algo, bool use_mixed_precision);
AllReduceInPlaceComm* get_ar_comm() const { return ar_comm_.get(); }
};
} // namespace HugeCTR
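
For orientation, a minimal usage sketch of the new class (not part of the diff): it assumes a ResourceManager instance already exists and that AllReduceAlgo::NCCL is one of the available algorithm values; the call order simply follows the methods declared in the header above.

// Hypothetical sketch, not from this commit: wiring CollectiveManager on top of
// an existing ResourceManager now that collectives are decoupled from it.
#include <collectives/collective.hpp>

#include <memory>

void setup_collectives(const std::shared_ptr<HugeCTR::ResourceManager>& resource_manager,
                       bool use_mixed_precision) {
  auto collectives = std::make_shared<HugeCTR::CollectiveManager>(resource_manager);
#ifdef ENABLE_MPI
  collectives->init_ib_comm();  // optional InfiniBand communicator for multi-node runs
#endif
  // AllReduceAlgo::NCCL is assumed here; pick whichever algorithm the build supports.
  collectives->set_ar_comm(HugeCTR::AllReduceAlgo::NCCL, use_mixed_precision);
  HugeCTR::AllReduceInPlaceComm* ar_comm = collectives->get_ar_comm();
  (void)ar_comm;  // hand off to embedding/network construction
}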
46 changes: 2 additions & 44 deletions HugeCTR/include/common.hpp
@@ -64,17 +64,8 @@ namespace HugeCTR {

#define WARP_SIZE 32

namespace hybrid_embedding {

enum class HybridEmbeddingType;
enum class CommunicationType;

} // namespace hybrid_embedding

enum class Check_t { Sum, None, Unknown };

enum class DataReaderSparse_t { Distributed, Localized };

enum class DataReaderType_t { Norm, Raw, Parquet, RawAsync };

enum class SourceType_t { FileList, Mmap, Parquet };
@@ -154,36 +145,17 @@ enum class Layer_t {
enum class Embedding_t {
DistributedSlotSparseEmbeddingHash,
LocalizedSlotSparseEmbeddingHash,
LocalizedSlotSparseEmbeddingOneHot,
HybridSparseEmbedding,
None
};

enum class Initializer_t { Default, Uniform, XavierNorm, XavierUniform, Sinusoidal, Zero };

enum class TrainState_t {
Init,
BottomMLPFprop,
TopMLPFprop,
BottomMLPBprop,
TopMLPBprop,
MLPExchangeWgrad,
MLPUpdate,
Finalize
};

enum class Distribution_t { Uniform, PowerLaw };

enum class PowerLaw_t { Long, Medium, Short, Specific };

enum class Tensor_t { Train, Evaluate };

// TODO: Consider to move them into a separate file
struct TrainState {
TrainState_t state = TrainState_t::Init;
cudaEvent_t* event = nullptr;
};

struct AsyncParam {
int num_threads;
int num_batches_per_thread;
@@ -209,17 +181,6 @@ struct AsyncParam {
is_dense_float(is_dense_float) {}
};

struct HybridEmbeddingParam {
size_t max_num_frequent_categories;
int64_t max_num_infrequent_samples;
double p_dup_max;
double max_all_reduce_bandwidth;
double max_all_to_all_bandwidth;
double efficiency_bandwidth_ratio;
hybrid_embedding::CommunicationType communication_type;
hybrid_embedding::HybridEmbeddingType hybrid_embedding_type;
};

typedef struct DataSetHeader_ {
long long error_check; // 0: no error check; 1: check_sum
long long number_of_records; // the number of samples in this data file
@@ -278,7 +239,6 @@ struct DataReaderSparseParam {
std::vector<bool> is_slot_fixed_length;
int slot_num;

DataReaderSparse_t type;
int max_feature_num;
int max_nnz;

@@ -289,8 +249,7 @@ struct DataReaderSparseParam {
nnz_per_slot(nnz_per_slot_),
is_fixed_length(is_fixed_length_),
is_slot_fixed_length(std::vector<bool>(slot_num_, is_fixed_length_)),
slot_num(slot_num_),
type(DataReaderSparse_t::Distributed) {
slot_num(slot_num_) {
HCTR_CHECK_HINT(slot_num_ > 0, "Illegal value for slot_num!");
if (static_cast<size_t>(slot_num_) != nnz_per_slot_.size()) {
HCTR_OWN_THROW(Error_t::WrongInput, "slot num != nnz_per_slot.size().");
@@ -312,8 +271,7 @@ struct DataReaderSparseParam {
nnz_per_slot(slot_num_, nnz_per_slot_),
is_fixed_length(is_fixed_length_),
is_slot_fixed_length(std::vector<bool>(slot_num_, is_fixed_length_)),
slot_num(slot_num_),
type(DataReaderSparse_t::Distributed) {
slot_num(slot_num_) {
HCTR_CHECK_HINT(slot_num_ > 0, "Illegal value for slot_num!");
for (size_t i = 0; i < nnz_per_slot.size(); i++) {
if (nnz_per_slot[i] == 1) {
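
As a side note, a hedged sketch of constructing DataReaderSparseParam after this change, where the removed DataReaderSparse_t member no longer needs to be initialized. The leading top-tensor-name argument is an assumption taken from the wider API and is not visible in the hunks above; the trailing arguments follow the initializer lists shown.

// Hypothetical sketch, not from this commit: building a sparse-input description
// after the `type` (DataReaderSparse_t) member was dropped. The first constructor
// argument (top tensor name) is assumed; the rest mirror the hunk above.
#include <common.hpp>

#include <vector>

HugeCTR::DataReaderSparseParam make_sparse_param() {
  std::vector<int> nnz_per_slot(26, 1);  // 26 slots, at most one key per slot
  return HugeCTR::DataReaderSparseParam("data1", nnz_per_slot,
                                        /*is_fixed_length_=*/true,
                                        /*slot_num_=*/26);
}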
2 changes: 1 addition & 1 deletion HugeCTR/include/data_reader.hpp
@@ -21,7 +21,7 @@
#include <fstream>
#include <gpu_resource.hpp>
#include <io/filesystem.hpp>
#include <resource_managers/resource_manager_ext.hpp>
#include <resource_managers/resource_manager_core.hpp>
#include <tensor2.hpp>
#include <utils.hpp>
#include <vector>
74 changes: 0 additions & 74 deletions HugeCTR/include/data_readers/async_reader/async_reader.hpp

This file was deleted.

