From fac1df9983510ac7a04799b2829bf6e1cea08bd8 Mon Sep 17 00:00:00 2001 From: Nadav Elyahu <88962733+nelyahu@users.noreply.github.com> Date: Thu, 15 Feb 2024 21:37:55 +0200 Subject: [PATCH 01/18] tensorboard logging: avoid item() outside gas to improve performance (#5135) item() causes device/host synchronization Co-authored-by: Olatunji Ruwase --- deepspeed/runtime/engine.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index fe0fa8ad793d..eb17c6955b8f 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -230,7 +230,7 @@ def __init__(self, self._is_gradient_accumulation_boundary = None self.scale_wrt_gas = None - self.losses = 0.0 + self.losses = None # for debug purposes - can then debug print: debug_get_module_name(module) debug_extract_module_and_param_names(model) @@ -1951,13 +1951,14 @@ def backward(self, loss, allreduce_gradients=True, release_loss=False, retain_gr loss = self._scale_loss_by_gas(loss.float()) # Log training loss - self.losses += loss.mean().item() + mean_loss = loss.mean().detach() + self.losses = mean_loss if self.losses is None else self.losses + mean_loss if self.monitor.enabled: if self.is_gradient_accumulation_boundary(): if self.global_rank == 0: self.summary_events = [( f"Train/Samples/train_loss", - self.losses, + self.losses.item(), self.global_samples, )] self.monitor.write_events(self.summary_events) @@ -2123,7 +2124,7 @@ def _take_model_step(self, lr_kwargs, block_eigenvalue={}): if report_progress and (self.global_steps + 1) % self.steps_per_print() == 0: self._report_progress(self.global_steps + 1) - self.losses = 0.0 + self.losses = None self.global_steps += 1 self.global_samples += self.train_batch_size() From 1c9e5ef2d214b0dac0a66e17df20ef6d0c6092a9 Mon Sep 17 00:00:00 2001 From: BacharL Date: Thu, 15 Feb 2024 21:39:03 +0200 Subject: [PATCH 02/18] Check overflow on device without host synchronization for each tensor (#5115) Co-authored-by: Olatunji Ruwase --- deepspeed/runtime/zero/stage_1_and_2.py | 47 +++++++++---------------- 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/deepspeed/runtime/zero/stage_1_and_2.py b/deepspeed/runtime/zero/stage_1_and_2.py index 18b58403f1d7..18448adcabc3 100755 --- a/deepspeed/runtime/zero/stage_1_and_2.py +++ b/deepspeed/runtime/zero/stage_1_and_2.py @@ -1970,24 +1970,26 @@ def _check_overflow(self, partition_gradients=True): self.overflow = self.has_overflow(partition_gradients) # `params` is a list / generator of torch.Variable - def has_overflow_serial(self, params, is_grad_list=False): + def has_overflow_serial(self, params): + invalid_grad_count = torch.zeros([1], dtype=torch.float, device=get_accelerator().current_device_name()) for p in params: - if p.grad is not None and self._has_inf_or_nan(p.grad.data): - return True - - return False + if p.grad is not None: + invalid_grad_count += self._has_inf_or_nan(p.grad) + return invalid_grad_count.bool() def has_overflow_partitioned_grads_serial(self): + invalid_grad_count = torch.zeros([1], dtype=torch.float, device=get_accelerator().current_device_name()) for i in range(len(self.bit16_groups)): for j, grad in enumerate(self.averaged_gradients[i]): - if grad is not None and self._has_inf_or_nan(grad.data, j): - return True - return False + if grad is not None: + invalid_grad_count += self._has_inf_or_nan(grad) + return invalid_grad_count.bool() def has_overflow(self, partition_gradients=True): if partition_gradients: overflow = 
self.local_overflow if self.cpu_offload else self.has_overflow_partitioned_grads_serial() - overflow_gpu = get_accelerator().ByteTensor([overflow]) + overflow_gpu = get_accelerator().ByteTensor([overflow]) if self.cpu_offload else overflow.byte().to( + get_accelerator().current_device_name()) '''This will capture overflow across all data parallel and expert parallel process Since expert parallel process are a subset of data parallel process''' dist.all_reduce(overflow_gpu, op=dist.ReduceOp.MAX, group=self.dp_process_group) @@ -1997,9 +1999,7 @@ def has_overflow(self, partition_gradients=True): for group in self.bit16_groups: for param in group: params.append(param) - - overflow = self.has_overflow_serial(params, is_grad_list=partition_gradients) - overflow_gpu = get_accelerator().ByteTensor([overflow]) + overflow_gpu = self.has_overflow_serial(params).byte().to(get_accelerator().current_device_name()) # Since each model parallel GPU carries only part of the model, # make sure overflow flag is synced across all the model parallel GPUs @@ -2011,24 +2011,11 @@ def has_overflow(self, partition_gradients=True): # `x` is a torch.Tensor @staticmethod def _has_inf_or_nan(x, j=None): - try: - # if x is half, the .float() incurs an additional deep copy, but it's necessary if - # Pytorch's .sum() creates a one-element tensor of the same type as x - # (which is true for some recent version of pytorch). - cpu_sum = float(x.float().sum()) - # More efficient version that can be used if .sum() returns a Python scalar - # cpu_sum = float(x.sum()) - except RuntimeError as instance: - # We want to check if inst is actually an overflow exception. - # RuntimeError could come from a different error. - # If so, we still want the exception to propagate. - if "value cannot be converted" not in instance.args[0]: - raise - return True - else: - if cpu_sum == float('inf') or cpu_sum == -float('inf') or cpu_sum != cpu_sum: - return True - return False + float_x = x.float() + nan = float_x.isnan() + inf = float_x.isinf() + inf_or_nan = nan.logical_or(inf) + return inf_or_nan.float().max() def backward(self, loss, retain_graph=False): """ From 39f6ee59769fd5b1d9bfc4a3daedd63195ee73f6 Mon Sep 17 00:00:00 2001 From: Logan Adams <114770087+loadams@users.noreply.github.com> Date: Thu, 15 Feb 2024 13:41:41 -0800 Subject: [PATCH 03/18] Update nv-inference torch version (#5128) We previously had the nv-inference tests pinned to use torch 1.13.1, this updates us to use a newer version. 
--- .github/workflows/nv-inference.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/nv-inference.yml b/.github/workflows/nv-inference.yml index cc55136d1a4b..0cbe212666de 100644 --- a/.github/workflows/nv-inference.yml +++ b/.github/workflows/nv-inference.yml @@ -32,7 +32,7 @@ jobs: - name: Install pytorch run: | - pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116 + pip install -U --cache-dir $TORCH_CACHE torch==2.1.2 torchvision==0.16.2 --extra-index-url https://download.pytorch.org/whl/cu118 python -c "import torch; print('torch:', torch.__version__, torch)" python -c "import torch; print('CUDA available:', torch.cuda.is_available())" @@ -57,6 +57,6 @@ jobs: run: | unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch cd tests - #pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6" - pytest $PYTEST_OPTS -m 'inference_ops' unit/ --torch_ver="1.13" --cuda_ver="11.6" - pytest $PYTEST_OPTS --forked -n 4 -m 'inference' unit/ --torch_ver="1.13" --cuda_ver="11.6" + #pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="2.1" --cuda_ver="11.8" + pytest $PYTEST_OPTS -m 'inference_ops' unit/ --torch_ver="2.1" --cuda_ver="11.8" + pytest $PYTEST_OPTS --forked -n 4 -m 'inference' unit/ --torch_ver="2.1" --cuda_ver="11.8" From 95be499a6801f7e05a962aa19a03a9e1542b9021 Mon Sep 17 00:00:00 2001 From: Bruno Magalhaes Date: Fri, 16 Feb 2024 09:54:08 +0100 Subject: [PATCH 04/18] Method `run_map_reduce` to fix errors when running `run_map` followed by `run_reduce` (#5131) In the map-reduce in data analysis, the run_reduce will merge several files into one. There are two open issues: - when running `run_map` followed by `run_reduce`, the `run_reduce` may start before all nodes finished the `run_map`, leading to having nodes loading files that are not populated/flush (zero-sized error) - when running `run_reduce`, all nodes are loading the partial result files output by all nodes, and all nodes will write the same files that result from the merge. This leads to strange IO errors. `run_reduce` should only be run by one node, and all nodes should wait for `run_reduce` to finish before they feed the dataset to `deepspeed.initialize()`. This PR fixes both these issues when running `run_map` followed by `run_reduce`, by providing a method `run_map_reduce` that fixes this logic. It adds `dist.barrier`s to both steps (where barrier runs on an user-specifed `comm_group`), and non-master running the reduce operation. **NOTE:** an alternative workaround is by not providing the `run_map_reduce` method in this PR but having the correct barriers and safeguards added to `run_map` and `run_reduce` as in [5130](https://github.com/microsoft/DeepSpeed/pull/5130). 
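A minimal usage sketch of the new method (not part of this PR's diff; the dataset and the `value` metric are placeholders in the style of the `TestDataset` example in `data_analyzer.py`, and constructor defaults are assumed):

```python
import os
import torch
import deepspeed
from deepspeed.runtime.data_pipeline.data_sampling.data_analyzer import DataAnalyzer

deepspeed.init_distributed()  # run_map_reduce() uses dist.barrier internally

# placeholder metric: the dataset is assumed to yield integers, as in TestDataset
value = lambda batch: torch.as_tensor(batch, dtype=torch.int64)

da = DataAnalyzer(dataset=my_dataset,  # placeholder: any map-style torch dataset
                  worker_id=int(os.environ['RANK']),
                  num_workers=int(os.environ['WORLD_SIZE']),
                  metric_names=['value'],
                  metric_functions=[value],
                  metric_types=['single_value_per_sample'],
                  metric_dtypes=[torch.int64],
                  save_path='./data_analysis')

# every rank calls this; the internal barriers guarantee the map is complete
# before rank 0 reduces, and the merged files exist before any rank proceeds
da.run_map_reduce()
# the curriculum files under save_path can now be consumed on all ranks,
# e.g. by the dataset passed to deepspeed.initialize()
```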
--------- Co-authored-by: Conglong Li --- .../data_pipeline/data_sampling/data_analyzer.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py index f8ce0e3fa2bf..a4fbe491f76f 100644 --- a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py +++ b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py @@ -13,6 +13,7 @@ from torch.utils.data import BatchSampler, SequentialSampler, DataLoader, Subset from deepspeed.utils import logger +import deepspeed.comm as dist from .indexed_dataset import MMapIndexedDataset, valid_dtypes from .utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype @@ -432,3 +433,12 @@ def run_reduce(self): else: self.custom_reduce(self.dataset, self.metric_names, self.metric_types, self.save_path, self.num_workers, self.num_threads, self.num_threads_reduce) + + def run_map_reduce(self, comm_group=None): + self.run_map() + # wait for the mapping operation, where all nodes outputs their own (partial) result files + dist.barrier(group=comm_group) + if self.worker_id == 0: + self.run_reduce() + # wait for the reduce, where rank 0 merges all (partial) files. Dataset can then be used by all nodes. + dist.barrier(group=comm_group) From 9bd62e0b2675eae64dedc625d73dd16fa93254c2 Mon Sep 17 00:00:00 2001 From: Bruno Magalhaes Date: Fri, 16 Feb 2024 11:43:51 +0100 Subject: [PATCH 05/18] Added missing `isinstance` check in PR 5112 (#5142) Added missing `ininstance` check in [https://github.com/microsoft/DeepSpeed/pull/5112](https://github.com/microsoft/DeepSpeed/pull/5112). --------- Co-authored-by: Conglong Li --- deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py index a4fbe491f76f..16203e7d7a84 100644 --- a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py +++ b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py @@ -107,7 +107,7 @@ def update_metric_results(self, if metric_type == 'single_value_per_sample': for row in range(metric_values.size()[0]): sample_idx = batch_start_idx + row # sample idx following dataset iteration order - if 'index' in data: # Megatron use case, sample idx provided in 'index' field + if isinstance(data, dict) and 'index' in data: # Megatron use case, idx provided in 'index' field sample_idx = data['index'][row][0].item() elif self.sample_indices is not None: # user defined shuffling of indices sample_idx = self.sample_indices[sample_idx] From a3be0d441ac5ac6df29a650f5ee7ca7c0d1d337e Mon Sep 17 00:00:00 2001 From: Nadav Elyahu <88962733+nelyahu@users.noreply.github.com> Date: Fri, 16 Feb 2024 18:09:07 +0200 Subject: [PATCH 06/18] TestEmptyParameterGroup: replace fusedAdam with torch.optim.AdamW (#5139) to avoid cuda specific optimizer, with general one. 
Co-authored-by: Olatunji Ruwase --- tests/unit/runtime/zero/test_zero.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/runtime/zero/test_zero.py b/tests/unit/runtime/zero/test_zero.py index 2594d910acff..5a8af95bb0f8 100644 --- a/tests/unit/runtime/zero/test_zero.py +++ b/tests/unit/runtime/zero/test_zero.py @@ -1604,7 +1604,7 @@ def test_empty_param_groups(self, dtype, use_client_optimizer, empty_weight_grou } if use_client_optimizer: - optimizer = deepspeed.ops.adam.FusedAdam(param_groups, lr=0.1) + optimizer = torch.optim.AdamW(param_groups, lr=0.1) model_parameters = model.parameters() else: config_dict["optimizer"] = {"type": "adamw"} From 177dc14331a64e61f6dcce2c4b8071576bcb22db Mon Sep 17 00:00:00 2001 From: Shukant Pal Date: Fri, 16 Feb 2024 08:09:44 -0800 Subject: [PATCH 07/18] =?UTF-8?q?Fix=20UserWarning:=20The=20torch.cuda.*Dt?= =?UTF-8?q?ypeTensor=20constructors=20are=20no=20long=E2=80=A6=20(#5018)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit > …er recommended. It's best to use methods such as torch.tensor(data, dtype=*, device='cuda') to create tensors. By currying the returned tensor properties in CUDA_Accelerator, the above warning in PyTorch is avoided. Co-authored-by: Olatunji Ruwase Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- accelerator/cuda_accelerator.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/accelerator/cuda_accelerator.py b/accelerator/cuda_accelerator.py index 2030f36631e9..3d5e9c168c16 100644 --- a/accelerator/cuda_accelerator.py +++ b/accelerator/cuda_accelerator.py @@ -3,6 +3,7 @@ # DeepSpeed Team +import functools import os import pkgutil import importlib @@ -260,31 +261,31 @@ def replay_graph(self, graph): @property def BFloat16Tensor(self): - return torch.cuda.BFloat16Tensor + return functools.partial(torch.tensor, dtype=torch.bfloat16, device='cuda') @property def ByteTensor(self): - return torch.cuda.ByteTensor + return functools.partial(torch.tensor, dtype=torch.uint8, device='cuda') @property def DoubleTensor(self): - return torch.cuda.DoubleTensor + return functools.partial(torch.tensor, dtype=torch.double, device='cuda') @property def FloatTensor(self): - return torch.cuda.FloatTensor + return functools.partial(torch.tensor, dtype=torch.float, device='cuda') @property def HalfTensor(self): - return torch.cuda.HalfTensor + return functools.partial(torch.tensor, dtype=torch.half, device='cuda') @property def IntTensor(self): - return torch.cuda.IntTensor + return functools.partial(torch.tensor, dtype=torch.int, device='cuda') @property def LongTensor(self): - return torch.cuda.LongTensor + return functools.partial(torch.tensor, dtype=torch.long, device='cuda') def pin_memory(self, tensor, align_bytes=1): return tensor.pin_memory() From a37e59b590a42d425ef962640762c99ced17b752 Mon Sep 17 00:00:00 2001 From: Michael Wyatt Date: Fri, 16 Feb 2024 11:55:31 -0800 Subject: [PATCH 08/18] Update deprecated HuggingFace function (#5144) Changing `list_files_info` to `list_repo_tree`: ``` FutureWarning: 'list_files_info' (from 'huggingface_hub.hf_api') is deprecated and will be removed from version '0.23'. Use `list_repo_tree` and `get_paths_info` instead. 
``` --- deepspeed/inference/v2/checkpoint/huggingface_engine.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/deepspeed/inference/v2/checkpoint/huggingface_engine.py b/deepspeed/inference/v2/checkpoint/huggingface_engine.py index bec53a742a84..46a84c61f884 100644 --- a/deepspeed/inference/v2/checkpoint/huggingface_engine.py +++ b/deepspeed/inference/v2/checkpoint/huggingface_engine.py @@ -40,16 +40,13 @@ def _fetch_checkpoint_files(self): # currently coming from the ckpt engine init but maybe a catch all kwargs for other # snapshot download parameters would be more flexible. - # NOTE(jeff): allow_patterns here are explicitly not using safetensors or other - # checkpoint files that may be present. Example of all files in the llama-2-7b - # repo here: https://huggingface.co/meta-llama/Llama-2-7b-hf/tree/main - from huggingface_hub import snapshot_download, list_files_info + from huggingface_hub import snapshot_download, list_repo_tree def model_has_safetensors(model_name_or_path: str) -> bool: if os.path.isdir(model_name_or_path): file_list = os.listdir(model_name_or_path) else: - file_list = [rf.rfilename for rf in list_files_info(model_name_or_path)] + file_list = [rf.path for rf in list_repo_tree(model_name_or_path)] for f in file_list: if f.endswith(".safetensors"): return True From fa3662f7f5e7c5633dc85caf46010c9cefcdb18e Mon Sep 17 00:00:00 2001 From: Logan Adams <114770087+loadams@users.noreply.github.com> Date: Tue, 20 Feb 2024 09:37:04 -0800 Subject: [PATCH 09/18] Pin to PyTest 8.0.0 (#5163) Fix failures in nv-accelerate-v100 unit tests. Fix running on this PR: https://github.com/microsoft/DeepSpeed/actions/runs/7923894998/job/21634533848 Original failure [here](https://github.com/microsoft/DeepSpeed/actions/runs/7961285826/job/21748664124?pr=5129#step:7:415): ``` from _pytest.doctest import ( E ImportError: cannot import name 'import_path' from '_pytest.doctest' (/tmp/actions-runner/_work/DeepSpeed/DeepSpeed/unit-test-venv/lib/python3.8/site-packages/_pytest/doctest.py) ``` --- requirements/requirements-dev.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 7ae81d14aa18..f28c1ecb165c 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -6,7 +6,7 @@ future importlib-metadata>=4 mup pre-commit>=2.20.0 -pytest +pytest<=8.0.0 pytest-forked pytest-randomly pytest-xdist From f062a1b24d3c20178ce3d2abd69999340d8ec670 Mon Sep 17 00:00:00 2001 From: Nadav Elyahu <88962733+nelyahu@users.noreply.github.com> Date: Wed, 21 Feb 2024 00:17:52 +0200 Subject: [PATCH 10/18] get_grad_norm_direct: fix a case of empty norm group (#5148) fix for [#5145 ](https://github.com/microsoft/DeepSpeed/issues/5145) empty norm group create a norm tensor with shape=[1], while other norms will be shapeless. torch.stack does not support such case. Fixing empty group norm to be shapless as well, instead of shape=[1]. 
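A small standalone illustration of the shape mismatch (not part of the diff below):

```python
import torch

group_norm = torch.norm(torch.ones(4))        # 0-dim ("shapeless") tensor
empty_norm = torch.FloatTensor([0.0])         # old placeholder, shape=[1]
# torch.stack([group_norm, empty_norm])       # RuntimeError: tensors must be of equal size
empty_norm = torch.tensor(0.0, dtype=torch.float32)  # new placeholder, also 0-dim
total_norm = torch.stack([group_norm, empty_norm]).square().sum().float()  # works
```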
--------- Co-authored-by: Lev Kurilenko <113481193+lekurile@users.noreply.github.com> Co-authored-by: Lev Kurilenko --- deepspeed/runtime/zero/stage_1_and_2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deepspeed/runtime/zero/stage_1_and_2.py b/deepspeed/runtime/zero/stage_1_and_2.py index 18448adcabc3..e4009f6ac883 100755 --- a/deepspeed/runtime/zero/stage_1_and_2.py +++ b/deepspeed/runtime/zero/stage_1_and_2.py @@ -1686,7 +1686,7 @@ def get_grad_norm_direct(self, gradients, params, norm_type=2): if len(all_norms) > 0: total_norm = torch.stack(all_norms).square().sum().float() else: - total_norm = torch.FloatTensor([0.0]).to(self.device) + total_norm = torch.tensor(0.0, dtype=torch.float32).to(self.device) # Sum across all model parallel Device. dist.all_reduce(total_norm, op=dist.ReduceOp.SUM, group=self.dp_process_group) From e977c7d6630d160c5563248621fd85e48f8cf2ae Mon Sep 17 00:00:00 2001 From: Bruno Magalhaes Date: Tue, 20 Feb 2024 23:41:46 +0100 Subject: [PATCH 11/18] Distributed in-memory map-reduce for data analyzer (#5129) Adds class `DistributedDataAnlyzer` that implements a map-reduce on distributed memory. - instead of writing hundred/thousands of temp files as intermediate storage in the map/reduce as in `DataAnalyzer`, each node holds disjoints ordered subsets of pairs of `(metric, sample id)` as a distributed tensor. - Also removes the need to specify `metric_dtypes` as it's automatically inferred by the return of `metric_function(data)`. - Removes the need to have a distributed file system where all nodes need to be able to write to: here only rank 0 does the writing. - Much faster than the original map-reduce based on writing and loading several temp files into the disk, requires less memory, no temp files, and is simpler. ## How does it work - for each metric, the only results storage is `metric_result` as a list of `(sample_id, metric_value)` tuples. - `metric_result` is converted to a 2D tensor when whole dataset has been iterated. - `sample_idx_dtype` and `metric_value_dtype` are collected by an `all_reduce(op=MIN)` and `MAX` across the `metric_result` of all nodes. - each node holds a `metric_result` tensor of `N` samples as `N x (metric, sample)`, sorted by metric, with different `metric` values across nodes. E.g.: - node 1 holds `[ [1,20], [1, 30], [2,10], [2, 30]]`, node 2 holds `[ [3,10], [3, 20], [3,15], [3, 40]]`, and node 3 holds `[ [4,20], [4, 30], [5,25], [5, 50]]`. - to convert the list `[(metric,sample)]` to a `dict{ metric = [samples]}` each node iterates only its own dataset, as dictionary keys do not overlap across nodes. In this case, node 1 builds `{ 1: [20, 30], 2: [10,30]}`, node 2 builds `{ 3: [10, 20, 15, 40] }`, and node 3 holds `{ 4: [20, 30], 5: [25, 50]}`. - To write the merged files: (1) rank 0 opens the file, (2) iteratively receives buffers of values, dict keys and dict values from other ranks and writes them, and (3) closes the file. ## Future work Ideally, one could take this and do the curriculum setup on-the-fly when calling deepspeed `initialize`, i.e. without writing/loading map-reduce files and without forcing the user to call `.map()` and `.reduce()` beforehand. It takes less than 10 seconds so it's totally feasible. ## References - `file_write_ordered()` implements a sequential shared write similar to [`MPI_File_write_ordered`](https://www.open-mpi.org/doc/v3.0/man3/MPI_File_write_ordered.3.php). It is however adapted to communicate and write a list of tensors, instead of a single tensor. 
And it is also adapted to have only rank 0 writing to the file, instead of using a shared pointer. - `dist_sample_sort()` implements a distributed sample sort, as detailed [here](https://brunomaga.github.io/Distributed-Sort) and illustrated below. The ranges in step 3 guarantee disjoint subsets of keys (metric values) across nodes. ![sample_sort](https://github.com/microsoft/DeepSpeed/assets/150697676/53828103-370f-4f3b-9074-3e3bb8603000) --------- Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- .../data_sampling/data_analyzer.py | 427 ++++++++++++++++-- 1 file changed, 401 insertions(+), 26 deletions(-) diff --git a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py index 16203e7d7a84..5da11097319d 100644 --- a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py +++ b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py @@ -12,10 +12,10 @@ import torch from torch.utils.data import BatchSampler, SequentialSampler, DataLoader, Subset -from deepspeed.utils import logger +from deepspeed.utils import logger, groups import deepspeed.comm as dist -from .indexed_dataset import MMapIndexedDataset, valid_dtypes -from .utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype +from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDataset, valid_dtypes +from deepspeed.runtime.data_pipeline.data_sampling.utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype class DataAnalyzer(object): @@ -98,9 +98,9 @@ def update_metric_results(self, metric_values = metric_function(data) assert torch.is_tensor(metric_values) or isinstance(metric_values, np.ndarray), \ - "metric_function must return a tensor or array" + "metric_function must return a tensor or array" assert metric_values.dtype == metric_dtype, \ - f"metric_function result dtype {metric_values.dtype} does not match metric_dtype {metric_dtype}" + f"metric_function result dtype {metric_values.dtype} does not match metric_dtype {metric_dtype}" if isinstance(metric_values, np.ndarray): metric_values = torch.from_numpy(metric_values) @@ -179,7 +179,7 @@ def run_map_helper(self, thread_id): else: self.custom_map_update(data, self.metric_types, self.metric_dtypes, self.metric_functions, metric_results, batch_start_idx) - processed_sample += self.batch_size + processed_sample += len(data) duration = (time.time() - start) / 3600.0 remain_duration = duration * total_sample / processed_sample - duration logger.info( @@ -385,26 +385,10 @@ def merge_map_results(self, dataset, metric_names, metric_types, save_path, num_ index_to_metric_builder.merge_file_(chunk_im_fname) close_mmap_dataset_builder(index_to_sample_builder, index_to_sample_fname) close_mmap_dataset_builder(index_to_metric_builder, index_to_metric_fname) - num_sample_per_value = {} - index_to_sample = MMapIndexedDataset(index_to_sample_fname, skip_warmup=True) - index_to_metric = MMapIndexedDataset(index_to_metric_fname, skip_warmup=True) - index_to_sample_merged_fname = f"{metric_save_path}/{metric_name}_index_to_sample_percentile_merged" - index_to_sample_merged_builder = create_mmap_dataset_builder(index_to_sample_merged_fname, - sample_idx_dtype) - for v_idx in range(len(index_to_sample)): - if v_idx > 0: - assert index_to_metric[v_idx] > index_to_metric[v_idx - 1] - num_sample_per_value[index_to_metric[v_idx][0]] = 
len(index_to_sample[v_idx]) - assert sum(num_sample_per_value.values()) == total_num_samples - merge_step = max(1, len(index_to_sample) // 100) - for v_idx in range(0, len(index_to_sample), merge_step): - merged_samples = np.copy( - np.concatenate(index_to_sample[v_idx:min(len(index_to_sample), (v_idx + merge_step))], - axis=None)) - index_to_sample_merged_builder.add_item( - torch.tensor(merged_samples.astype(np.int64), dtype=torch.long)) - logger.info(f"Finished merging index_to_sample {v_idx} to {v_idx+merge_step}.") - close_mmap_dataset_builder(index_to_sample_merged_builder, index_to_sample_merged_fname) + + num_sample_per_value = DataAnalyzer.output_index_to_sample_percentile( + index_to_sample_fname, index_to_metric_fname, metric_name, metric_save_path, total_num_samples, + sample_idx_dtype) self.get_metric_value_percentiles(metric_name, num_sample_per_value, total_num_samples) elif metric_type == 'accumulate_value_over_samples': metric_save_path = f"{save_path}/{metric_name}/" @@ -426,6 +410,29 @@ def merge_map_results(self, dataset, metric_names, metric_types, save_path, num_ metric_value_builder.add_item(torch.tensor(metric_value.astype(np.int64), dtype=torch.long)) close_mmap_dataset_builder(metric_value_builder, metric_value_fname) + @staticmethod + def output_index_to_sample_percentile(index_to_sample_fname, index_to_metric_fname, metric_name, metric_save_path, + total_num_samples, sample_idx_dtype): + """ read index_to_metric and index_to_sample files and write distribution to index_to_sample_percentage_merged """ + num_sample_per_value = {} + index_to_sample = MMapIndexedDataset(index_to_sample_fname, skip_warmup=True) + index_to_metric = MMapIndexedDataset(index_to_metric_fname, skip_warmup=True) + index_to_sample_merged_fname = f"{metric_save_path}/{metric_name}_index_to_sample_percentile_merged" + index_to_sample_merged_builder = create_mmap_dataset_builder(index_to_sample_merged_fname, sample_idx_dtype) + for v_idx in range(len(index_to_sample)): + if v_idx > 0: + assert index_to_metric[v_idx] > index_to_metric[v_idx - 1] + num_sample_per_value[index_to_metric[v_idx][0]] = len(index_to_sample[v_idx]) + assert sum(list(num_sample_per_value.values())) == total_num_samples + merge_step = max(1, len(index_to_sample) // 100) + for v_idx in range(0, len(index_to_sample), merge_step): + merged_samples = np.copy( + np.concatenate(index_to_sample[v_idx:min(len(index_to_sample), (v_idx + merge_step))], axis=None)) + index_to_sample_merged_builder.add_item(torch.tensor(merged_samples.astype(np.int64), dtype=torch.long)) + logger.info(f"Finished merging index_to_sample {v_idx} to {v_idx+merge_step}.") + close_mmap_dataset_builder(index_to_sample_merged_builder, index_to_sample_merged_fname) + return num_sample_per_value + def run_reduce(self): if self.custom_reduce is None: self.merge_map_results(self.dataset, self.metric_names, self.metric_types, self.save_path, @@ -442,3 +449,371 @@ def run_map_reduce(self, comm_group=None): self.run_reduce() # wait for the reduce, where rank 0 merges all (partial) files. Dataset can then be used by all nodes. 
dist.barrier(group=comm_group) + + +class DistributedDataAnalyzer(object): + + def __init__( + self, + dataset, + num_workers=1, + worker_id=0, + batch_size=1, + metric_names=[], + metric_functions=[], + metric_types=[], + save_path="./", + collate_fn=None, + device='cuda', + comm_group=None, + sample_indices=None, + ) -> None: + self.dataset = dataset + self.batch_size = batch_size + self.metric_names = metric_names + self.metric_functions = metric_functions + self.metric_types = metric_types + self.save_path = save_path + self.collate_fn = collate_fn + self.device = device + self.sample_indices = sample_indices + + if not dist.is_initialized(): + dist.init_distributed() + + # comm_group and worker_id+num_workers are mutually exclusive + if comm_group is not None: + self.comm_group = comm_group + self.num_workers = self.comm_group.size() + self.worker_id = self.comm_group.rank() + else: + self.comm_group = groups._clone_world_group() + self.num_workers = self.comm_group.size() + self.worker_id = self.comm_group.rank() + + if self.worker_id == 0: + logger.info(f"Data analyzer initialized with {self.num_workers} workers.") + + def run_map_reduce(self): + + # setup individual dataloaders + worker_splits, _ = split_dataset(self.dataset, self.num_workers, self.worker_id, num_threads=1) + start_idx, end_idx = worker_splits[self.worker_id] + logger.info(f"worker {self.worker_id}: start working on data subset {start_idx} to {end_idx}") + worker_dataset = Subset(self.dataset, list(range(start_idx, end_idx))) + sampler = BatchSampler(SequentialSampler(worker_dataset), batch_size=self.batch_size, drop_last=False) + dataloader = DataLoader(dataset=worker_dataset, + batch_sampler=sampler, + num_workers=0, + collate_fn=self.collate_fn, + pin_memory=False) + + # set initial results list + metric_results = [] + for metric_type in self.metric_types: + assert metric_type in ['single_value_per_sample', 'accumulate_value_over_samples'], \ + f"metric_type {metric_type} not implemented." + metric_results.append([] if metric_type == 'single_value_per_sample' else None) + + # iterate dataloader and store metric results + batch_start_idx = start_idx + for data in dataloader: + for m_idx in range(len(self.metric_names)): + metric_type, metric_function = self.metric_types[m_idx], self.metric_functions[m_idx] + metric_values = metric_function(data) + assert torch.is_tensor(metric_values) or isinstance(metric_values, np.ndarray), \ + "metric_function must return a tensor or array" + if isinstance(metric_values, np.ndarray): + metric_values = torch.from_numpy(metric_values) + assert metric_values.dtype in valid_dtypes, \ + f"metric_function result dtype {metric_values.dtype} not supported. 
Supported dtypes {valid_dtypes}" + + if metric_type == 'single_value_per_sample': + for row in range(metric_values.size()[0]): + value = metric_values[row].item() + sample_idx = batch_start_idx + row # sample idx following dataset iteration order + if isinstance(data, dict) and 'index' in data: # Megatron use case + sample_idx = data['index'][row][0].item() + elif self.sample_indices is not None: # user defined shuffling of indices + sample_idx = self.sample_indices[sample_idx] + metric_results[m_idx].append((value, sample_idx)) + elif metric_type == 'accumulate_value_over_samples': + if metric_results[m_idx] is None: + metric_results[m_idx] = metric_values + else: + metric_results[m_idx].add_(metric_values) + batch_start_idx += len(data) + + # compute dtype for sample ids + total_num_samples = len(self.dataset) + sample_idx_dtype = find_fit_int_dtype(0, total_num_samples - 1) + logger.info(f"Total number of data samples: {total_num_samples}.") + logger.info(f"Will use {sample_idx_dtype} to store the sample indexes.") + + # convert to list of tensors + metric_results = [torch.tensor(m).to(self.device) for m in metric_results] + + for m_idx in range(len(self.metric_names)): + metric_values, metric_name, metric_type = \ + metric_results[m_idx], self.metric_names[m_idx], self.metric_types[m_idx] + metric_save_path = f"{self.save_path}/{metric_name}/" + os.makedirs(metric_save_path, exist_ok=True) + + if metric_type == 'single_value_per_sample': + + # Compute sample and metric value dtypes based on range + values, samples = metric_values[:, 0], metric_values[:, 1] + value_min, value_max = Dist.min_max(values, self.comm_group) + sample_min, sample_max = Dist.min_max(samples, self.comm_group) + metric_value_dtype = find_fit_int_dtype(value_min, value_max) + sample_value_dtype = find_fit_int_dtype(sample_min, sample_max) + + # sample_to_metric maps sample ids to metric values, as a list of metric values + sample_to_metric_fname = f"{metric_save_path}/{metric_name}_sample_to_metric" + values = [torch.tensor([x]) for x in metric_values[:, 0]] + self.file_write_ordered(values, sample_to_metric_fname, metric_value_dtype) + + # distributed sorting by values, gives an ordered disjoint subset of keys on nodes + metric_values = Dist.sample_sort(metric_values, self.comm_group, self.num_workers) + metric_to_samples_dict = {} + if len(metric_values) > 0: + for value, sample in metric_values: + if value.item() not in metric_to_samples_dict: + metric_to_samples_dict[value.item()] = [] + metric_to_samples_dict[value.item()].append(sample.item()) + + # index_to_metric and index_to_sample serialize a dicitonary from metric to samples + # index_to_metric stores a key per row, index_to_sample stores the values per row + values = [torch.tensor([x]) for x in metric_to_samples_dict.keys()] + samples = [torch.tensor(metric_to_samples_dict[x]) for x in metric_to_samples_dict.keys()] + index_to_metric_fname = f"{metric_save_path}/{metric_name}_index_to_metric" #dict keys + index_to_sample_fname = f"{metric_save_path}/{metric_name}_index_to_sample" #dict values + self.file_write_ordered(values, index_to_metric_fname, metric_value_dtype) + self.file_write_ordered(samples, index_to_sample_fname, sample_value_dtype) + + if self.worker_id == 0: + DataAnalyzer.output_index_to_sample_percentile(index_to_sample_fname, index_to_metric_fname, + metric_name, metric_save_path, total_num_samples, + sample_idx_dtype) + dist.barrier(self.comm_group) + + elif metric_type == 'accumulate_value_over_samples': + metric_value_fname = 
f"{metric_save_path}/{metric_name}_metric_value" + dist.reduce(metric_values, dst=0, op=dist.ReduceOp.SUM, group=self.comm_group) + metric_value_dtype = find_fit_int_dtype(metric_values.min(), metric_values.max()) + + if self.worker_id == 0: + builder = create_mmap_dataset_builder(metric_value_fname, metric_value_dtype) + builder.add_item(metric_values.cpu()) + close_mmap_dataset_builder(builder, metric_value_fname) + dist.barrier(self.comm_group) + + def file_write_ordered(self, tensor_list, fname, numpy_dtype): + """ MPI_file_write_ordered extended to write a list of tensors, by one rank, iteratively """ + + # each not has a list of rows (tensors) to be written to the file. + # we will serialize it to communicate it in one comm step. + + tkwargs = dict(dtype=torch.int64, device=self.device) + + # 1. gather on rank 0 the number of rows to be sent/recv + row_count = torch.tensor([len(tensor_list)], **tkwargs) + row_counts = torch.zeros(self.num_workers, **tkwargs) + dist.all_gather_into_tensor(row_counts, row_count, group=self.comm_group) + assert row_counts[self.worker_id] == row_count == len(tensor_list), "all_gather failed" + + # 2. gather on rank 0 the sizes of the rows to be sent/recv + row_len = torch.tensor([len(l) for l in tensor_list], **tkwargs) + row_lens = Dist.gather_v(row_len, 0, self.comm_group, self.num_workers, self.worker_id) + + # 4. gather on rank 0 of the total size (sum of all row lengths) to be received + size = torch.tensor([sum(row_len).item()], **tkwargs) + sizes = torch.zeros(self.num_workers, **tkwargs) + dist.all_gather_into_tensor(sizes, size, group=self.comm_group) + assert sizes[self.worker_id] == size.item(), "all_gather did not return the same sizes" #sanity check + + # method to deserializes a buffer into rows of different lengths and write them to file + def write_buffer_to_file(buff, src, builder): + assert self.worker_id == 0, "only rank 0 can write to file" + for row_len in row_lens[src]: + builder.add_item(buff[:row_len].cpu()) + buff = buff[row_len:] + + # 5. 
rank 0 prepares output folder and file + if self.worker_id == 0: + os.makedirs(os.path.dirname(fname), exist_ok=True) + builder = create_mmap_dataset_builder(fname, numpy_dtype) + + # iterate through ranks that have data to be sent/recv/written + for src in [rank for rank, count in enumerate(row_counts) if count > 0]: + + dist.barrier(group=self.comm_group) + if self.worker_id == 0 and src == 0: # rank 0's write its own data + buffer = torch.cat(tensor_list, dim=0).to(self.device) + write_buffer_to_file(buffer, 0, builder) + elif self.worker_id == 0 and src > 0: # rank 0 receives other rank's data and writes it + buffer = torch.empty(sizes[src].item(), dtype=buffer.dtype, device=buffer.device) + err = dist.recv(buffer, src=src, group=self.comm_group, tag=src) + assert err == src and len(buffer) > 0, "recv failed" + write_buffer_to_file(buffer, src, builder) + elif self.worker_id == src: # current rank sends data to rank 0 + buffer = torch.cat(tensor_list, dim=0).to(self.device) + dist.send(buffer, 0, group=self.comm_group, tag=src) + + # rank 0 closes the file + if self.worker_id == 0: + close_mmap_dataset_builder(builder, fname) # close file + dist.barrier(self.comm_group) + + +class Dist: + """ auxiliary class to perform distributed operations on tensors""" + + @staticmethod + def min_max(tensor, comm_group): + """ given a distributed tensor, return the min/max values across all ranks""" + + value_min, value_max = tensor.min(), tensor.max() + dist.reduce(value_min, 0, op=dist.ReduceOp.MIN, group=comm_group) + dist.reduce(value_max, 0, op=dist.ReduceOp.MAX, group=comm_group) + return value_min.item(), value_max.item() + + @staticmethod + def gather_v(tensor, dst, comm_group, num_workers, worker_id): + """ MPI_Gatherv. gather tensors of variable sizes in a single rank """ + + # gather the number of rows to be sent/recv + size = torch.tensor([len(tensor)], dtype=torch.int64, device=tensor.device) + sizes = torch.zeros(num_workers, dtype=torch.int64, device=tensor.device) + dist.all_gather_into_tensor(sizes, size, group=comm_group) + assert sizes[worker_id] == size, "all_gather failed" + + # all_gather requires all tensors to be of same size so we need to pad them + max_size = max(sizes).item() + buffer = torch.empty(max_size, dtype=tensor.dtype, device=tensor.device) + buffer[0:size] = torch.tensor(tensor, dtype=tensor.dtype, device=tensor.device) + buffer_list = None + if worker_id == 0: # create padded recv buffers + buffer_list = [torch.empty(max_size, dtype=tensor.dtype, device=tensor.device) for _ in range(num_workers)] + dist.gather(buffer, buffer_list, dst=dst, group=comm_group) + + # revert padding and return value + if worker_id == 0: + buffer_list = [r[:s.item()] for r, s in zip(buffer_list, sizes)] + return buffer_list + + @staticmethod + def sample_sort(tensor, comm_group, num_workers, n_samples=100): + """ perform a distributed random sort of a tensor, and returns the sorted partial tensor""" + device, dims = tensor.device, tensor.size()[1] + + # 1 - sort rows by first column, then second column, then third, etc... 
+ tensor = torch.tensor(sorted(tensor.tolist()), dtype=tensor.dtype, device=tensor.device) + + # 2 - collect few samples per rank + idx = torch.round(torch.linspace(0, len(tensor) - 1, n_samples)).to(int) + samples = tensor[idx][:, 0].contiguous().to(device) #only first column, all but last row + + # 2 - Allgather samples + all_samples = [torch.zeros(n_samples, dtype=samples.dtype, device=device) for _ in range(num_workers)] + dist.all_gather(all_samples, samples, group=comm_group) + all_samples = torch.cat(all_samples, dim=0).to(device) + + # 3 - Sort all samples and collect the ranges of each rank as equidistant + all_samples = all_samples.sort()[0] + idx = torch.round(torch.linspace(0, len(all_samples) - 1, num_workers + 1)).to(int) + ranges = all_samples[idx] # range of each rank r as ranges[r] <= x < ranges[r+1] + ranges[-1] += 1 # increase upper limit of last rank so that x < ranges[r+1]. + + # 4 - collect elements to send to each rank, based on the rank ranges + send = [] + for rank in range(num_workers): + mask = (tensor[:, 0] >= ranges[rank]) & (tensor[:, 0] < ranges[rank + 1]) + send.append(tensor[mask]) + + # 5. all to all to communicate the sizes to be sent/recv + send_count = [torch.tensor([len(s) * dims], dtype=torch.int64, device=device) for s in send] + recv_count = list(torch.empty([num_workers], dtype=torch.int64, device=device).chunk(num_workers)) + dist.all_to_all(recv_count, send_count, group=comm_group) + + # 6. all-to-all-v to communicate the elements to be sent/recv as a single tensor + send = torch.cat(send, dim=0).flatten().to(device) + recv = torch.zeros(sum(recv_count), dtype=send.dtype).to(device) + send_count = [s.item() for s in send_count] # convert to list of ints + recv_count = [r.item() for r in recv_count] + dist.all_to_all_single(recv, send, recv_count, send_count, group=comm_group) + del send + + # 7. the received tensor is the 1D disjoint subset of the distributed tensor. + # We will recover the original dimensionality and sort it by columns again. 
+ recv = recv.view(-1, dims) + recv = torch.tensor(sorted(recv.tolist()), dtype=recv.dtype, device=recv.device) + return recv + + +def test_compare_both_data_analyzers(dataset): + """ given a dataset, compare file and memory based data analyser""" + + id = lambda t: torch.tensor(t).to(torch.int64) # identity + batch_sum = lambda t: id(t).sum() #sum batch + kwargs = dict( + dataset=dataset, + batch_size=3, + worker_id=int(os.environ['RANK']), + num_workers=int(os.environ['WORLD_SIZE']), + metric_names=["mod", "batch_sum"], + metric_functions=[id, batch_sum], + metric_types=['single_value_per_sample', 'accumulate_value_over_samples'], + ) + + dda = DistributedDataAnalyzer( + save_path="./output_dist", + device=f"cuda:{int(os.environ['LOCAL_RANK'])}", + **kwargs, + ) + start_time = time.time() + dda.run_map_reduce() + if dda.worker_id == 0: + print("DistributedDataAnalyzer runtime: %s seconds " % (time.time() - start_time)) + + da = DataAnalyzer(num_threads=2, + num_threads_reduce=2, + metric_dtypes=[torch.int64, torch.int64], + save_path="./output_disk", + **kwargs) + start_time = time.time() + da.run_map_reduce() + if da.worker_id == 0: + print("DataAnalyzer runtime: %s seconds " % (time.time() - start_time)) + + output_paths = [ + "batch_sum/batch_sum_metric_value.bin", "batch_sum/batch_sum_metric_value.idx", \ + "mod/mod_index_to_metric.bin", "mod/mod_index_to_metric.idx", \ + "mod/mod_index_to_sample.bin", "mod/mod_index_to_sample.idx", \ + "mod/mod_index_to_sample_percentile_merged.bin", "mod/mod_index_to_sample_percentile_merged.idx", \ + "mod/mod_sample_to_metric.bin", "mod/mod_sample_to_metric.idx" + ] + + if dda.worker_id == 0: + for path in output_paths: + with open(os.path.join(da.save_path, path), 'rb') as f1, \ + open(os.path.join(dda.save_path, path), 'rb') as f2: + if f1.read() != f2.read(): + print(f"files {path} are not identical.") + + +if __name__ == "__main__": + + class TestDataset(torch.utils.data.Dataset): + + def __init__(self, size=20): + self.values = [1001 + x % 6 for x in range(size)] + self.size = size + + def __len__(self): + return self.size + + def __getitem__(self, idx): + return self.values[idx] + + test_compare_both_data_analyzers(TestDataset()) From 7f0950f819c0482df793dcf7d8f3ffd7b7969241 Mon Sep 17 00:00:00 2001 From: Nadav Elyahu <88962733+nelyahu@users.noreply.github.com> Date: Wed, 21 Feb 2024 04:16:47 +0200 Subject: [PATCH 12/18] DeepSpeedZeroOptimizer_Stage3: remove cuda specific optimizer (#5138) during cpu offload there was a usage in cuda fused adam, as a backup optimizer. This is a specific accelerator code, also in non-critical path. 
--------- Co-authored-by: Michael Wyatt --- deepspeed/runtime/zero/stage3.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/deepspeed/runtime/zero/stage3.py b/deepspeed/runtime/zero/stage3.py index 42008236a9ea..4132a4527d6a 100644 --- a/deepspeed/runtime/zero/stage3.py +++ b/deepspeed/runtime/zero/stage3.py @@ -21,7 +21,7 @@ from deepspeed.runtime.zero.offload_config import OffloadDeviceEnum from deepspeed.runtime.zero.parameter_offload import DeepSpeedZeRoOffload from deepspeed.runtime.zero.utils import apply_to_tensors_only -from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam +from deepspeed.ops.adam import DeepSpeedCPUAdam from deepspeed.runtime.swap_tensor.partitioned_param_swapper import PartitionedParamStatus from deepspeed.runtime.swap_tensor.optimizer_utils import OptimizerSwapper from deepspeed.runtime.swap_tensor.partitioned_optimizer_swapper import PartitionedOptimizerSwapper @@ -201,13 +201,12 @@ def __init__( backup_gpu_tensor = torch.randn(1, device=get_accelerator().device_name()).to(self.dtype) backup_gpu_param = torch.nn.Parameter(backup_gpu_tensor) assert type(init_optimizer) == DeepSpeedCPUAdam, 'Hybrid Optimizer Only Supports DeepSpeedCPUAdam' - self.backup_optimizer = FusedAdam([backup_gpu_param], - lr=self.optimizer.param_groups[0]["lr"], - bias_correction=self.optimizer.param_groups[0]["bias_correction"], - betas=self.optimizer.param_groups[0]["betas"], - eps=self.optimizer.param_groups[0]["eps"], - weight_decay=self.optimizer.param_groups[0]["weight_decay"], - amsgrad=self.optimizer.param_groups[0]["amsgrad"]) + self.backup_optimizer = torch.optim.AdamW([backup_gpu_param], + lr=self.optimizer.param_groups[0]["lr"], + betas=self.optimizer.param_groups[0]["betas"], + eps=self.optimizer.param_groups[0]["eps"], + weight_decay=self.optimizer.param_groups[0]["weight_decay"], + amsgrad=self.optimizer.param_groups[0]["amsgrad"]) # Multiple param_groups configs for back-up optimizer if len(self.optimizer.param_groups) > 1: for i in range(1, len(self.optimizer.param_groups)): From a84d07c51ea9d984c9aa324240983b6ab5ffadb3 Mon Sep 17 00:00:00 2001 From: Moshe Island Date: Wed, 21 Feb 2024 04:17:09 +0200 Subject: [PATCH 13/18] MOE: Fix save checkpoint when TP > 1 (#5157) When using MOE, currently, only mp_rank_00_model_states.pt is saved. This fails when using TP > 1. Fix it by saving all required mp_rank_xx_model_states.pt files. Signed-off-by: Moshe Island Co-authored-by: Moshe Island --- deepspeed/runtime/engine.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index eb17c6955b8f..e15cc49339ff 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -3224,22 +3224,21 @@ def _save_moe_checkpoint(self, save_dir, tag, client_state={}, exclude_frozen_pa # In the case of E + D parallelism, only the # first expert parallel group should save the expert weights # since each expert parallel group is a copy of the model's experts - if exp_dp_rank != 0: - return - - # Save optimizer states. They are different across each exp parallel rank. 
- optimizer_state = { - 'optimizer': self.optimizer.state_dict() if self.optimizer and not self.zero_optimization() else None - } - # TODO: why use BufferedWriter not the path - file_path = self._get_optimizer_ckpt_name(save_dir, tag, expp_rank) - self.checkpoint_engine.save(optimizer_state, file_path) - - # get non-moe parameters - model_state_dict = self._get_non_moe_state_dict( - self.module_state_dict(exclude_frozen_parameters=exclude_frozen_parameters)) - - if expp_rank == 0: + if exp_dp_rank == 0: + # Save optimizer states. They are different across each exp parallel rank. + optimizer_state = { + 'optimizer': self.optimizer.state_dict() if self.optimizer and not self.zero_optimization() else None + } + # TODO: why use BufferedWriter not the path + file_path = self._get_optimizer_ckpt_name(save_dir, tag, expp_rank) + self.checkpoint_engine.save(optimizer_state, file_path) + + # Load flow uses below saved file for model parameters, RNG and more + if groups._get_data_parallel_rank() == 0: + # get non-moe parameters + model_state_dict = self._get_non_moe_state_dict( + self.module_state_dict(exclude_frozen_parameters=exclude_frozen_parameters)) + # TODO: update num experts info,.. in checkpoint state = { 'module': From 005afe124f56b2243785067a898134fa2bf8735c Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka <81312776+tohtana@users.noreply.github.com> Date: Tue, 20 Feb 2024 18:19:31 -0800 Subject: [PATCH 14/18] Fix gradient clipping (#5150) The gradient clipping API doesn't apply the coefficient correctly. This PR resolves the issue and adds a test case. Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- deepspeed/runtime/utils.py | 2 +- tests/unit/runtime/test_runtime_utils.py | 25 ++++++++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/deepspeed/runtime/utils.py b/deepspeed/runtime/utils.py index d7a35b7dbbe9..d1ebe4b2f83d 100755 --- a/deepspeed/runtime/utils.py +++ b/deepspeed/runtime/utils.py @@ -407,7 +407,7 @@ def clip_grad_norm_(parameters, max_norm, norm_type=2, mpu=None): max_norm = torch.tensor([float(max_norm)], device=parameters[0].device) clip_coef = max_norm / (total_norm + 1e-6) tmp_tensor = torch.tensor([1.0], device=parameters[0].device) - clip_coef = torch.max(tmp_tensor, clip_coef) + clip_coef = torch.min(tmp_tensor, clip_coef) for p in parameters: p.grad.data.mul_(clip_coef) return total_norm diff --git a/tests/unit/runtime/test_runtime_utils.py b/tests/unit/runtime/test_runtime_utils.py index 5d8478b249be..6fdeb2074246 100644 --- a/tests/unit/runtime/test_runtime_utils.py +++ b/tests/unit/runtime/test_runtime_utils.py @@ -26,10 +26,10 @@ def test_call_to_str(): assert c2s('hello', 1138, val=3) == 'hello(1138, val=3)' -class TestClibGradNorm(DistributedTest): +class TestClipGradNorm(DistributedTest): world_size = 2 - def test(self): + def test_gather(self): param1 = torch.nn.Parameter(torch.Tensor([0])) param1.grad = torch.Tensor([1]) param2 = torch.nn.Parameter(torch.Tensor([0])) @@ -50,6 +50,27 @@ def test(self): assert gathered_norm[0] == gathered_norm[1], "norm at rank 0 does not match the norm at rank 1" + def test_clipped_val(self): + max_norm = 0.1 + + def test_params(): + param1 = torch.nn.Parameter(torch.Tensor([0])) + param1.grad = torch.Tensor([1]) + param2 = torch.nn.Parameter(torch.Tensor([0])) + param2.grad = torch.Tensor([1]) + return [param1, param2] + + # This assumes gradients are same on all the ranks and doesn't consider multiple ranks + params_expected = test_params() + 
torch.nn.utils.clip_grad_norm_(params_expected, max_norm) + + params_actual = test_params() + ds_utils.clip_grad_norm_(params_actual, max_norm=max_norm) + + # This can be allclose + assert torch.equal(params_expected[0].grad, params_actual[0].grad) + assert torch.equal(params_expected[1].grad, params_actual[1].grad) + @pytest.mark.parametrize("check_using_norm", [(False), (True)]) class TestCheckOverflow(DistributedTest): From b00533e47975597e88bb5ea14b3a315a04728add Mon Sep 17 00:00:00 2001 From: Jinzhen Lin Date: Wed, 21 Feb 2024 10:20:11 +0800 Subject: [PATCH 15/18] Use ninja to speed up build (#5088) Deepspeed have too many ops now, and it take too many time to pre-build all ops. I notice deepspeed disabled `ninja` 4 years ago (https://github.com/microsoft/DeepSpeed/pull/298) and I think we should consider enable it now. The issue mentioned in https://github.com/microsoft/DeepSpeed/pull/298 can be solved by resolving `include_dirs` to absolute path. --------- Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> Co-authored-by: Logan Adams Co-authored-by: Olatunji Ruwase Co-authored-by: Michael Wyatt --- ...ias_activation.cu => bias_activation_cuda.cu} | 0 .../{layer_norm.cu => layer_norm_cuda.cu} | 0 .../{rms_norm.cu => rms_norm_cuda.cu} | 0 ...rnels.cu => gated_activation_kernels_cuda.cu} | 0 .../ragged_ops/embed/{embed.cu => embed_cuda.cu} | 0 ...ed_kv_rotary.cu => blocked_kv_rotary_cuda.cu} | 0 .../{logits_gather.cu => logits_gather_cuda.cu} | 0 .../{moe_gather.cu => moe_gather_cuda.cu} | 0 .../{moe_scatter.cu => moe_scatter_cuda.cu} | 0 .../{top_k_gating.cu => top_k_gating_cuda.cu} | 0 op_builder/builder.py | 16 ++++++++++++---- op_builder/cpu/builder.py | 6 ++++-- op_builder/hpu/builder.py | 6 ++++-- op_builder/inference_core_ops.py | 8 ++++---- op_builder/ragged_ops.py | 12 ++++++------ op_builder/xpu/builder.py | 4 ++-- setup.py | 3 ++- 17 files changed, 34 insertions(+), 21 deletions(-) rename deepspeed/inference/v2/kernels/core_ops/bias_activations/{bias_activation.cu => bias_activation_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/{layer_norm.cu => layer_norm_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/{rms_norm.cu => rms_norm_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/core_ops/gated_activations/{gated_activation_kernels.cu => gated_activation_kernels_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/ragged_ops/embed/{embed.cu => embed_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/{blocked_kv_rotary.cu => blocked_kv_rotary_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/ragged_ops/logits_gather/{logits_gather.cu => logits_gather_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/ragged_ops/moe_gather/{moe_gather.cu => moe_gather_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/{moe_scatter.cu => moe_scatter_cuda.cu} (100%) rename deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/{top_k_gating.cu => top_k_gating_cuda.cu} (100%) diff --git a/deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation.cu b/deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation.cu rename to deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation_cuda.cu diff --git a/deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cu 
b/deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cu rename to deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm_cuda.cu diff --git a/deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cu b/deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cu rename to deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm_cuda.cu diff --git a/deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cu b/deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cu rename to deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels_cuda.cu diff --git a/deepspeed/inference/v2/kernels/ragged_ops/embed/embed.cu b/deepspeed/inference/v2/kernels/ragged_ops/embed/embed_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/ragged_ops/embed/embed.cu rename to deepspeed/inference/v2/kernels/ragged_ops/embed/embed_cuda.cu diff --git a/deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cu b/deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cu rename to deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary_cuda.cu diff --git a/deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cu b/deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cu rename to deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather_cuda.cu diff --git a/deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cu b/deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cu rename to deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather_cuda.cu diff --git a/deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cu b/deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cu rename to deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter_cuda.cu diff --git a/deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cu b/deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating_cuda.cu similarity index 100% rename from deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cu rename to deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating_cuda.cu diff --git a/op_builder/builder.py b/op_builder/builder.py index fec39f2b4feb..dd77f967cc60 100644 --- a/op_builder/builder.py +++ b/op_builder/builder.py @@ -453,9 +453,10 @@ def deepspeed_src_path(self, code_path): def builder(self): from torch.utils.cpp_extension import CppExtension + include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())] return CppExtension(name=self.absolute_name(), 
sources=self.strip_empty_entries(self.sources()), - include_dirs=self.strip_empty_entries(self.include_paths()), + include_dirs=include_dirs, extra_compile_args={'cxx': self.strip_empty_entries(self.cxx_args())}, extra_link_args=self.strip_empty_entries(self.extra_ldflags())) @@ -638,7 +639,7 @@ def builder(self): from torch.utils.cpp_extension import CppExtension as ExtensionBuilder else: from torch.utils.cpp_extension import CUDAExtension as ExtensionBuilder - + include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())] compile_args = {'cxx': self.strip_empty_entries(self.cxx_args())} if self.build_for_cpu else \ {'cxx': self.strip_empty_entries(self.cxx_args()), \ 'nvcc': self.strip_empty_entries(self.nvcc_args())} @@ -651,7 +652,7 @@ def builder(self): cuda_ext = ExtensionBuilder(name=self.absolute_name(), sources=self.strip_empty_entries(self.sources()), - include_dirs=self.strip_empty_entries(self.include_paths()), + include_dirs=include_dirs, libraries=self.strip_empty_entries(self.libraries_args()), extra_compile_args=compile_args, extra_link_args=self.strip_empty_entries(self.extra_ldflags())) @@ -702,11 +703,18 @@ def nvcc_args(self): '-DROCM_VERSION_MINOR=%s' % ROCM_MINOR ] else: + try: + nvcc_threads = int(os.getenv("DS_NVCC_THREADS", "")) + if nvcc_threads <= 0: + raise ValueError("") + except ValueError: + nvcc_threads = min(os.cpu_count(), 8) + cuda_major, _ = installed_cuda_version() args += [ '-allow-unsupported-compiler' if sys.platform == "win32" else '', '--use_fast_math', '-std=c++17' if cuda_major > 10 else '-std=c++14', '-U__CUDA_NO_HALF_OPERATORS__', - '-U__CUDA_NO_HALF_CONVERSIONS__', '-U__CUDA_NO_HALF2_OPERATORS__' + '-U__CUDA_NO_HALF_CONVERSIONS__', '-U__CUDA_NO_HALF2_OPERATORS__', f'--threads={nvcc_threads}' ] if os.environ.get('DS_DEBUG_CUDA_BUILD', '0') == '1': args.append('--ptxas-options=-v') diff --git a/op_builder/cpu/builder.py b/op_builder/cpu/builder.py index f6a71c7d1971..d2bc8eacfa25 100644 --- a/op_builder/cpu/builder.py +++ b/op_builder/cpu/builder.py @@ -3,6 +3,8 @@ # DeepSpeed Team +import os + try: # is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed # if successful this also means we're doing a local install and not JIT compile path @@ -16,12 +18,12 @@ class CPUOpBuilder(OpBuilder): def builder(self): from torch.utils.cpp_extension import CppExtension as ExtensionBuilder - + include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())] compile_args = {'cxx': self.strip_empty_entries(self.cxx_args())} cpp_ext = ExtensionBuilder(name=self.absolute_name(), sources=self.strip_empty_entries(self.sources()), - include_dirs=self.strip_empty_entries(self.include_paths()), + include_dirs=include_dirs, libraries=self.strip_empty_entries(self.libraries_args()), extra_compile_args=compile_args) diff --git a/op_builder/hpu/builder.py b/op_builder/hpu/builder.py index 5a538c84040c..3c86128fffd6 100644 --- a/op_builder/hpu/builder.py +++ b/op_builder/hpu/builder.py @@ -4,6 +4,8 @@ # DeepSpeed Team +import os + try: # is op_builder from deepspeed or a 3p version? 
this should only succeed if it's deepspeed # if successful this also means we're doing a local install and not JIT compile path @@ -17,12 +19,12 @@ class CPUOpBuilder(OpBuilder): def builder(self): from torch.utils.cpp_extension import CppExtension as ExtensionBuilder - + include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())] compile_args = {'cxx': self.strip_empty_entries(self.cxx_args())} cpp_ext = ExtensionBuilder(name=self.absolute_name(), sources=self.strip_empty_entries(self.sources()), - include_dirs=self.strip_empty_entries(self.include_paths()), + include_dirs=include_dirs, libraries=self.strip_empty_entries(self.libraries_args()), extra_compile_args=compile_args) diff --git a/op_builder/inference_core_ops.py b/op_builder/inference_core_ops.py index 229b500bebda..8073b63ad16b 100755 --- a/op_builder/inference_core_ops.py +++ b/op_builder/inference_core_ops.py @@ -60,13 +60,13 @@ def sources(self): sources = [ "inference/v2/kernels/core_ops/core_ops.cpp", "inference/v2/kernels/core_ops/bias_activations/bias_activation.cpp", - "inference/v2/kernels/core_ops/bias_activations/bias_activation.cu", + "inference/v2/kernels/core_ops/bias_activations/bias_activation_cuda.cu", "inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cpp", - "inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cu", + "inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm_cuda.cu", "inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cpp", - "inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cu", + "inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm_cuda.cu", "inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cpp", - "inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cu", + "inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels_cuda.cu", ] prefix = self.get_prefix() diff --git a/op_builder/ragged_ops.py b/op_builder/ragged_ops.py index 8cb372e96c37..ec7cab91885f 100644 --- a/op_builder/ragged_ops.py +++ b/op_builder/ragged_ops.py @@ -63,18 +63,18 @@ def sources(self): "inference/v2/kernels/ragged_ops/atom_builder/atom_builder.cpp", "inference/v2/kernels/ragged_ops/blocked_flash/blocked_flash.cpp", "inference/v2/kernels/ragged_ops/embed/embed.cpp", - "inference/v2/kernels/ragged_ops/embed/embed.cu", + "inference/v2/kernels/ragged_ops/embed/embed_cuda.cu", "inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cpp", - "inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cu", + "inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary_cuda.cu", "inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cpp", - "inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cu", + "inference/v2/kernels/ragged_ops/logits_gather/logits_gather_cuda.cu", "inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cpp", - "inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cu", + "inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter_cuda.cu", "inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cpp", - "inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cu", + "inference/v2/kernels/ragged_ops/moe_gather/moe_gather_cuda.cu", "inference/v2/kernels/ragged_ops/ragged_helpers/ragged_kernel_helpers.cpp", "inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cpp", - "inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cu", + "inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating_cuda.cu", ] prefix = self.get_prefix() diff 
--git a/op_builder/xpu/builder.py b/op_builder/xpu/builder.py index 2f0feba1d4e1..459dcce6bfae 100644 --- a/op_builder/xpu/builder.py +++ b/op_builder/xpu/builder.py @@ -23,11 +23,11 @@ def builder(self): from intel_extension_for_pytorch.xpu.cpp_extension import DPCPPExtension except ImportError: from intel_extension_for_pytorch.xpu.utils import DPCPPExtension - + include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())] print("dpcpp sources = {}".format(self.sources())) dpcpp_ext = DPCPPExtension(name=self.absolute_name(), sources=self.strip_empty_entries(self.sources()), - include_dirs=self.strip_empty_entries(self.include_paths()), + include_dirs=include_dirs, extra_compile_args={ 'cxx': self.strip_empty_entries(self.cxx_args()), }, diff --git a/setup.py b/setup.py index d9aed9b47bd8..418c1f11a0e0 100755 --- a/setup.py +++ b/setup.py @@ -119,7 +119,8 @@ def get_env_if_set(key, default: typing.Any = ""): # For any pre-installed ops force disable ninja. if torch_available: from accelerator import get_accelerator - cmdclass['build_ext'] = get_accelerator().build_extension().with_options(use_ninja=False) + use_ninja = not is_env_set("DS_DISABLE_NINJA") + cmdclass['build_ext'] = get_accelerator().build_extension().with_options(use_ninja=use_ninja) if torch_available: TORCH_MAJOR = torch.__version__.split('.')[0] From 98c96e790b4e8c42afb7c759bfda1992fc6be23b Mon Sep 17 00:00:00 2001 From: KimmiShi Date: Thu, 22 Feb 2024 00:53:43 +0800 Subject: [PATCH 16/18] Update flops profiler to handle attn and __matmul__ (#4724) Fixes #4723 - handle `F.scaled_dot_product_attention` in transformer models. - handle expressions like `a@b` --------- Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- deepspeed/profiling/flops_profiler/profiler.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/deepspeed/profiling/flops_profiler/profiler.py b/deepspeed/profiling/flops_profiler/profiler.py index 79e682a73b90..de847e59e82e 100644 --- a/deepspeed/profiling/flops_profiler/profiler.py +++ b/deepspeed/profiling/flops_profiler/profiler.py @@ -827,6 +827,15 @@ def _elementwise_flops_compute(input, other): return flops, 0 +def _attn_flops_compute(q, k, v, *args, **kwargs): + """ + Count flops for the scaled_dot_product_attention operation. + """ + macs = _prod(q.shape) * k.shape[-2] + macs += _prod(q.shape[:-1]) * k.shape[-2] * v.shape[-1] + return 2 * macs, macs + + def wrapFunc(func, funcFlopCompute): oldFunc = func name = func.__str__ @@ -899,10 +908,14 @@ def _patch_functionals(): # embedding F.embedding = wrapFunc(F.embedding, _embedding_flops_compute) + # attn + F.scaled_dot_product_attention = wrapFunc(F.scaled_dot_product_attention, _attn_flops_compute) + def _patch_tensor_methods(): torch.matmul = wrapFunc(torch.matmul, _matmul_flops_compute) torch.Tensor.matmul = wrapFunc(torch.Tensor.matmul, _matmul_flops_compute) + torch.Tensor.__matmul__ = wrapFunc(torch.Tensor.__matmul__, _matmul_flops_compute) torch.mm = wrapFunc(torch.mm, _matmul_flops_compute) torch.Tensor.mm = wrapFunc(torch.Tensor.mm, _matmul_flops_compute) torch.bmm = wrapFunc(torch.bmm, _matmul_flops_compute)
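For illustration only (not part of the patch): the MACs counted by `_attn_flops_compute` above correspond to the two batched matmuls inside scaled dot-product attention, `Q @ K^T` and `softmax(QK^T) @ V`. A minimal standalone sketch of the same arithmetic, assuming 4-D `(batch, heads, seq_len, head_dim)` inputs:

```python
import math
import torch

def attn_macs(q, k, v):
    # Q @ K^T: each of the q.numel() query elements is multiplied against k_len keys.
    macs = q.numel() * k.shape[-2]
    # softmax(QK^T) @ V: a (..., q_len, k_len) x (..., k_len, v_dim) matmul.
    macs += math.prod(q.shape[:-1]) * k.shape[-2] * v.shape[-1]
    return macs

q = k = v = torch.empty(1, 12, 128, 64)  # (batch, heads, seq_len, head_dim)
macs = attn_macs(q, k, v)
print(macs, 2 * macs)  # 25165824 MACs, 50331648 FLOPs
```

This mirrors the hook above, which reports `2 * macs` as FLOPs and `macs` as MACs for each patched `F.scaled_dot_product_attention` call.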
From dd3690c5348ddfa475cef112eb01aebe6a409847 Mon Sep 17 00:00:00 2001 From: Masahiro Tanaka <81312776+tohtana@users.noreply.github.com> Date: Wed, 21 Feb 2024 11:07:33 -0800 Subject: [PATCH 17/18] Fix allreduce for BF16 and ZeRO0 (#5170) This PR fixes an issue with allreducing for ZeRO0 + BF16. (This replaces #5154) DeepSpeed uses `BF16_Optimizer` when ZeRO0 and BF16 are enabled. The optimizer accumulates gradients in an FP32 buffer soon after a backward pass completes. However, the DeepSpeed engine performs allreduce on the BF16 gradients. This PR fixes the issue by performing allreduce on the FP32 buffer. It also eliminates an assertion that prohibited BF16+PP+Z1, which is actually runnable. The plot below shows loss curves for the following configurations: - BF16/Z0,Z1,Z2,Z3/NoPP - BF16/Z0,Z1/PP(2 stages) (all used 8 GPUs, gradient accumulation steps: 4) ![image](https://github.com/microsoft/DeepSpeed/assets/81312776/0dc1e9ef-43bc-4b47-8b9e-d6aca137a217) --------- Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- deepspeed/runtime/engine.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index e15cc49339ff..5c1202ba06ae 100644 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -1911,9 +1911,6 @@ def print_forward_breakdown(self, fwd_time): @instrument_w_nvtx def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE): - assert not (self.bfloat16_enabled() and self.pipeline_parallelism), \ f'allreduce_gradients() is not valid when bfloat+pipeline_parallelism is enabled' - # Pass (PP) gas boundary flag to optimizer (required for zero) self.optimizer.is_gradient_accumulation_boundary = self.is_gradient_accumulation_boundary() # ZeRO stage >= 2 communicates during non gradient accumulation boundaries as well @@ -1926,7 +1923,11 @@ def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE): self.optimizer, 'reduce_gradients'): self.optimizer.reduce_gradients(pipeline_parallel=self.pipeline_parallelism) else: - self.buffered_allreduce_fallback(elements_per_buffer=bucket_size) + grads = None + if hasattr(self.optimizer, "get_grads_for_reduction"): + # This is currently for BF16 optimizer + grads = self.optimizer.get_grads_for_reduction() + self.buffered_allreduce_fallback(grads=grads, elements_per_buffer=bucket_size) @instrument_w_nvtx def backward(self, loss, allreduce_gradients=True, release_loss=False, retain_graph=False, scale_wrt_gas=True): From d5fa87ff3f9ca181856258d437c2f2ef4cf9efc3 Mon Sep 17 00:00:00 2001 From: Bruno Magalhaes Date: Thu, 22 Feb 2024 13:07:03 +0100 Subject: [PATCH 18/18] Write multiple items to output file at once, in distributed data analyzer. (#5169) Minor improvements to [https://github.com/microsoft/DeepSpeed/pull/5129](https://github.com/microsoft/DeepSpeed/pull/5129). - Writes all buffers at once to the output file, instead of iteratively (`indexed_dataset.py`, method `add_items()`). - Fixes the incorrect initialisation of `num_workers` and `worker_id`, which were previously ignored when provided by the user.
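For illustration only (not part of the patch), the batched write described in the first bullet amounts to serializing every row up front and issuing a single `writelines()` call instead of one `write()` per row. A minimal standalone sketch with made-up rows:

```python
import numpy as np

rows = [np.arange(n, dtype=np.int64) for n in (3, 5, 2)]  # rows of different lengths

# Iterative: one buffered write call per row (the old per-item path).
with open("out_iterative.bin", "wb") as f:
    for row in rows:
        f.write(row.tobytes(order="C"))

# Batched: serialize all rows, then write them with a single writelines() call (the new path).
with open("out_batched.bin", "wb") as f:
    f.writelines(row.tobytes(order="C") for row in rows)
```

Both variants produce byte-identical files; the batched form simply reduces the number of write calls issued by rank 0.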
--------- Co-authored-by: Conglong Li --- .../data_sampling/data_analyzer.py | 25 +++++++++++++------ .../data_sampling/indexed_dataset.py | 8 ++++++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py index 5da11097319d..7088df223bd8 100644 --- a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py +++ b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py @@ -12,7 +12,7 @@ import torch from torch.utils.data import BatchSampler, SequentialSampler, DataLoader, Subset -from deepspeed.utils import logger, groups +from deepspeed.utils import logger import deepspeed.comm as dist from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDataset, valid_dtypes from deepspeed.runtime.data_pipeline.data_sampling.utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype @@ -482,17 +482,17 @@ def __init__( dist.init_distributed() # comm_group and worker_id+num_workers are mutually exclusive - if comm_group is not None: - self.comm_group = comm_group - self.num_workers = self.comm_group.size() - self.worker_id = self.comm_group.rank() + self.comm_group = comm_group + if self.comm_group is None: + # self.comm_group = deepspeed.utils.groups._clone_world_group() + self.num_workers = num_workers + self.worker_id = worker_id else: - self.comm_group = groups._clone_world_group() self.num_workers = self.comm_group.size() self.worker_id = self.comm_group.rank() if self.worker_id == 0: - logger.info(f"Data analyzer initialized with {self.num_workers} workers.") + logger.info(f"Distributed data analyzer initialized with {self.num_workers} workers.") def run_map_reduce(self): @@ -635,9 +635,18 @@ def file_write_ordered(self, tensor_list, fname, numpy_dtype): # method to deserializes a buffer into rows of different lengths and write them to file def write_buffer_to_file(buff, src, builder): assert self.worker_id == 0, "only rank 0 can write to file" + + # # write one buffer at a time + # for row_len in row_lens[src]: + # builder.add_item(buff[:row_len].cpu()) + # buff = buff[row_len:] + + # collect all buffers and write them all at once + buffer_list = [] for row_len in row_lens[src]: - builder.add_item(buff[:row_len].cpu()) + buffer_list.append(buff[:row_len].cpu()) buff = buff[row_len:] + builder.add_items(buffer_list) # 5. 
rank 0 prepares output folder and file if self.worker_id == 0: diff --git a/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py b/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py index 7a6963bc27eb..453e6ba6039d 100644 --- a/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py +++ b/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py @@ -581,10 +581,18 @@ def __init__(self, out_file, dtype=np.int64): self._doc_idx = [0] def add_item(self, tensor): + """ write the tensor to the file and update its size in the index""" np_array = np.array(tensor.numpy(), dtype=self._dtype) self._data_file.write(np_array.tobytes(order='C')) self._sizes.append(np_array.size) + def add_items(self, tensor_list): + """ write a list of tensors to the file and update their sizes in the index""" + np_arrays = [np.array(t.numpy(), dtype=self._dtype) for t in tensor_list] + self._data_file.writelines([arr.tobytes(order='C') for arr in np_arrays]) + for arr in np_arrays: + self._sizes.append(arr.size) + def add_item_numpy(self, np_array): if np_array.dtype != self._dtype: np_array = np_array.astype(self._dtype)
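For reference, a hypothetical usage sketch of the new batched method; the builder class name (`MMapIndexedDatasetBuilder`) and the `finalize()` call are assumptions here, since both sit outside the hunks shown above:

```python
import numpy as np
import torch
# Assumed import path and class name; only add_item/add_items are visible in the hunk above.
from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDatasetBuilder

builder = MMapIndexedDatasetBuilder("sample.bin", dtype=np.int64)
rows = [torch.tensor([1, 2, 3]), torch.tensor([4, 5]), torch.tensor([6])]

builder.add_items(rows)          # one writelines() call covering all rows
# for row in rows:               # equivalent result, but one write() per row
#     builder.add_item(row)

# builder.finalize("sample.idx") # assumed API: writes the index file once all items are added
```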