From 6015d0ad6ce946e763fa1052614317ff34825d33 Mon Sep 17 00:00:00 2001
From: Zach Mueller
Date: Wed, 10 Jan 2024 06:03:13 -0500
Subject: [PATCH] Support `DeepSpeed` when using auto find batch size (#28088)

Fixup test
---
 src/transformers/integrations/deepspeed.py | 13 ++++--
 src/transformers/trainer.py                | 42 +++++++++++++-------
 tests/trainer/test_trainer.py              | 46 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 17 deletions(-)

diff --git a/src/transformers/integrations/deepspeed.py b/src/transformers/integrations/deepspeed.py
index 101610af555f1c..92cc1a4b0e5947 100644
--- a/src/transformers/integrations/deepspeed.py
+++ b/src/transformers/integrations/deepspeed.py
@@ -129,7 +129,7 @@ def fill_match(self, ds_key_long, hf_val, hf_key=None, must_match=True):
 
     fill_only = partialmethod(fill_match, must_match=False)
 
-    def trainer_config_process(self, args):
+    def trainer_config_process(self, args, auto_find_batch_size=False):
         """
         Adjust the config with `TrainingArguments` values. This stage is run during `TrainingArguments` object
         creation.
@@ -138,10 +138,15 @@ def trainer_config_process(self, args):
         # train_batch_size = world_size * train_micro_batch_size_per_gpu * gradient_accumulation_steps
         train_batch_size = args.world_size * args.per_device_train_batch_size * args.gradient_accumulation_steps
         self.fill_match(
-            "train_micro_batch_size_per_gpu", args.per_device_train_batch_size, "per_device_train_batch_size"
+            "train_micro_batch_size_per_gpu",
+            args.per_device_train_batch_size,
+            "per_device_train_batch_size",
+            not auto_find_batch_size,
         )
         self.fill_match("gradient_accumulation_steps", args.gradient_accumulation_steps, "gradient_accumulation_steps")
-        self.fill_match("train_batch_size", train_batch_size, "train_batch_size (calculated)")
+        self.fill_match(
+            "train_batch_size", train_batch_size, "train_batch_size (calculated)", not auto_find_batch_size
+        )
         self.fill_match("gradient_clipping", args.max_grad_norm, "max_grad_norm")
 
         self.fill_match("optimizer.params.lr", args.learning_rate, "learning_rate")
@@ -336,6 +341,8 @@ def deepspeed_init(trainer, num_training_steps, inference=False):
         num_training_steps: per single gpu
         resume_from_checkpoint: path to a checkpoint if to resume from after normal DeepSpeedEngine load
         inference: launch in inference mode (no optimizer and no lr scheduler)
+        auto_find_batch_size: whether to ignore the `train_micro_batch_size_per_gpu` argument as it's being
+            set automatically by the auto batch size finder
 
     Returns: optimizer, lr_scheduler
 
diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py
index 2bac51fdf04910..cd46eb4d1c14bb 100755
--- a/src/transformers/trainer.py
+++ b/src/transformers/trainer.py
@@ -1506,13 +1506,9 @@ def train(
             if resume_from_checkpoint is None:
                 raise ValueError(f"No valid checkpoint found in output directory ({args.output_dir})")
 
-        if (
-            resume_from_checkpoint is not None
-            and not is_sagemaker_mp_enabled()
-            and not self.is_deepspeed_enabled
-            and not self.is_fsdp_enabled
-        ):
-            self._load_from_checkpoint(resume_from_checkpoint)
+        if resume_from_checkpoint is not None:
+            if not is_sagemaker_mp_enabled() and not self.is_deepspeed_enabled and not self.is_fsdp_enabled:
+                self._load_from_checkpoint(resume_from_checkpoint)
             # In case of repeating the find_executable_batch_size, set `self._train_batch_size` properly
             state = TrainerState.load_from_json(os.path.join(resume_from_checkpoint, TRAINER_STATE_NAME))
             if state.train_batch_size is not None:
@@ -1553,6 +1549,19 @@ def _inner_training_loop(
         self.accelerator.free_memory()
         self._train_batch_size = batch_size
         if self.args.auto_find_batch_size:
+            if self.state.train_batch_size != self._train_batch_size:
+                from accelerate.utils import release_memory
+
+                (self.model_wrapped,) = release_memory(self.model_wrapped)
+                self.model_wrapped = self.model
+
+                # Check for DeepSpeed *after* the initial pass and modify the config
+                if self.is_deepspeed_enabled:
+                    # Temporarily override `self.args.per_device_train_batch_size`
+                    original_bs = self.args.per_device_train_batch_size
+                    self.args.per_device_train_batch_size = self._train_batch_size // max(1, self.args.n_gpu)
+                    self.propagate_args_to_deepspeed(True)
+                    self.args.per_device_train_batch_size = original_bs
             self.state.train_batch_size = self._train_batch_size
         logger.debug(f"Currently training with a batch size of: {self._train_batch_size}")
         # Data loader and number of training steps
@@ -3944,12 +3953,17 @@ def create_accelerator_and_postprocess(self):
                 "when using FSDP."
             )
 
-        if self.is_deepspeed_enabled:
-            if getattr(self.args, "hf_deepspeed_config", None) is None:
-                from transformers.integrations.deepspeed import HfTrainerDeepSpeedConfig
+        if self.is_deepspeed_enabled and getattr(self.args, "hf_deepspeed_config", None) is None:
+            self.propagate_args_to_deepspeed()
+
+    def propagate_args_to_deepspeed(self, auto_find_batch_size=False):
+        """
+        Sets values in the deepspeed plugin based on the Trainer args
+        """
+        from transformers.integrations.deepspeed import HfTrainerDeepSpeedConfig
 
-                ds_plugin = self.accelerator.state.deepspeed_plugin
+        ds_plugin = self.accelerator.state.deepspeed_plugin
 
-                ds_plugin.hf_ds_config = HfTrainerDeepSpeedConfig(ds_plugin.hf_ds_config.config)
-                ds_plugin.deepspeed_config = ds_plugin.hf_ds_config.config
-                ds_plugin.hf_ds_config.trainer_config_process(self.args)
+        ds_plugin.hf_ds_config = HfTrainerDeepSpeedConfig(ds_plugin.hf_ds_config.config)
+        ds_plugin.deepspeed_config = ds_plugin.hf_ds_config.config
+        ds_plugin.hf_ds_config.trainer_config_process(self.args, auto_find_batch_size)
diff --git a/tests/trainer/test_trainer.py b/tests/trainer/test_trainer.py
index 813312c53338ac..d34a72b2ca5140 100644
--- a/tests/trainer/test_trainer.py
+++ b/tests/trainer/test_trainer.py
@@ -57,6 +57,7 @@
     get_tests_dir,
     is_staging_test,
     require_accelerate,
+    require_deepspeed,
     require_intel_extension_for_pytorch,
     require_optuna,
     require_ray,
@@ -1551,6 +1552,51 @@ def test_auto_batch_size_finder(self):
         with patch.object(sys, "argv", testargs):
             run_glue.main()
 
+    @require_deepspeed
+    def test_auto_batch_size_with_resume_from_checkpoint_with_deepspeed(self):
+        train_dataset = RegressionDataset(length=128)
+
+        config = RegressionModelConfig(a=0, b=2)
+        model = RegressionRandomPreTrainedModel(config)
+
+        tmp_dir = self.get_auto_remove_tmp_dir()
+
+        class MockCudaOOMCallback(TrainerCallback):
+            def on_step_end(self, args, state, control, **kwargs):
+                # simulate OOM on the first step
+                if state.train_batch_size >= 16:
+                    raise RuntimeError("CUDA out of memory.")
+
+        deepspeed = {
+            "zero_optimization": {
+                "stage": 1,
+            },
+            "train_batch_size": "auto",
+            "train_micro_batch_size_per_gpu": "auto",
+        }
+
+        args = RegressionTrainingArguments(
+            tmp_dir,
+            do_train=True,
+            max_steps=2,
+            save_steps=1,
+            per_device_train_batch_size=16,
+            auto_find_batch_size=True,
+            deepspeed=deepspeed,
+        )
+        trainer = Trainer(model, args, train_dataset=train_dataset, callbacks=[MockCudaOOMCallback()])
+        trainer.train()
+        # After `auto_find_batch_size` has run we should now be at 8
+        self.assertEqual(trainer._train_batch_size, 8)
+
+        # We can then make a new Trainer
+        trainer = Trainer(model, args, train_dataset=train_dataset)
+        # Check we are at 16 to start
+        self.assertEqual(trainer._train_batch_size, 16 * max(trainer.args.n_gpu, 1))
+        trainer.train(resume_from_checkpoint=True)
+        # We should be back to 8 again, picking up based upon the last Trainer run
+        self.assertEqual(trainer._train_batch_size, 8)
+
     def test_auto_batch_size_with_resume_from_checkpoint(self):
         train_dataset = RegressionDataset(length=128)
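For reference, below is a minimal usage sketch (not part of the patch) of the behaviour this change enables: a DeepSpeed config that leaves the batch-size fields as "auto", combined with `auto_find_batch_size=True`, so that when the Trainer halves the per-device batch size after a CUDA OOM it re-propagates the new value into the DeepSpeed config via `propagate_args_to_deepspeed`. The `model` and `train_dataset` objects and the output directory are placeholders assumed to be defined by the user.

# Usage sketch only, not part of the diff above. Assumes `model` is a
# transformers PreTrainedModel and `train_dataset` is a torch Dataset
# defined elsewhere; "out" is a placeholder output directory.
from transformers import Trainer, TrainingArguments

# Leave the DeepSpeed batch-size entries as "auto" so the Trainer fills
# them in; with this patch they are refilled each time
# `auto_find_batch_size` lowers the per-device batch size after an OOM.
ds_config = {
    "zero_optimization": {"stage": 1},
    "train_batch_size": "auto",
    "train_micro_batch_size_per_gpu": "auto",
}

args = TrainingArguments(
    output_dir="out",
    per_device_train_batch_size=16,  # starting point; halved on OOM
    auto_find_batch_size=True,
    deepspeed=ds_config,
)

trainer = Trainer(model=model, args=args, train_dataset=train_dataset)
trainer.train()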