From bd2b2ef1405845696d546a02c6d80a4fe5a8316e Mon Sep 17 00:00:00 2001 From: Max Kovalenko <75629718+deepcharm@users.noreply.github.com> Date: Wed, 22 May 2024 23:28:02 +0300 Subject: [PATCH] Add throughput timer configuration (#5363) The new "timers" section describes configuration for different timers. Specifically, in the "throughput" section, it is possible to disable the throughput timer (enabled by default). This makes it possible to avoid the performance degradation whenever throughput measurement is not needed, for example in a production environment. No device synchronize() is invoked when "synchronized" is set to False (default is True). This makes it possible to produce approximate throughput measurements with a minimal performance penalty. --------- Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> Co-authored-by: Olatunji Ruwase --- deepspeed/runtime/config.py | 4 +++ deepspeed/runtime/engine.py | 9 +++---- deepspeed/runtime/pipe/engine.py | 3 ++- deepspeed/utils/config.py | 46 ++++++++++++++++++++++++++++++++ deepspeed/utils/timer.py | 20 +++++++------- 5 files changed, 65 insertions(+), 17 deletions(-) create mode 100644 deepspeed/utils/config.py diff --git a/deepspeed/runtime/config.py b/deepspeed/runtime/config.py index 19b169086be1..04b122963a38 100755 --- a/deepspeed/runtime/config.py +++ b/deepspeed/runtime/config.py @@ -66,6 +66,8 @@ from .data_pipeline.config import get_data_efficiency_enabled, get_data_efficiency_config, get_curriculum_enabled_legacy, get_curriculum_params_legacy from .data_pipeline.constants import * +from ..utils.config import get_timers_config + TENSOR_CORE_ALIGN_SIZE = 8 ADAGRAD_OPTIMIZER = 'adagrad' @@ -911,6 +913,8 @@ def _initialize_params(self, param_dict): self.compile_config = get_compile_config(param_dict) + self.timers_config = get_timers_config(param_dict) + def _batch_assertion(self): train_batch = self.train_batch_size diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index 
13f335cae6d5..be153b4b4948 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -271,11 +271,10 @@ def __init__(self, # Configure wall clock timers self.timers = SynchronizedWallClockTimer() # Throughput timer - self.tput_timer = ThroughputTimer( - batch_size=self.train_batch_size(), - steps_per_output=self.steps_per_print(), - monitor_memory=False, - ) + self.tput_timer = ThroughputTimer(self._config.timers_config, + batch_size=self.train_batch_size(), + steps_per_output=self.steps_per_print(), + monitor_memory=False) log_dist(f"DeepSpeed Flops Profiler Enabled: {self.flops_profiler_enabled()}", ranks=[0]) diff --git a/deepspeed/runtime/pipe/engine.py b/deepspeed/runtime/pipe/engine.py index be8fe1a368c6..f661e779954b 100644 --- a/deepspeed/runtime/pipe/engine.py +++ b/deepspeed/runtime/pipe/engine.py @@ -117,7 +117,8 @@ def __init__(self, has_bool_tensors=False, *super_args, **super_kwargs): self._force_grad_boundary = False - self.batch_timer = ThroughputTimer(batch_size=self.train_batch_size(), + self.batch_timer = ThroughputTimer(self._config.timers_config, + batch_size=self.train_batch_size(), logging_fn=self.tput_log, monitor_memory=False, steps_per_output=self.steps_per_print()) diff --git a/deepspeed/utils/config.py b/deepspeed/utils/config.py new file mode 100644 index 000000000000..15f37ca7d874 --- /dev/null +++ b/deepspeed/utils/config.py @@ -0,0 +1,46 @@ +# Copyright (c) Microsoft Corporation. +# SPDX-License-Identifier: Apache-2.0 + +# DeepSpeed Team + +from deepspeed.runtime.config_utils import DeepSpeedConfigModel + +######################################### +# Timers +######################################### +# Timers. By default, timers are enabled. 
+# Users can configure in ds_config.json as below example: +TIMERS_FORMAT = ''' +Timers should be enabled as: +"timers": { + "throughput": { + "enabled": true, + "synchronized": true + } +} +''' + +TIMERS = "timers" +TIMERS_THROUGHPUT = "throughput" + + +def get_timers_config(param_dict): + if param_dict and TIMERS in param_dict and TIMERS_THROUGHPUT in param_dict[TIMERS]: + timers_config_dict = param_dict[TIMERS][TIMERS_THROUGHPUT] + else: + timers_config_dict = {} + return DeepSpeedThroughputTimerConfig(**timers_config_dict) + + +class DeepSpeedThroughputTimerConfig(DeepSpeedConfigModel): + """ Configure throughput timers """ + + enabled: bool = True + """ Turn on/off throughput timers """ + + synchronized: bool = True + """ Whether to synchronize a device when measuring the time. + Synchronizing a device is required to produce the most accurate timer measurements. + However, this comes at the expense of performance degradation. The CPU timer provides + sufficient accuracy in many cases. 
+ """ diff --git a/deepspeed/utils/timer.py b/deepspeed/utils/timer.py index 11ef54fe4665..dd78b207cc37 100755 --- a/deepspeed/utils/timer.py +++ b/deepspeed/utils/timer.py @@ -197,15 +197,9 @@ def get_mean(self, names, normalizer=1.0, reset=True): class ThroughputTimer: - def __init__( - self, - batch_size, - start_step=2, - steps_per_output=50, - monitor_memory=False, - logging_fn=None, - ): + def __init__(self, config, batch_size, start_step=2, steps_per_output=50, monitor_memory=False, logging_fn=None): from deepspeed.utils import logger + self.config = config self.start_time = 0 self.end_time = 0 self.started = False @@ -234,14 +228,17 @@ def _init_timer(self): self.initialized = True def start(self): + if not self.config.enabled: + return self._init_timer() self.started = True if self.global_step_count >= self.start_step: - get_accelerator().synchronize() + if self.config.synchronized: + get_accelerator().synchronize() self.start_time = time.time() def stop(self, global_step=False, report_speed=True): - if not self.started: + if not self.config.enabled or not self.started: return self.started = False self.micro_step_count += 1 @@ -249,7 +246,8 @@ def stop(self, global_step=False, report_speed=True): self.global_step_count += 1 if self.start_time > 0: - get_accelerator().synchronize() + if self.config.synchronized: + get_accelerator().synchronize() self.end_time = time.time() duration = self.end_time - self.start_time self.total_elapsed_time += duration