From df9f6e9a900f6ae6b44610fa3774befc75386b31 Mon Sep 17 00:00:00 2001
From: tjb3
Date: Mon, 8 Jan 2024 23:56:26 -0500
Subject: [PATCH] Fixed incorrect usage of trainer main process checks

Added with open usage to ensure better file closing as suggested from PR
Added rotate_checkpoints into main process logic
---
 src/transformers/trainer.py | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py
index 33d8700e6d5b77..d4f07aa97c7c92 100755
--- a/src/transformers/trainer.py
+++ b/src/transformers/trainer.py
@@ -2382,25 +2382,23 @@ def _save_checkpoint(self, model, trial, metrics=None):
         # Place checkpoint in final location after all saving is finished.
         # First wait for everyone to finish writing
         self.args.distributed_state.wait_for_everyone()
-        # Then go through the rewriting process, only renaming from main process(es)
-        if staging_output_dir != output_dir:
-            if (
-                self.is_local_process_zero
-                if self.args.save_on_each_node
-                else self.is_world_process_zero
-            ):
+
+        # Then go through the rewriting process, only renaming and rotating from main process(es)
+        if self.is_local_process_zero() if self.args.save_on_each_node else self.is_world_process_zero():
+            if staging_output_dir != output_dir:
                 if os.path.exists(staging_output_dir):
                     os.rename(staging_output_dir, output_dir)
 
                     # Ensure rename completed in cases where os.rename is not atomic
-                    fd = os.open(output_dir, os.O_RDONLY)
-                    os.fsync(fd)
+                    with open(output_dir, "r") as f:
+                        f.flush()
+                        os.fsync(f.fileno())
 
-        self.args.distributed_state.wait_for_everyone()
+            # Maybe delete some older checkpoints.
+            if self.args.should_save:
+                self._rotate_checkpoints(use_mtime=True, output_dir=run_dir)
 
-        # Maybe delete some older checkpoints.
-        if self.args.should_save:
-            self._rotate_checkpoints(use_mtime=True, output_dir=run_dir)
+        self.args.distributed_state.wait_for_everyone()
 
     def _save_rng_state(self, output_dir):
         # Save RNG state in non-distributed training
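
Below, for reference, is a minimal standalone sketch of the rename-and-flush step this hunk touches, written against plain POSIX assumptions. The helper names promote_checkpoint and _fsync_dir and the staging_dir/final_dir parameters are illustrative only and are not part of the Trainer API. On POSIX systems Python's built-in open() raises IsADirectoryError when pointed at a directory, so flushing a completed rename to disk goes through a low-level descriptor from os.open with O_RDONLY; directory fsync is not available on Windows, so the sketch skips it there.

import os


def _fsync_dir(path: str) -> None:
    # Built-in open() cannot open a directory, so take a low-level
    # read-only descriptor and fsync that instead (POSIX only).
    fd = os.open(path, os.O_RDONLY)
    try:
        os.fsync(fd)
    finally:
        os.close(fd)


def promote_checkpoint(staging_dir: str, final_dir: str) -> None:
    # Illustrative helper, not Trainer code: rename a finished staging
    # checkpoint directory to its final name and flush the rename.
    if staging_dir == final_dir or not os.path.exists(staging_dir):
        return

    os.rename(staging_dir, final_dir)

    if os.name != "nt":
        _fsync_dir(final_dir)
        # Syncing the parent directory persists the renamed entry itself
        # on filesystems where the rename may not be immediately durable.
        _fsync_dir(os.path.dirname(os.path.abspath(final_dir)))

A plausible call site mirrors the gating in the hunk above: run promote_checkpoint (and then checkpoint rotation) only on the main process, local process zero when save_on_each_node is set and world process zero otherwise, and have every rank hit wait_for_everyone() afterwards.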