More CosmoFlow Fixes #2461

Merged 4 commits on Jul 25, 2024
22 changes: 14 additions & 8 deletions applications/physics/cosmology/cosmoflow/cosmoflow_model.py
@@ -12,7 +12,8 @@ def construct_cosmoflow_model(parallel_strategy,
min_distconv_width,
mlperf,
transform_input,
dropout_keep_prob=0.5):
dropout_keep_prob=0.5,
cosine_schedule=None):

# Construct layer graph
universes = lbann.Input(data_field='samples')
@@ -66,15 +67,20 @@ def construct_cosmoflow_model(parallel_strategy,
# lbann.CallbackLinearGrowthLearningRate(target=learning_rate, num_epochs=5),
# lbann.CallbackSetLearningRate(step=32, val=0.25 * learning_rate),
# lbann.CallbackSetLearningRate(step=64, val=0.125 * learning_rate),
# lbann.CallbackCosineDecayLearningRate(
# lr_max=1e-3,
# lr_min=1e-5,
# decay_steps=10000,
# initial_warmup_learning_rate=0,
# warmup_steps=100
# ),
lbann.CallbackProgressBar(newline_interval=1, print_mem_usage=True)
]

if cosine_schedule:
callbacks.append(
lbann.CallbackCosineDecayLearningRate(
lr_max=learning_rate,
lr_min=cosine_schedule['lr_min'],
decay_steps=cosine_schedule['decay_steps'],
initial_warmup_learning_rate=cosine_schedule['init_warmup_lr'],
warmup_steps=cosine_schedule['warmup_steps']
)
)

return lbann.Model(
epochs=num_epochs,
layers=layers,
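For context, the four schedule parameters wired into the callback above describe the usual warmup-plus-cosine-decay curve. The sketch below is an illustrative reimplementation of that curve, not LBANN's CallbackCosineDecayLearningRate itself, so the callback's exact step-level behavior may differ.

import math

def cosine_decay_lr(step, lr_max, lr_min, decay_steps,
                    initial_warmup_learning_rate=0.0, warmup_steps=0):
    """Conventional warmup + cosine decay schedule (illustrative only)."""
    if warmup_steps > 0 and step < warmup_steps:
        # Linear warmup from the initial warmup LR up to lr_max.
        frac = step / warmup_steps
        return initial_warmup_learning_rate + frac * (lr_max - initial_warmup_learning_rate)
    # Cosine decay from lr_max down to lr_min over decay_steps, then hold at lr_min.
    progress = min((step - warmup_steps) / decay_steps, 1.0)
    return lr_min + 0.5 * (lr_max - lr_min) * (1.0 + math.cos(math.pi * progress))

# With lr_max=1e-3, lr_min=1e-5, decay_steps=50000, warmup_steps=1000:
# step 0 gives 0.0, step 1000 gives 1e-3, and step 51000 onward gives 1e-5.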
28 changes: 26 additions & 2 deletions applications/physics/cosmology/cosmoflow/train_cosmoflow.py
@@ -165,6 +165,21 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
parser.add_argument(
'--dropout-keep-prob', action='store', type=float, default=0.5,
help='Probability of keeping activations in dropout layers (default: 0.5). Set to 1 to disable dropout')
parser.add_argument(
'--cosine-schedule', action='store_true',
help='Use cosine learning rate scheduler')
parser.add_argument(
'--lr-min', action='store', type=float, default=0.,
help='Minimum learning rate for cosine scheduler')
parser.add_argument(
'--decay-steps', action='store', type=int, default=50000,
help='Steps to decay learning rate over for cosine scheduler')
parser.add_argument(
'--init-warmup-lr', action='store', type=float, default=0.,
help='Initial warmup learning rate for cosine scheduler')
parser.add_argument(
'--warmup-steps', action='store', type=int, default=1000,
help='Number of steps to warm up learning rate over with cosine scheduler')

# Parallelism arguments
parser.add_argument(
@@ -226,6 +241,15 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
channel_groups=args.channel_groups,
filter_groups=args.filter_groups,
depth_groups=args.depth_groups)

cosine_scheduler_args = None
if args.cosine_schedule:
cosine_scheduler_args = {
'lr_min': args.lr_min,
'decay_steps': args.decay_steps,
'init_warmup_lr': args.init_warmup_lr,
'warmup_steps': args.warmup_steps
}

model = cosmoflow_model.construct_cosmoflow_model(parallel_strategy=parallel_strategy,
local_batchnorm=args.local_batchnorm,
@@ -237,14 +261,14 @@ def create_synthetic_data_reader(input_width: int, num_responses: int) -> Any:
min_distconv_width=args.min_distconv_width,
mlperf=args.mlperf,
transform_input=args.transform_input,
dropout_keep_prob=args.dropout_keep_prob)
dropout_keep_prob=args.dropout_keep_prob,
cosine_schedule=cosine_scheduler_args)

# Add profiling callbacks if needed.
model.callbacks.extend(lbann.contrib.args.create_profile_callbacks(args))

# Setup optimizer
optimizer = lbann.contrib.args.create_optimizer(args)
# optimizer.learn_rate *= 1e-2

# Setup data reader
serialize_io = False
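A self-contained sketch of how the new flags feed the cosine_scheduler_args dictionary: the flag names, defaults, and dictionary keys come from the diff above, while the example argument values are made up.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--cosine-schedule', action='store_true')
parser.add_argument('--lr-min', action='store', type=float, default=0.)
parser.add_argument('--decay-steps', action='store', type=int, default=50000)
parser.add_argument('--init-warmup-lr', action='store', type=float, default=0.)
parser.add_argument('--warmup-steps', action='store', type=int, default=1000)

# Made-up command line; unspecified flags keep their defaults.
args = parser.parse_args(
    ['--cosine-schedule', '--lr-min', '1e-5', '--warmup-steps', '500'])

cosine_scheduler_args = None
if args.cosine_schedule:
    cosine_scheduler_args = {
        'lr_min': args.lr_min,                  # 1e-05
        'decay_steps': args.decay_steps,        # 50000 (default)
        'init_warmup_lr': args.init_warmup_lr,  # 0.0 (default)
        'warmup_steps': args.warmup_steps,      # 500
    }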
@@ -127,6 +127,12 @@ class python_dataset_reader : public generic_data_reader
#ifdef LBANN_HAS_DISTCONV
/** @brief Whether or not tensor needs shuffling for distconv. */
bool m_tensor_shuffle_required = true;
/** @brief The number of minibatches in the current epoch that have been
* fetched and returned by fetch_data_block. This is not the same as
* m_dataset_minibatch_offset, which tracks the number of minibatches that
* have been queued and can therefore be several minibatches ahead of the
* one currently being returned. */
uint64_t m_fetched_minibatch_count;
#endif // LBANN_HAS_DISTCONV
};

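The distinction drawn in the comment above (minibatches queued for prefetching versus minibatches actually returned) can be illustrated with a small, purely conceptual Python sketch; apart from the two counters it mirrors, none of these names correspond to real LBANN code.

class PrefetchingReader:
    """Toy illustration of a reader that queues minibatches ahead of use."""

    def __init__(self, prefetch_depth=4):
        self.prefetch_depth = prefetch_depth
        self.queued_minibatch_offset = 0  # counterpart of m_dataset_minibatch_offset
        self.fetched_minibatch_count = 0  # counterpart of m_fetched_minibatch_count

    def fetch(self):
        # Keep the prefetch queue topped up; the queued offset can therefore
        # run several minibatches ahead of the one being returned here.
        while self.queued_minibatch_offset - self.fetched_minibatch_count < self.prefetch_depth:
            self.queued_minibatch_offset += 1
        self.fetched_minibatch_count += 1
        return self.fetched_minibatch_count - 1  # index of the returned minibatch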
27 changes: 25 additions & 2 deletions src/data_ingestion/readers/data_reader_python_dataset.cpp
@@ -179,6 +179,14 @@ void python_dataset_reader::shuffle_responses(DataType* responses_ptr)
// in a batch that can't be split evenly will be split evenly across the
// first n ranks (or subsets of ranks in the distconv case).

#ifndef LBANN_BUILT_WITH_SPECTRUM
auto syncInfo = El::SyncInfo<El::Device::CPU>{};
El::mpi::EnsureComm<DataType, El::Collective::SEND>(m_comm->get_world_comm(),
syncInfo);
El::mpi::EnsureComm<DataType, El::Collective::RECV>(m_comm->get_world_comm(),
syncInfo);
#endif

uint64_t rank = m_comm->get_rank_in_trainer();
uint64_t nprocs = m_comm->get_procs_per_trainer();
uint64_t trainer_rank = m_comm->get_trainer_rank();
@@ -187,13 +195,14 @@
execution_mode mode = exec_mode_from_string(get_role());
dataset& ds = get_trainer().get_data_coordinator().get_dataset(mode);
uint64_t global_mb_size{};
if (m_dataset_minibatch_offset < (ds.get_num_iterations_per_epoch() - 1)) {
if (m_fetched_minibatch_count < (ds.get_num_iterations_per_epoch() - 1)) {
global_mb_size = ds.get_mini_batch_size();
}
else if (m_dataset_minibatch_offset ==
else if (m_fetched_minibatch_count ==
(ds.get_num_iterations_per_epoch() - 1)) {
global_mb_size = ds.get_last_mini_batch_size();
}
m_fetched_minibatch_count++;

uint64_t local_mb_size = global_mb_size / nprocs;
uint64_t extra_samples = global_mb_size % nprocs;
@@ -234,10 +243,21 @@ void python_dataset_reader::shuffle_responses(DataType* responses_ptr)
}
}
else if (rank == recv_rank) {
#ifdef LBANN_BUILT_WITH_SPECTRUM
EL_CHECK_MPI_CALL(
MPI_Recv(&responses_ptr[recv_rank_count * m_num_responses],
m_num_responses * sizeof(DataType),
MPI_BYTE,
m_comm->get_world_rank(trainer_rank, send_rank),
0,
m_comm->get_world_comm().GetMPIComm(),
nullptr));
#else
m_comm->recv(&responses_ptr[recv_rank_count * m_num_responses],
m_num_responses,
trainer_rank,
send_rank);
#endif
}

send_rank_count += 1;
@@ -344,6 +364,9 @@ void python_dataset_reader::queue_epoch()
m_dataset_minibatch_offset = 0;
m_dataset_sample_offset = 0;
m_queued_samples = 0;
#ifdef LBANN_HAS_DISTCONV
m_fetched_minibatch_count = 0;
#endif // LBANN_HAS_DISTCONV

// Prefetch the first set of samples (if less than minibatch size, the first
// minibatch read will take care of the rest)
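As a side note on the splitting arithmetic in shuffle_responses above (local_mb_size = global_mb_size / nprocs, extra_samples = global_mb_size % nprocs): assuming, as the in-code comment states, that the leftover samples go to the first ranks, the per-rank sizes work out as in this small sketch.

def local_minibatch_size(global_mb_size, nprocs, rank):
    # Every rank gets the base share; the first (global_mb_size % nprocs)
    # ranks each take one extra sample.
    base = global_mb_size // nprocs
    extra_samples = global_mb_size % nprocs
    return base + 1 if rank < extra_samples else base

# Example: a global minibatch of 10 samples across 4 ranks splits as [3, 3, 2, 2].
assert [local_minibatch_size(10, 4, r) for r in range(4)] == [3, 3, 2, 2]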