More CosmoFlow Fixes (#2461)
* Fix minibatch counting in python dataset reader

* Add clarifying comment

* Fix hang with hydrogen updates

* Add cosine scheduler to args
fiedorowicz1 authored Jul 25, 2024
1 parent 68484bd commit 7d69781
Showing 4 changed files with 71 additions and 12 deletions.
22 changes: 14 additions & 8 deletions applications/physics/cosmology/cosmoflow/cosmoflow_model.py
@@ -12,7 +12,8 @@ def construct_cosmoflow_model(parallel_strategy,
min_distconv_width,
mlperf,
transform_input,
dropout_keep_prob=0.5):
dropout_keep_prob=0.5,
cosine_schedule=None):

# Construct layer graph
universes = lbann.Input(data_field='samples')
@@ -66,15 +67,20 @@ def construct_cosmoflow_model(parallel_strategy,
# lbann.CallbackLinearGrowthLearningRate(target=learning_rate, num_epochs=5),
# lbann.CallbackSetLearningRate(step=32, val=0.25 * learning_rate),
# lbann.CallbackSetLearningRate(step=64, val=0.125 * learning_rate),
# lbann.CallbackCosineDecayLearningRate(
# lr_max=1e-3,
# lr_min=1e-5,
# decay_steps=10000,
# initial_warmup_learning_rate=0,
# warmup_steps=100
# ),
lbann.CallbackProgressBar(newline_interval=1, print_mem_usage=True)
]

if cosine_schedule:
callbacks.append(
lbann.CallbackCosineDecayLearningRate(
lr_max=learning_rate,
lr_min=cosine_schedule['lr_min'],
decay_steps=cosine_schedule['decay_steps'],
initial_warmup_learning_rate=cosine_schedule['init_warmup_lr'],
warmup_steps=cosine_schedule['warmup_steps']
)
)

return lbann.Model(
epochs=num_epochs,
layers=layers,
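The new cosine_schedule option forwards the parsed values to lbann.CallbackCosineDecayLearningRate, replacing the hard-coded numbers in the commented-out callback above. The callback's actual implementation lives in LBANN and is not part of this diff; the standalone Python sketch below only illustrates the usual shape of a cosine decay schedule with linear warmup, using an illustrative helper name (cosine_decay_lr).

import math

def cosine_decay_lr(step, lr_max, lr_min, decay_steps,
                    initial_warmup_lr=0.0, warmup_steps=0):
    """Illustrative cosine decay with linear warmup (not LBANN's source)."""
    if warmup_steps > 0 and step < warmup_steps:
        # Linear ramp from the initial warmup LR up to lr_max.
        frac = step / warmup_steps
        return initial_warmup_lr + frac * (lr_max - initial_warmup_lr)
    # Cosine anneal from lr_max down to lr_min over decay_steps, then hold at lr_min.
    t = min(step - warmup_steps, decay_steps) / decay_steps
    return lr_min + 0.5 * (lr_max - lr_min) * (1.0 + math.cos(math.pi * t))

# Example with the values from the previously commented-out callback:
# peak 1e-3, floor 1e-5, 10000 decay steps, 100 warmup steps.
print(cosine_decay_lr(0, 1e-3, 1e-5, 10000, 0.0, 100))      # ~0.0 (start of warmup)
print(cosine_decay_lr(5100, 1e-3, 1e-5, 10000, 0.0, 100))   # ~5e-4 (near midpoint)
print(cosine_decay_lr(10100, 1e-3, 1e-5, 10000, 0.0, 100))  # 1e-5 (floor)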
28 changes: 26 additions & 2 deletions applications/physics/cosmology/cosmoflow/train_cosmoflow.py
@@ -165,6 +165,21 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
parser.add_argument(
'--dropout-keep-prob', action='store', type=float, default=0.5,
help='Probability of keeping activations in dropout layers (default: 0.5). Set to 1 to disable dropout')
parser.add_argument(
'--cosine-schedule', action='store_true',
help='Use cosine learning rate scheduler')
parser.add_argument(
'--lr-min', action='store', type=float, default=0.,
help='Minimum learning rate for cosine scheduler')
parser.add_argument(
'--decay-steps', action='store', type=int, default=50000,
help='Steps to decay learning rate over for cosine scheduler')
parser.add_argument(
'--init-warmup-lr', action='store', type=float, default=0.,
help='Initial warmup learning rate for cosine scheduler')
parser.add_argument(
'--warmup-steps', action='store', type=int, default=1000,
help='Number of steps to warm up the learning rate over with the cosine scheduler')

# Parallelism arguments
parser.add_argument(
@@ -226,6 +241,15 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
channel_groups=args.channel_groups,
filter_groups=args.filter_groups,
depth_groups=args.depth_groups)

cosine_scheduler_args = None
if args.cosine_schedule:
cosine_scheduler_args = {
'lr_min': args.lr_min,
'decay_steps': args.decay_steps,
'init_warmup_lr': args.init_warmup_lr,
'warmup_steps': args.warmup_steps
}

model = cosmoflow_model.construct_cosmoflow_model(parallel_strategy=parallel_strategy,
local_batchnorm=args.local_batchnorm,
@@ -237,14 +261,14 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
min_distconv_width=args.min_distconv_width,
mlperf=args.mlperf,
transform_input=args.transform_input,
dropout_keep_prob=args.dropout_keep_prob)
dropout_keep_prob=args.dropout_keep_prob,
cosine_schedule=cosine_scheduler_args)

# Add profiling callbacks if needed.
model.callbacks.extend(lbann.contrib.args.create_profile_callbacks(args))

# Setup optimizer
optimizer = lbann.contrib.args.create_optimizer(args)
# optimizer.learn_rate *= 1e-2

# Setup data reader
serialize_io = False
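For reference, a minimal, self-contained sketch of how the new command-line flags end up in the dictionary passed as cosine_schedule (flag names and defaults taken from the diff above; the command-line values are purely illustrative):

import argparse

# Minimal reproduction of just the new scheduler flags from the diff above.
parser = argparse.ArgumentParser()
parser.add_argument('--cosine-schedule', action='store_true')
parser.add_argument('--lr-min', type=float, default=0.)
parser.add_argument('--decay-steps', type=int, default=50000)
parser.add_argument('--init-warmup-lr', type=float, default=0.)
parser.add_argument('--warmup-steps', type=int, default=1000)

# Illustrative flag values; the real script is launched with many more options.
args = parser.parse_args(['--cosine-schedule', '--lr-min', '1e-5',
                          '--decay-steps', '10000', '--warmup-steps', '100'])

cosine_scheduler_args = None
if args.cosine_schedule:
    cosine_scheduler_args = {
        'lr_min': args.lr_min,
        'decay_steps': args.decay_steps,
        'init_warmup_lr': args.init_warmup_lr,
        'warmup_steps': args.warmup_steps,
    }
print(cosine_scheduler_args)
# {'lr_min': 1e-05, 'decay_steps': 10000, 'init_warmup_lr': 0.0, 'warmup_steps': 100}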
@@ -127,6 +127,12 @@ class python_dataset_reader : public generic_data_reader
#ifdef LBANN_HAS_DISTCONV
/** @brief Whether or not tensor needs shuffling for distconv. */
bool m_tensor_shuffle_required = true;
/** @brief The number of minibatches in the current epoch that have been
 * fetched and returned by fetch_data_block. This is not the same as
 * m_dataset_minibatch_offset, which tracks the number of minibatches that
 * have been queued and can therefore run several minibatches ahead of the
 * one currently being returned. */
uint64_t m_fetched_minibatch_count;
#endif // LBANN_HAS_DISTCONV
};

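The commit message's "Fix minibatch counting" item is this new counter: m_dataset_minibatch_offset advances as minibatches are queued for prefetching, so it can run ahead of the minibatch actually being returned, while m_fetched_minibatch_count tracks what fetch_data_block has handed back. A toy Python sketch (illustrative only, not the reader's actual logic) of why the size of the returned minibatch must be keyed off the fetched count rather than the queue offset:

# Toy model of a prefetching reader (illustrative, not LBANN's implementation).
iterations_per_epoch = 5
mini_batch_size = 64
last_mini_batch_size = 40  # uneven final minibatch
prefetch_depth = 2

queued = 0    # analogous to m_dataset_minibatch_offset
fetched = 0   # analogous to m_fetched_minibatch_count

def current_global_mb_size(count):
    # Size of the minibatch being *returned*, so it must use the fetched count.
    if count < iterations_per_epoch - 1:
        return mini_batch_size
    return last_mini_batch_size

while fetched < iterations_per_epoch:
    # The queue offset runs ahead: background prefetching keeps submitting
    # minibatches, reaching the end of the epoch while earlier (full-sized)
    # minibatches are still being returned.
    while queued < min(fetched + prefetch_depth, iterations_per_epoch):
        queued += 1
    size = current_global_mb_size(fetched)
    fetched += 1
    print(f"returned minibatch {fetched}: queued ahead={queued - fetched}, size={size}")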
27 changes: 25 additions & 2 deletions src/data_ingestion/readers/data_reader_python_dataset.cpp
@@ -179,6 +179,14 @@ void python_dataset_reader::shuffle_responses(DataType* responses_ptr)
// in a batch that can't be split evenly will be split evenly across the
// first n ranks (or subsets of ranks in the distconv case).

#ifndef LBANN_BUILT_WITH_SPECTRUM
auto syncInfo = El::SyncInfo<El::Device::CPU>{};
El::mpi::EnsureComm<DataType, El::Collective::SEND>(m_comm->get_world_comm(),
syncInfo);
El::mpi::EnsureComm<DataType, El::Collective::RECV>(m_comm->get_world_comm(),
syncInfo);
#endif

uint64_t rank = m_comm->get_rank_in_trainer();
uint64_t nprocs = m_comm->get_procs_per_trainer();
uint64_t trainer_rank = m_comm->get_trainer_rank();
@@ -187,13 +195,14 @@ void python_dataset_reader::shuffle_responses(DataType* responses_ptr)
execution_mode mode = exec_mode_from_string(get_role());
dataset& ds = get_trainer().get_data_coordinator().get_dataset(mode);
uint64_t global_mb_size{};
if (m_dataset_minibatch_offset < (ds.get_num_iterations_per_epoch() - 1)) {
if (m_fetched_minibatch_count < (ds.get_num_iterations_per_epoch() - 1)) {
global_mb_size = ds.get_mini_batch_size();
}
else if (m_dataset_minibatch_offset ==
else if (m_fetched_minibatch_count ==
(ds.get_num_iterations_per_epoch() - 1)) {
global_mb_size = ds.get_last_mini_batch_size();
}
m_fetched_minibatch_count++;

uint64_t local_mb_size = global_mb_size / nprocs;
uint64_t extra_samples = global_mb_size % nprocs;
@@ -234,10 +243,21 @@ void python_dataset_reader::shuffle_responses(DataType* responses_ptr)
}
}
else if (rank == recv_rank) {
#ifdef LBANN_BUILT_WITH_SPECTRUM
EL_CHECK_MPI_CALL(
MPI_Recv(&responses_ptr[recv_rank_count * m_num_responses],
m_num_responses * sizeof(DataType),
MPI_BYTE,
m_comm->get_world_rank(trainer_rank, send_rank),
0,
m_comm->get_world_comm().GetMPIComm(),
nullptr));
#else
m_comm->recv(&responses_ptr[recv_rank_count * m_num_responses],
m_num_responses,
trainer_rank,
send_rank);
#endif
}

send_rank_count += 1;
@@ -344,6 +364,9 @@ void python_dataset_reader::queue_epoch()
m_dataset_minibatch_offset = 0;
m_dataset_sample_offset = 0;
m_queued_samples = 0;
#ifdef LBANN_HAS_DISTCONV
m_fetched_minibatch_count = 0;
#endif // LBANN_HAS_DISTCONV

// Prefetch the first set of samples (if less than minibatch size, the first
// minibatch read will take care of the rest)
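The send/receive pattern in shuffle_responses redistributes response values so that each rank ends up with the responses for its own samples. Per the comment at the top of that hunk, a global minibatch that does not divide evenly is split so that the first global_mb_size % nprocs ranks each receive one extra sample. A small sketch of that arithmetic (illustrative helper name, not LBANN code):

def per_rank_sample_counts(global_mb_size, nprocs):
    """Split a global minibatch across ranks, giving the remainder to the
    first `extra_samples` ranks (mirrors the arithmetic in shuffle_responses;
    illustrative helper, not LBANN code)."""
    local_mb_size = global_mb_size // nprocs
    extra_samples = global_mb_size % nprocs
    return [local_mb_size + (1 if rank < extra_samples else 0)
            for rank in range(nprocs)]

# Example: a final minibatch of 40 samples across 6 ranks.
print(per_rank_sample_counts(40, 6))  # [7, 7, 7, 7, 6, 6]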
