From 057d25be6775105f4b9e1d41e6c21981a157c849 Mon Sep 17 00:00:00 2001 From: Logan Adams <114770087+loadams@users.noreply.github.com> Date: Fri, 8 Nov 2024 08:34:20 -0800 Subject: [PATCH 01/13] Update version.txt after 0.15.4 release (#6731) **Auto-generated PR to update version.txt after a DeepSpeed release** Released version - 0.15.4 Author - @loadams Co-authored-by: loadams --- version.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.txt b/version.txt index 7ffdfa1cad65..1282fff53bfa 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.15.4 +0.15.5 From 08555662282f624f1258d45617aefef1577a4dd3 Mon Sep 17 00:00:00 2001 From: Logan Adams <114770087+loadams@users.noreply.github.com> Date: Mon, 11 Nov 2024 06:22:08 -0800 Subject: [PATCH 02/13] Update GH hosted workflows to 24.04 (#6717) `ubuntu-latset` is moving to be 24.04, so we should test updating as well to ensure it doesn't break any of our workflows. --- .github/workflows/cpu-torch-latest.yml | 2 +- .github/workflows/no-torch.yml | 3 ++- .github/workflows/nv-pre-compile-ops.yml | 2 +- .github/workflows/release.yml | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/.github/workflows/cpu-torch-latest.yml b/.github/workflows/cpu-torch-latest.yml index 0de6832b37c1..0125fa50bc14 100644 --- a/.github/workflows/cpu-torch-latest.yml +++ b/.github/workflows/cpu-torch-latest.yml @@ -19,7 +19,7 @@ concurrency: jobs: unit-tests: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/no-torch.yml b/.github/workflows/no-torch.yml index eb3ac9b03161..1a13c0f3f4f1 100644 --- a/.github/workflows/no-torch.yml +++ b/.github/workflows/no-torch.yml @@ -19,7 +19,7 @@ permissions: jobs: unit-tests: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v4 @@ -30,6 +30,7 @@ jobs: - name: Python environment run: | pip uninstall torch --yes + pip install setuptools pip list - name: Build deepspeed diff --git a/.github/workflows/nv-pre-compile-ops.yml b/.github/workflows/nv-pre-compile-ops.yml index 72ba8abbd95d..fc810bc190d0 100644 --- a/.github/workflows/nv-pre-compile-ops.yml +++ b/.github/workflows/nv-pre-compile-ops.yml @@ -21,7 +21,7 @@ concurrency: jobs: unit-tests: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 container: image: deepspeed/gh-builder:ubuntu1804-py38-torch1131-cu116 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 02881ef12f39..eb763792f0c4 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -7,7 +7,7 @@ on: jobs: deploy: - runs-on: ubuntu-22.04 + runs-on: ubuntu-24.04 environment: release-env steps: From b7e2ff50807975934c3b181457a95df424d115c3 Mon Sep 17 00:00:00 2001 From: Olatunji Ruwase Date: Mon, 11 Nov 2024 14:51:10 -0500 Subject: [PATCH 03/13] Add COMMITTER file (#6741) Add COMMITTER file --- COMMITTERS.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 COMMITTERS.md diff --git a/COMMITTERS.md b/COMMITTERS.md new file mode 100644 index 000000000000..bcb8579bf1f7 --- /dev/null +++ b/COMMITTERS.md @@ -0,0 +1,9 @@ +# DeepSpeed TSC Committers # + +| Name | GitHub ID | Affiliation +|--- | ---- | --- | +| Olatunji Ruwase | [tjruwase](https://github.com/tjruwase) | Microsoft | +| Logan Adams | [loadams](https://github.com/loadams) | Microsoft | +| Masahiro Tanaka | [tohtana](https://github.com/tohtana) | Microsoft | +| Jeff Rasley | [jeffra](https://github.com/jeffra) | SnowFlake | +| Minjia Zhang | 
[minjiazhang](https://github.com/minjiazhang) | UIUC | From b45ca2635495997bb294f4b9b9dbcb23db0dcac6 Mon Sep 17 00:00:00 2001 From: Logan Adams <114770087+loadams@users.noreply.github.com> Date: Mon, 11 Nov 2024 13:26:41 -0800 Subject: [PATCH 04/13] Update AMD apex version (#6739) --- .github/workflows/amd-mi200.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/amd-mi200.yml b/.github/workflows/amd-mi200.yml index ea8d2f5f806f..6e8d5847835d 100644 --- a/.github/workflows/amd-mi200.yml +++ b/.github/workflows/amd-mi200.yml @@ -48,8 +48,6 @@ jobs: - name: Install (ROCm) apex run: | git clone https://github.com/ROCmSoftwarePlatform/apex.git - cd apex - git checkout torch_2.1_higher CURRENT_VER=$(git rev-parse HEAD) INSTALLED_VER=$(cat /blob/amd-apex/.venv_installed_version) if [[ "$CURRENT_VER" != "$INSTALLED_VER" ]]; then From 99e9cbed1663b13bbc240d79946913bfe430ffb5 Mon Sep 17 00:00:00 2001 From: Xinyu Lian Date: Mon, 11 Nov 2024 17:31:45 -0600 Subject: [PATCH 05/13] Fix Type Name Inconsistency & Typo in cpu_adam (#6732) There is a typing error & inconsistency in cpu-adam code, while not affecting functionality, impacts code readability. Specifically, the type name `ds_params_percision_t` contains a typo ('percision'), whereas the related type name `ds_state_precision_t` is spelled correctly. I think it is beneficial to fix this typo&inconsistency to improve code readability, maintainability and further development. I have tested the corrected version of cpu_adam, and it compiles and runs successfully. Compilation Log: image Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> Co-authored-by: Olatunji Ruwase --- csrc/adagrad/cpu_adagrad.cpp | 30 +++++++++++++++--------------- csrc/adam/cpu_adam_impl.cpp | 30 +++++++++++++++--------------- csrc/includes/cpu_adagrad.h | 20 ++++++++++---------- csrc/includes/cpu_adam.h | 20 ++++++++++---------- csrc/includes/cpu_lion.h | 20 ++++++++++---------- csrc/lion/cpu_lion_impl.cpp | 30 +++++++++++++++--------------- 6 files changed, 75 insertions(+), 75 deletions(-) diff --git a/csrc/adagrad/cpu_adagrad.cpp b/csrc/adagrad/cpu_adagrad.cpp index 5790e79e2bc2..e276ad0856dd 100644 --- a/csrc/adagrad/cpu_adagrad.cpp +++ b/csrc/adagrad/cpu_adagrad.cpp @@ -17,9 +17,9 @@ static std::unordered_map> s_optimizers; // C++ interface -template -void Adagrad_Optimizer::Step_1(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Adagrad_Optimizer::Step_1(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg_sq, size_t _param_size) { @@ -56,9 +56,9 @@ void Adagrad_Optimizer::Step_1(ds_params_percision_t* _params, } } -template -void Adagrad_Optimizer::Step_4(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Adagrad_Optimizer::Step_4(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg_sq, size_t _param_size) { @@ -104,9 +104,9 @@ int create_adagrad_optimizer(int optimizer_id, return 0; } -template -void Adagrad_Optimizer::Step_8(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Adagrad_Optimizer::Step_8(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg_sq, size_t _param_size) { @@ -121,15 +121,15 @@ void Adagrad_Optimizer::Step_8(ds_params_percision_t* _params, (_param_size - rounded_size)); } -template +template void step_invoker(std::shared_ptr opt, void* _params, void* grads, void* _exp_avg_sq, size_t 
_param_size) { - opt->Step_8((ds_params_percision_t*)(_params), - (ds_params_percision_t*)(grads), + opt->Step_8((ds_params_precision_t*)(_params), + (ds_params_precision_t*)(grads), (ds_state_precision_t*)(_exp_avg_sq), _param_size); } @@ -139,12 +139,12 @@ std::map, invokers; // Fill map with template functions for each type -template +template void create_invoker() { - invokers[std::tuple(c10::CppTypeToScalarType(), + invokers[std::tuple(c10::CppTypeToScalarType(), c10::CppTypeToScalarType())] = - step_invoker; + step_invoker; } struct InvokerInitializer { InvokerInitializer() diff --git a/csrc/adam/cpu_adam_impl.cpp b/csrc/adam/cpu_adam_impl.cpp index 15d4e74d69d5..465aae7b9a34 100644 --- a/csrc/adam/cpu_adam_impl.cpp +++ b/csrc/adam/cpu_adam_impl.cpp @@ -18,9 +18,9 @@ static std::unordered_map> s_optimizers; // C++ interface -template -void Adam_Optimizer::Step_1(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Adam_Optimizer::Step_1(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, ds_state_precision_t* _exp_avg_sq, size_t _param_size) @@ -67,9 +67,9 @@ void Adam_Optimizer::Step_1(ds_params_percision_t* _params, } } -template -void Adam_Optimizer::Step_4(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Adam_Optimizer::Step_4(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, ds_state_precision_t* _exp_avg_sq, size_t _param_size) @@ -126,9 +126,9 @@ int create_adam_optimizer(int optimizer_id, return 0; } -template -void Adam_Optimizer::Step_8(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Adam_Optimizer::Step_8(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, ds_state_precision_t* _exp_avg_sq, size_t _param_size) @@ -145,7 +145,7 @@ void Adam_Optimizer::Step_8(ds_params_percision_t* _params, (_param_size - rounded_size)); } -template +template void step_invoker(std::shared_ptr opt, void* _params, void* grads, @@ -153,8 +153,8 @@ void step_invoker(std::shared_ptr opt, void* _exp_avg_sq, size_t _param_size) { - opt->Step_8((ds_params_percision_t*)(_params), - (ds_params_percision_t*)(grads), + opt->Step_8((ds_params_precision_t*)(_params), + (ds_params_precision_t*)(grads), (ds_state_precision_t*)(_exp_avg), (ds_state_precision_t*)(_exp_avg_sq), _param_size); @@ -165,12 +165,12 @@ std::map, invokers; // Fill map with template functions for each type -template +template void create_invoker() { - invokers[std::tuple(c10::CppTypeToScalarType(), + invokers[std::tuple(c10::CppTypeToScalarType(), c10::CppTypeToScalarType())] = - step_invoker; + step_invoker; } struct InvokerInitializer { InvokerInitializer() diff --git a/csrc/includes/cpu_adagrad.h b/csrc/includes/cpu_adagrad.h index c06d3a6b35e9..6f500250f033 100644 --- a/csrc/includes/cpu_adagrad.h +++ b/csrc/includes/cpu_adagrad.h @@ -14,9 +14,9 @@ #include "simd.h" #define STEP(SPAN) \ - template \ - void Step_##SPAN(ds_params_percision_t* _params, \ - ds_params_percision_t* grads, \ + template \ + void Step_##SPAN(ds_params_precision_t* _params, \ + ds_params_precision_t* grads, \ ds_state_precision_t* _exp_avg_sq, \ size_t _param_size); @@ -28,10 +28,10 @@ class Adagrad_Optimizer { } ~Adagrad_Optimizer() {} #if defined(__AVX512__) or defined(__AVX256__) - template + template void Step_AVX(size_t* rounded_size, - ds_params_percision_t* _params, - ds_params_percision_t* grads, + ds_params_precision_t* _params, 
+ ds_params_precision_t* grads, ds_state_precision_t* _exp_avg_sq, size_t param_size); #endif @@ -61,15 +61,15 @@ class Adagrad_Optimizer { }; #if defined(__AVX512__) or defined(__AVX256__) -template +template void Adagrad_Optimizer::Step_AVX(size_t* rounded_size, - ds_params_percision_t* _params, - ds_params_percision_t* grads, + ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg_sq, size_t _param_size) { #if !defined(__AVX512__) - if (std::is_same_v || + if (std::is_same_v || std::is_same_v) { return; } diff --git a/csrc/includes/cpu_adam.h b/csrc/includes/cpu_adam.h index faf99020aee5..a7db6fda3705 100644 --- a/csrc/includes/cpu_adam.h +++ b/csrc/includes/cpu_adam.h @@ -14,9 +14,9 @@ #include "simd.h" #define STEP(SPAN) \ - template \ - void Step_##SPAN(ds_params_percision_t* _params, \ - ds_params_percision_t* grads, \ + template \ + void Step_##SPAN(ds_params_precision_t* _params, \ + ds_params_precision_t* grads, \ ds_state_precision_t* _exp_avg, \ ds_state_precision_t* _exp_avg_sq, \ size_t _param_size); @@ -43,10 +43,10 @@ class Adam_Optimizer { ~Adam_Optimizer() {} #if defined(__AVX512__) or defined(__AVX256__) - template + template void Step_AVX(size_t* rounded_size, - ds_params_percision_t* _params, - ds_params_percision_t* grads, + ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, ds_state_precision_t* _exp_avg_sq, size_t param_size); @@ -106,16 +106,16 @@ class Adam_Optimizer { }; #if defined(__AVX512__) or defined(__AVX256__) -template +template void Adam_Optimizer::Step_AVX(size_t* rounded_size, - ds_params_percision_t* _params, - ds_params_percision_t* grads, + ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, ds_state_precision_t* _exp_avg_sq, size_t _param_size) { #if !defined(__AVX512__) - if (std::is_same_v || + if (std::is_same_v || std::is_same_v) { return; } diff --git a/csrc/includes/cpu_lion.h b/csrc/includes/cpu_lion.h index 62b304923222..beaf357a3211 100644 --- a/csrc/includes/cpu_lion.h +++ b/csrc/includes/cpu_lion.h @@ -14,9 +14,9 @@ #include "simd.h" #define STEP(SPAN) \ - template \ - void Step_##SPAN(ds_params_percision_t* _params, \ - ds_params_percision_t* grads, \ + template \ + void Step_##SPAN(ds_params_precision_t* _params, \ + ds_params_precision_t* grads, \ ds_state_precision_t* _exp_avg, \ size_t _param_size); @@ -32,10 +32,10 @@ class Lion_Optimizer { ~Lion_Optimizer() {} #if defined(__AVX512__) or defined(__AVX256__) - template + template void Step_AVX(size_t* rounded_size, - ds_params_percision_t* _params, - ds_params_percision_t* grads, + ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, size_t param_size); #endif @@ -67,15 +67,15 @@ class Lion_Optimizer { }; #if defined(__AVX512__) or defined(__AVX256__) -template +template void Lion_Optimizer::Step_AVX(size_t* rounded_size, - ds_params_percision_t* _params, - ds_params_percision_t* grads, + ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, size_t _param_size) { #if !defined(__AVX512__) - if (std::is_same_v || + if (std::is_same_v || std::is_same_v) { return; } diff --git a/csrc/lion/cpu_lion_impl.cpp b/csrc/lion/cpu_lion_impl.cpp index 85896ba86e19..6a98162314f9 100644 --- a/csrc/lion/cpu_lion_impl.cpp +++ b/csrc/lion/cpu_lion_impl.cpp @@ -19,9 +19,9 @@ static std::unordered_map> s_optimizers; // C++ interface -template -void 
Lion_Optimizer::Step_1(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Lion_Optimizer::Step_1(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, size_t _param_size) { @@ -64,9 +64,9 @@ void Lion_Optimizer::Step_1(ds_params_percision_t* _params, } } -template -void Lion_Optimizer::Step_4(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Lion_Optimizer::Step_4(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, size_t _param_size) { @@ -117,9 +117,9 @@ int create_lion_optimizer(int optimizer_id, return 0; } -template -void Lion_Optimizer::Step_8(ds_params_percision_t* _params, - ds_params_percision_t* grads, +template +void Lion_Optimizer::Step_8(ds_params_precision_t* _params, + ds_params_precision_t* grads, ds_state_precision_t* _exp_avg, size_t _param_size) { @@ -134,15 +134,15 @@ void Lion_Optimizer::Step_8(ds_params_percision_t* _params, (_param_size - rounded_size)); } -template +template void step_invoker(std::shared_ptr opt, void* _params, void* grads, void* _exp_avg, size_t _param_size) { - opt->Step_8((ds_params_percision_t*)(_params), - (ds_params_percision_t*)(grads), + opt->Step_8((ds_params_precision_t*)(_params), + (ds_params_precision_t*)(grads), (ds_state_precision_t*)(_exp_avg), _param_size); } @@ -152,12 +152,12 @@ std::map, invokers; // Fill map with template functions for each type -template +template void create_invoker() { - invokers[std::tuple(c10::CppTypeToScalarType(), + invokers[std::tuple(c10::CppTypeToScalarType(), c10::CppTypeToScalarType())] = - step_invoker; + step_invoker; } struct InvokerInitializer { InvokerInitializer() From fabab197f747a4ab3ac9c2a7bdd97b6aaa1db698 Mon Sep 17 00:00:00 2001 From: Chengming Zhang Date: Mon, 11 Nov 2024 17:55:09 -0600 Subject: [PATCH 06/13] Add Domino code (#6733) add domino code Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- deepspeed/__init__.py | 2 +- deepspeed/runtime/domino/__init__.py | 4 + deepspeed/runtime/domino/transformer.py | 518 ++++++++++++++++++++++++ 3 files changed, 523 insertions(+), 1 deletion(-) create mode 100644 deepspeed/runtime/domino/__init__.py create mode 100644 deepspeed/runtime/domino/transformer.py diff --git a/deepspeed/__init__.py b/deepspeed/__init__.py index de405dc40edb..a8d15cd5332b 100755 --- a/deepspeed/__init__.py +++ b/deepspeed/__init__.py @@ -42,7 +42,7 @@ from .utils import log_dist, OnDevice, logger from .comm.comm import init_distributed -from .runtime import zero +from .runtime import zero, domino from .runtime.compiler import is_compile_supported from .pipe import PipelineModule diff --git a/deepspeed/runtime/domino/__init__.py b/deepspeed/runtime/domino/__init__.py new file mode 100644 index 000000000000..208299fb8c50 --- /dev/null +++ b/deepspeed/runtime/domino/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) Microsoft Corporation. +# SPDX-License-Identifier: Apache-2.0 + +# DeepSpeed Team diff --git a/deepspeed/runtime/domino/transformer.py b/deepspeed/runtime/domino/transformer.py new file mode 100644 index 000000000000..8eb95e49c29d --- /dev/null +++ b/deepspeed/runtime/domino/transformer.py @@ -0,0 +1,518 @@ +# Copyright (c) Microsoft Corporation. 
+# SPDX-License-Identifier: Apache-2.0 + +# DeepSpeed Team + +import torch +import torch.nn.functional as F +from torch.nn.parameter import Parameter +import deepspeed +from deepspeed import comm as dist +from deepspeed.accelerator import get_accelerator + + +def is_rank_0(): + if dist.get_rank() == 0: + return True + + +class DominoModule(torch.nn.Module): + """extensions of torch Module.""" + + def __init__(self, ): + super(DominoModule, self).__init__() + + +import enum + + +class LayerType(enum.Enum): + encoder = 1 + decoder = 2 + + +class AttnType(enum.Enum): + self_attn = 1 + cross_attn = 2 + + +class AttnMaskType(enum.Enum): + padding = 1 + causal = 2 + + +class ModelType(enum.Enum): + encoder_or_decoder = 1 + encoder_and_decoder = 2 + + +handle_dic = {} + + +def no_oper(input_, dic_, h_id): + return NoOper.apply(input_, dic_, h_id) + + +class NoOper(torch.autograd.Function): + + @staticmethod + def symbolic(graph, input_, handle_dic, h_id): + return input_ + + @staticmethod + def forward(ctx, input_, handle_dic, h_id): + ctx.handle_dic = handle_dic + ctx.h_id = h_id + return input_ + + @staticmethod + def backward(ctx, grad_output): + handle = ctx.handle_dic[ctx.h_id] + handle.wait() + return grad_output, None, None + + +def copy_to_tensor_model_parallel_region_a(mpu, input_, dic_, h_id): + return _CopyToModelParallelRegionA.apply(mpu, input_, dic_, h_id) + + +class _CopyToModelParallelRegionA(torch.autograd.Function): + """Pass the input to the model parallel region.""" + + @staticmethod + def symbolic(graph, mpu, input_, handle_dic, h_id): + return input_ + + @staticmethod + def forward(ctx, mpu, input_, handle_dic, h_id): + ctx.mpu = mpu + ctx.handle_dic = handle_dic + ctx.h_id = h_id + return input_ + + @staticmethod + def backward(ctx, grad_output): + # Bypass the function if we are using only 1 GPU. + if ctx.mpu.get_tensor_model_parallel_world_size() == 1: + return grad_output + + # Async All-reduce. + handle = deepspeed.comm.all_reduce(grad_output, group=ctx.mpu.get_tensor_model_parallel_group(), async_op=True) + ctx.handle_dic[ctx.h_id] = handle + return None, grad_output, None, None + + +class CoreAttention(DominoModule): + + def __init__(self, config, layer_number, mpu, attn_mask_type=AttnMaskType.causal): + super(CoreAttention, self).__init__() + + self.layer_number = max(1, layer_number) + self.att_dropout_p = config.attention_dropout + self.is_causal = True + projection_size = config.kv_channels * config.num_attention_heads + world_size = mpu.get_tensor_model_parallel_world_size() + self.hidden_size_per_partition = projection_size // world_size + + def forward(self, query_layer, key_layer, value_layer, attention_mask): + # attn_mask is None when is_causal=True + context_layer = torch.nn.functional.scaled_dot_product_attention(query_layer, + key_layer, + value_layer, + attn_mask=None, + dropout_p=self.att_dropout_p, + is_causal=True, + scale=None) + + # [b, np, sq, hn] --> [sq, b, np, hn] + context_layer = context_layer.permute(2, 0, 1, 3).contiguous() + + # [sq, b, np, hn] --> [sq, b, hp] + new_context_layer_shape = context_layer.size()[:-2] + \ + (self.hidden_size_per_partition,) + context_layer = context_layer.view(*new_context_layer_shape) + + return context_layer + + +class ShardedAttention(DominoModule): + """Sharded self-attention layer class. 
+ Only support self attention and causal attention mask + """ + + def __init__(self, + config, + layer_number, + mpu, + ColumnParallelLinear, + RowParallelLinearNoComm, + apply_rotary_pos_emb, + attention_type=AttnType.self_attn, + attn_mask_type=AttnMaskType.causal): + super(ShardedAttention, self).__init__() + self.layer_number = max(1, layer_number) + self.attention_type = attention_type + self.attn_mask_type = attn_mask_type + self.params_dtype = config.params_dtype + self.apply_rotary_pos_emb = apply_rotary_pos_emb + + query_projection_size = config.kv_channels * config.num_attention_heads + kv_projection_size = config.kv_channels * config.num_attention_heads + + # Per attention head and per partition values. + world_size = mpu.get_tensor_model_parallel_world_size() + self.hidden_size_per_attention_head = query_projection_size // config.num_attention_heads + self.num_attention_heads_per_partition = config.num_attention_heads // world_size + + self.query_key_value = ColumnParallelLinear(config.hidden_size, + query_projection_size + 2 * kv_projection_size, + config=config, + init_method=config.init_method, + bias=config.add_bias_linear, + gather_output=False) + + self.core_attention = CoreAttention(config, self.layer_number, mpu, self.attn_mask_type) + + self.dense = RowParallelLinearNoComm(query_projection_size, + config.hidden_size, + config=config, + init_method=config.output_layer_init_method, + bias=config.add_bias_linear, + input_is_parallel=True, + skip_bias_add=True) + + def forward(self, hidden_states, attention_mask, rotary_pos_emb=None): + # hidden_states: [s, b, h] + + # Query, Key, and Value + # Attention heads [s, b, h] --> [s, b, np * 3 * hn)] + mixed_x_layer, _ = self.query_key_value(hidden_states) + + # [s, b, np * 3 * hn] --> [s, b, np, 3 * hn] + new_tensor_shape = mixed_x_layer.size()[:-1] + ( + self.num_attention_heads_per_partition, + 3 * self.hidden_size_per_attention_head, + ) + mixed_x_layer = mixed_x_layer.view(*new_tensor_shape) + + # [s, b, np, 3 * hn] -> [b, np, s, 3*hn] + mixed_x_layer = mixed_x_layer.permute(1, 2, 0, 3).contiguous() + + # [s, b, np, 3 * hn] --> [s, b, np, hn], [s, b, np, hn], [s, b, np, hn] + (query_layer, key_layer, value_layer) = torch.split(mixed_x_layer, [ + self.hidden_size_per_attention_head, self.hidden_size_per_attention_head, + self.hidden_size_per_attention_head + ], + dim=3) + # [s, b, np, np * hn] -> [s, b, np, hn] + query_layer = query_layer.view(query_layer.size(0), query_layer.size(1), -1, + self.hidden_size_per_attention_head) + + # apply rotary embedding + if rotary_pos_emb is not None: + if isinstance(rotary_pos_emb, tuple): + rotary_pos_emb = rotary_pos_emb + else: + rotary_pos_emb = ((rotary_pos_emb, ) * 2) + q_pos_emb, k_pos_emb = rotary_pos_emb + query_layer = self.apply_rotary_pos_emb(query_layer, q_pos_emb) + key_layer = self.apply_rotary_pos_emb(key_layer, k_pos_emb) + + context_layer = self.core_attention(query_layer, key_layer, value_layer, attention_mask) + + # Output. [s, b, h] + output, bias = self.dense(context_layer) + + return output, bias + + +class DominoTransformerLayer(DominoModule): + """A domino single transformer layer. 
+ [s, b, h] -> [s, b, h] + """ + + def __init__(self, + config, + layer_number, + mpu, + fused_layer_norm, + _initialize_affine_weight_gpu, + ColumnParallelLinear, + RowParallelLinearNoComm, + apply_rotary_pos_emb, + bias_dropout_add_fused_train, + bias_dropout_add_fused_inference, + skip_bias_add=True, + layer_type=LayerType.encoder, + self_attn_mask_type=AttnMaskType.causal, + drop_path_rate=0., + output_bias=None): + super(DominoTransformerLayer, self).__init__() + + self.llama_model = config.llama_model + self.layer_number = layer_number + self.layer_type = layer_type + self.apply_residual_connection_post_layernorm = config.apply_residual_connection_post_layernorm + self.bias_dropout_add_fused_train = bias_dropout_add_fused_train + self.bias_dropout_add_fused_inference = bias_dropout_add_fused_inference + self.mpu = mpu + self.output_bias = output_bias + + # Layernorm on the input data. + self.input_layernorm = fused_layer_norm(config.hidden_size, + eps=config.layernorm_epsilon, + no_persist_layer_norm=config.no_persist_layer_norm) + + # Self attention. + self.self_attention = ShardedAttention(config, + layer_number, + mpu, + ColumnParallelLinear, + RowParallelLinearNoComm, + apply_rotary_pos_emb, + attention_type=AttnType.self_attn, + attn_mask_type=self_attn_mask_type) + + self.hidden_dropout = config.hidden_dropout + + # Layernorm on the attention output + self.post_attention_layernorm = fused_layer_norm(config.hidden_size, + eps=config.layernorm_epsilon, + no_persist_layer_norm=config.no_persist_layer_norm) + + # ------------ init mlp start ------------ + init_method = config.init_method + output_layer_init_method = config.output_layer_init_method + self.add_bias = config.add_bias_linear + self.skip_bias_add = skip_bias_add + + ffn_hidden_size = config.ffn_hidden_size + if config.gated_linear_unit: + ffn_hidden_size *= 2 + self.output_size_c = config.ffn_hidden_size + self.input_size_c = config.hidden_size + self.input_size_r = config.ffn_hidden_size + self.output_size_r = self.input_size_c + + world_size = mpu.get_tensor_model_parallel_world_size() + self.output_size_per_partition = self.output_size_c // world_size + self.input_size_per_partition = self.input_size_r // world_size + + # Initialize weight. 
+ self.weight_c = Parameter( + torch.empty(self.output_size_per_partition, + self.input_size_c, + device=get_accelerator().current_device_name(), + dtype=config.params_dtype)) + self.weight_r = Parameter( + torch.empty(self.output_size_r, + self.input_size_per_partition, + device=get_accelerator().current_device_name(), + dtype=config.params_dtype)) + + if config.perform_initialization: + _initialize_affine_weight_gpu(self.weight_c, init_method, partition_dim=0, stride=1) + + _initialize_affine_weight_gpu(self.weight_r, output_layer_init_method, partition_dim=1, stride=1) + + if self.add_bias: + self.bias_c = Parameter( + torch.empty(self.output_size_per_partition, + device=get_accelerator().current_device_name(), + dtype=config.params_dtype)) + self.bias_r = Parameter( + torch.empty(self.output_size_r, + device=get_accelerator().current_device_name(), + dtype=config.params_dtype)) + if config.perform_initialization: + with torch.no_grad(): + self.bias_c.zero_() + self.bias_r.zero_() + else: + self.register_parameter('bias_c', None) + self.register_parameter('bias_r', None) + + if config.swiglu: + + def swiglu(x): + x = torch.chunk(x, 2, dim=-1) + return F.silu(x[0]) * x[1] + + self.mlp_activation_func = swiglu + else: + self.mlp_activation_func = F.gelu + # ------------ init mlp end ------------ + + def forward(self, hidden_states, attention_mask, rotary_pos_emb=None): + # hidden_states: [s, b, h] + hidden_states0, hidden_states1 = hidden_states + + layernorm_output0 = self.input_layernorm(hidden_states0) + layernorm_output1 = self.input_layernorm(hidden_states1) + + if not self.llama_model: + rotary_pos_emb = None + + attention_output0, attention_bias0 = \ + self.self_attention( + layernorm_output0, + attention_mask, + rotary_pos_emb=rotary_pos_emb) + handle0 = deepspeed.comm.all_reduce(attention_output0, + group=self.mpu.get_tensor_model_parallel_group(), + async_op=True) + + attention_output1, attention_bias1 = \ + self.self_attention( + layernorm_output1, + attention_mask, + rotary_pos_emb=rotary_pos_emb) + handle1 = deepspeed.comm.all_reduce(attention_output1, + group=self.mpu.get_tensor_model_parallel_group(), + async_op=True) + handle0.wait() + + # Residual0 connection. + if self.apply_residual_connection_post_layernorm: + residual0 = layernorm_output0 + else: + residual0 = hidden_states0 + + if self.training: + bias_dropout_add_func = self.bias_dropout_add_fused_train + else: + bias_dropout_add_func = self.bias_dropout_add_fused_inference + if attention_bias0 is not None: + attention_bias0 = attention_bias0.expand_as(residual0) + layernorm_input0 = bias_dropout_add_func(attention_output0, attention_bias0, residual0, self.hidden_dropout) + + layernorm_output0 = self.post_attention_layernorm(layernorm_input0) + layernorm_output0 = no_oper(layernorm_output0, handle_dic, f'{self.layer_number}_0') + + # Residual1 connection. 
+ if self.apply_residual_connection_post_layernorm: + residual1 = layernorm_output1 + else: + residual1 = hidden_states1 + + if attention_bias1 is not None: + attention_bias1 = attention_bias1.expand_as(residual1) + layernorm_input1 = bias_dropout_add_func(attention_output1, attention_bias1, residual1, self.hidden_dropout) + + layernorm_output1 = self.post_attention_layernorm(layernorm_input1) + layernorm_output1 = no_oper(layernorm_output1, handle_dic, f'{self.layer_number}_1') + + # ------------ explicit mlp start ------------ + bias_c = self.bias_c if not self.skip_bias_add else None + + input0 = copy_to_tensor_model_parallel_region_a(self.mpu, layernorm_output0, handle_dic, + f'{self.layer_number}_0') + # Batch0 Matrix multiply. + output0 = torch.matmul(input0, self.weight_c.t()) + if bias_c is not None: + output0 = output0 + bias_c + output0 = self.mlp_activation_func(output0) + output0 = torch.matmul(output0, self.weight_r.t()) + handle2 = deepspeed.comm.all_reduce(output0, group=self.mpu.get_tensor_model_parallel_group(), async_op=True) + + handle1.wait() + + input1 = copy_to_tensor_model_parallel_region_a(self.mpu, layernorm_output1, handle_dic, + f'{self.layer_number}_1') + # Batch1 Matrix multiply. + output1 = torch.matmul(input1, self.weight_c.t()) + output1 = self.mlp_activation_func(output1) + if bias_c is not None: + output1 = output1 + bias_c + output1 = torch.matmul(output1, self.weight_r.t()) + deepspeed.comm.all_reduce(output1, group=self.mpu.get_tensor_model_parallel_group()) + + handle2.wait() + + output0 = output0 + self.bias_r if self.bias_r is not None else output0 + output1 = output1 + self.bias_r if self.bias_r is not None else output1 + output_bias = self.output_bias + mlp_output0, mlp_output1, mlp_bias0, mlp_bias1 = output0, output1, output_bias, output_bias + # ------------ explicit mlp end ------------ + + if self.apply_residual_connection_post_layernorm: + residual0 = layernorm_output0 + residual1 = layernorm_output1 + else: + residual0 = layernorm_input0 + residual1 = layernorm_input1 + + if mlp_bias0 is not None: + mlp_bias0 = mlp_bias0.expand_as(residual0) + mlp_bias1 = mlp_bias1.expand_as(residual1) + output0 = bias_dropout_add_func(mlp_output0, mlp_bias0, residual0, self.hidden_dropout) + output1 = bias_dropout_add_func(mlp_output1, mlp_bias1, residual1, self.hidden_dropout) + + return output0, output1 + + +class DominoTransformer(DominoModule): + """Transformer class.""" + + def __init__(self, + config, + model_type, + mpu, + fused_layer_norm, + _initialize_affine_weight_gpu, + ColumnParallelLinear, + RowParallelLinearNoComm, + apply_rotary_pos_emb, + bias_dropout_add_fused_train, + bias_dropout_add_fused_inference, + layer_type=LayerType.encoder, + self_attn_mask_type=AttnMaskType.causal, + pre_process=True, + post_process=True, + post_layer_norm=True, + drop_path_rate=0.0): + super(DominoTransformer, self).__init__() + + self.layer_type = layer_type + self.model_type = model_type + self.post_process = post_process + self.post_layer_norm = post_layer_norm + self.num_layers = config.num_layers + self.drop_path_rate = drop_path_rate + self.drop_path_rates = [rate.item() for rate in torch.linspace(0, self.drop_path_rate, config.num_layers)] + + def build_layer(layer_number): + return DominoTransformerLayer(config, + layer_number, + mpu, + fused_layer_norm, + _initialize_affine_weight_gpu, + ColumnParallelLinear, + RowParallelLinearNoComm, + apply_rotary_pos_emb, + bias_dropout_add_fused_train, + bias_dropout_add_fused_inference, + layer_type=layer_type, + 
self_attn_mask_type=self_attn_mask_type, + drop_path_rate=self.drop_path_rates[layer_number - 1]) + + self.layers = torch.nn.ModuleList([build_layer(i + 1) for i in range(self.num_layers)]) + + if self.post_process and self.post_layer_norm: + self.final_layernorm = fused_layer_norm(config.hidden_size, + eps=config.layernorm_epsilon, + no_persist_layer_norm=config.no_persist_layer_norm) + + def forward(self, hidden_states, attention_mask, rotary_pos_emb=None): + # hidden_states: [s, b, h] + + for index in range(self.num_layers): + layer = self.layers[index] + hidden_states = layer(hidden_states, attention_mask, rotary_pos_emb) + + hidden_states0, hidden_states1 = hidden_states + if self.post_process and self.post_layer_norm: + hidden_states0 = self.final_layernorm(hidden_states0) + hidden_states1 = self.final_layernorm(hidden_states1) + + return hidden_states0, hidden_states1 From 73d974ee640a95a594be95bb68af00fd77e44409 Mon Sep 17 00:00:00 2001 From: Hongwei Chen <33092912+hwchen2017@users.noreply.github.com> Date: Tue, 12 Nov 2024 05:01:31 -0800 Subject: [PATCH 07/13] Add data type check for bf16 (#6742) Add data type check for bf16 to fix #6723 --- deepspeed/runtime/engine.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index 05bb23e8ddd9..b639ef6a1456 100755 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -1077,7 +1077,10 @@ def _supported_optims(self): # Validate configuration based on command line arguments def _do_sanity_check(self): if self.fp16_enabled() and not get_accelerator().is_fp16_supported(): - raise ValueError("Type fp16 is not supported.") + raise ValueError("Type fp16 is not supported on your device.") + + if self.bfloat16_enabled() and not get_accelerator().is_bf16_supported(): + raise ValueError("Type bf16 is not supported on your device.") expected_optim_types = self._supported_optims() expected_optim_types += [type(None), Callable] From 7af3a4beb5bf99517bb2d51b450861ca54bed8d3 Mon Sep 17 00:00:00 2001 From: inkcherry Date: Tue, 12 Nov 2024 22:25:33 +0800 Subject: [PATCH 08/13] add zero3 ```module_granularity_threshold``` to zero optimization. (#6649) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds Z3 coalesced fetch to zero optimization. Some of the existing logic can be reused, but it is difficult to expose it as an optimization choice (I only discovered that logic while trying to implement this). The benefit of this approach is reduced host overhead (far fewer hooks) during the recursive fetching of parameters, especially in fine-grained models such as those with a large number of MoE experts. This is particularly helpful for host-sensitive devices (such as HPU), where it achieved a 40% performance improvement in our customer workloads.
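For readers who want to try the new option, here is a minimal, hedged usage sketch. Only the `stage3_module_granularity_threshold` key comes from this PR; the surrounding config values, the threshold number, and the commented-out initialization are illustrative placeholders, not part of the patch.

```python
# Sketch only: "stage3_module_granularity_threshold" is the key added by this PR;
# every other value here is a placeholder chosen for illustration.
import deepspeed

ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "optimizer": {"type": "Adam", "params": {"lr": 1e-6}},
    "zero_optimization": {
        "stage": 3,
        # Modules whose granularity (parameter_count / (1 + descendant count)) falls
        # below this value are treated as integral leaf units during parameter fetching.
        "stage3_module_granularity_threshold": 12100,
    },
    "bf16": {"enabled": True},
}

# model = ...  # any torch.nn.Module
# engine, _, _, _ = deepspeed.initialize(model=model,
#                                        model_parameters=model.parameters(),
#                                        config=ds_config)
```

The patch also extends `set_z3_leaf_modules` to accept class names as strings (see the `deepspeed/utils/z3_leaf_module.py` hunk below), so leaf modules can still be pinned manually instead of, or in addition to, using the threshold.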
FYI @delock @deepcharm --------- Co-authored-by: Ma, Guokai Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> Co-authored-by: Olatunji Ruwase --- deepspeed/runtime/engine.py | 5 + deepspeed/runtime/zero/config.py | 9 ++ deepspeed/runtime/zero/parameter_offload.py | 91 ++++++++++++- deepspeed/runtime/zero/stage3.py | 8 +- deepspeed/utils/__init__.py | 2 +- deepspeed/utils/z3_leaf_module.py | 27 ++-- docs/_pages/config-json.md | 5 + .../runtime/zero/test_zero_leaf_module.py | 121 +++++++++++++++++- 8 files changed, 252 insertions(+), 16 deletions(-) diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py index b639ef6a1456..e1e745d2b112 100755 --- a/deepspeed/runtime/engine.py +++ b/deepspeed/runtime/engine.py @@ -811,6 +811,9 @@ def zero_max_reuse_distance(self): def zero_prefetch_bucket_size(self): return self._config.zero_config.prefetch_bucket_size + def zero_module_granularity_threshold(self): + return self._config.zero_config.module_granularity_threshold + def zero_param_persistence_threshold(self): return self._config.zero_config.param_persistence_threshold @@ -1614,6 +1617,7 @@ def _configure_zero_optimizer(self, optimizer): zero_param_parallel_group=zero_param_parallel_group, zero_quantized_weights=self.zero_quantized_weights(), zero_quantized_nontrainable_weights=self.zero_quantized_nontrainable_weights(), + zero_module_granularity_threshold=self.zero_module_granularity_threshold(), ) else: log_dist( @@ -1660,6 +1664,7 @@ def _configure_zero_optimizer(self, optimizer): zero_hpz_partition_size=self.zero_hpz_partition_size(), zero_quantized_weights=self.zero_quantized_weights(), zero_quantized_nontrainable_weights=self.zero_quantized_nontrainable_weights(), + zero_module_granularity_threshold=self.zero_module_granularity_threshold(), ) else: diff --git a/deepspeed/runtime/zero/config.py b/deepspeed/runtime/zero/config.py index 1cfcd784e2ce..7cac7e3c1ce7 100644 --- a/deepspeed/runtime/zero/config.py +++ b/deepspeed/runtime/zero/config.py @@ -21,6 +21,7 @@ "stage3_max_live_parameters" : 1000000000, "stage3_max_reuse_distance" : 1000000000, "stage3_use_all_reduce_for_fetch_params": [true|false], + "stage3_module_granularity_threshold": 0, "allgather_partitions": [true|false], "use_multi_rank_bucket_allreduce": [true|false], "allgather_bucket_size": 500000000, @@ -245,6 +246,14 @@ class DeepSpeedZeroConfig(DeepSpeedConfigModel): this option is enabled and then saves the fp16 model weights. """ + module_granularity_threshold: int = Field(pp_int(0), alias="stage3_module_granularity_threshold") + """ + The granularity of a module is determined by the ratio of "parameter_count / (1 + descendant count)". + ZeRO3 classifies modules with a granularity below the threshold as fine-grained, + which are treated as integral units during parameter fetching. This reduces host overhead + and the separate allgather overhead introduced by hooks for fine-grained layers when fetching parameters. + """ + use_all_reduce_for_fetch_params: bool = Field(False, alias="stage3_use_all_reduce_for_fetch_params") """ Use all_reduce op when fetching module parameters at stage3. 
This improves performance by reducing diff --git a/deepspeed/runtime/zero/parameter_offload.py b/deepspeed/runtime/zero/parameter_offload.py index 4b0ddb7679a9..082d7e874e4d 100644 --- a/deepspeed/runtime/zero/parameter_offload.py +++ b/deepspeed/runtime/zero/parameter_offload.py @@ -6,7 +6,7 @@ import sys import torch from collections import OrderedDict -from deepspeed.utils import z3_leaf_module +from deepspeed.utils import z3_leaf_module, set_z3_leaf_module from deepspeed.runtime.utils import see_memory_usage from deepspeed.runtime.zero.utils import apply_to_tensors_only, is_zero_param from deepspeed.runtime.zero.offload_config import OffloadDeviceEnum @@ -14,6 +14,7 @@ from deepspeed.runtime.zero.partition_parameters import * from deepspeed.runtime.zero.partitioned_param_coordinator import PartitionedParameterCoordinator, InflightParamRegistry, iter_params from deepspeed.accelerator import get_accelerator +from deepspeed import utils FWD_MODULE_STACK = list() @@ -101,6 +102,7 @@ def __init__( zero_param_parallel_group=None, zero_quantized_weights=False, zero_quantized_nontrainable_weights=False, + zero_module_granularity_threshold=0, ): see_memory_usage("DeepSpeedZeRoOffload initialize [begin]", force=True) @@ -155,8 +157,16 @@ def __init__( zero_quantized_nontrainable_weights=self.zero_quantized_nontrainable_weights, ) + if zero_module_granularity_threshold > 0: + self.min_granularity_value = sys.maxsize + self.min_granularity_layer = None + self.granularity_info = set() + self.z3_leaf_layers = [] + self._set_z3_leaf_modules_by_threshold(module, zero_module_granularity_threshold) + self.forward_hooks = [] self.backward_hooks = [] + self.setup_zero_stage3_hooks() print_rank_0( f'Created module hooks: forward = {len(self.forward_hooks)}, backward = {len(self.backward_hooks)}', @@ -482,3 +492,82 @@ def post_sub_module_backward_function(self, sub_module): see_memory_usage( f"After sub module backward function {sub_module.__class__.__name__} {sub_module.id} after release", force=False) + + def _set_z3_leaf_modules_by_threshold(self, module, zero_module_granularity_threshold): + + self._get_granularity_recursively(module) + print_rank_0(f"{'MODULE NAME'.ljust(30)}|{'GRANULARITY VALUE'.rjust(20)}", force=True) + for granularity in self.granularity_info: + print_rank_0(granularity, force=True) + + if self.min_granularity_value <= zero_module_granularity_threshold: + self._set_leaf_by_threshold_preorder(module, zero_module_granularity_threshold) + utils.logger.info( + f"z3_leaf_module was set by stage3_module_granularity_threshold:{zero_module_granularity_threshold}") + for layer in self.z3_leaf_layers: + print_rank_0(f"{layer.__class__.__name__}:{layer.ds_model_granularity}", force=True) + else: + utils.logger.warning( + f"The smallest module granularity is [{self.min_granularity_layer}:{self.min_granularity_value}]. "\ + f"To make stage3_module_granularity_threshold effective, you need to set stage3_module_granularity_threshold >= {self.min_granularity_value}. "\ + f"Current Value:{zero_module_granularity_threshold}" + ) + + def _get_granularity_recursively(self, module): + """This function is used to recursively obtain the granularity of each module.""" + + # avoid setting as leaf for particularly large models, even if the granularity is very small + # an oversized leaf module increases the number of live parameters, introducing memory overhead + Z3_MAX_LEAF_SIZE = 1e9 + + if not list(module.parameters()): + # skip Modules without parameters, such as GELU, etc. 
+ module.ds_model_granularity = sys.maxsize + return 0, 0 + + num_layers = 0 + num_params = 0 + num_params += sum(p.ds_numel for p in module.parameters(recurse=False)) + if not any(module.children()): + # torch leaf module + module.ds_model_granularity = sys.maxsize + return 1, num_params + + for child in module.children(): + layers_in_child, params_in_child = self._get_granularity_recursively(child) + num_layers += layers_in_child + num_params += params_in_child + + if module.__class__.__name__ in torch.nn.modules.container.__all__: + # Do not set container modules like ModuleList as leaf modules + # as this will prevent hooks from being set on their children + # and they may do not invoke the forward method + module.ds_model_granularity = sys.maxsize + return num_layers, num_params + + num_layers += 1 + ds_model_granularity = (num_params // num_layers) if num_params <= Z3_MAX_LEAF_SIZE else sys.maxsize + module.ds_model_granularity = ds_model_granularity + # module.ds_model_num_layers = num_layers + # module.ds_model_num_params = num_params + if self.min_granularity_value > ds_model_granularity: + self.min_granularity_value = ds_model_granularity + self.min_granularity_layer = module.__class__.__name__ + self.granularity_info.add(f"{module.__class__.__name__.ljust(30)}|{str(ds_model_granularity).rjust(20)}") + + return num_layers, num_params + + def _set_leaf_by_threshold_preorder(self, module, granularity_treshhold): + '''Set modules as leaf modules based on the threshold, prioritizing parent nodes.''' + + num_params = sum(p.ds_numel for p in module.parameters()) + if num_params == 0: + # skip Modules without parameters, such as GELU, etc. + return + if module.ds_model_granularity <= granularity_treshhold: + set_z3_leaf_module(module, True) + self.z3_leaf_layers.append(module) + return + + for sub_module in module.children(): + self._set_leaf_by_threshold_preorder(sub_module, granularity_treshhold) diff --git a/deepspeed/runtime/zero/stage3.py b/deepspeed/runtime/zero/stage3.py index 2c0c9d498d13..04d52319ae8c 100644 --- a/deepspeed/runtime/zero/stage3.py +++ b/deepspeed/runtime/zero/stage3.py @@ -157,6 +157,7 @@ def __init__( zero_hpz_partition_size=1, zero_quantized_weights=False, zero_quantized_nontrainable_weights=False, + zero_module_granularity_threshold=0, ): see_memory_usage("Stage 3 initialize beginning", force=True) @@ -227,7 +228,8 @@ def __init__( mpu=mpu, zero_param_parallel_group=zero_param_parallel_group, zero_quantized_weights=zero_quantized_weights, - zero_quantized_nontrainable_weights=zero_quantized_nontrainable_weights) + zero_quantized_nontrainable_weights=zero_quantized_nontrainable_weights, + zero_module_granularity_threshold=zero_module_granularity_threshold) self.persistent_parameters = self.parameter_offload.persistent_parameters self._configure_offloading(offload_optimizer_config, offload_param_config) @@ -458,6 +460,7 @@ def initialize_ds_offload( zero_param_parallel_group, zero_quantized_weights, zero_quantized_nontrainable_weights, + zero_module_granularity_threshold, ): return DeepSpeedZeRoOffload(module=module, timers=timers, @@ -473,7 +476,8 @@ def initialize_ds_offload( mpu=mpu, zero_param_parallel_group=zero_param_parallel_group, zero_quantized_weights=zero_quantized_weights, - zero_quantized_nontrainable_weights=zero_quantized_nontrainable_weights) + zero_quantized_nontrainable_weights=zero_quantized_nontrainable_weights, + zero_module_granularity_threshold=zero_module_granularity_threshold) def _get_trainable_parameter_groups(self): param_groups = [] 
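As a side note on the classification above, the granularity metric computed by `_get_granularity_recursively` is documented as parameter_count / (1 + descendant count). The following is a simplified, hedged sketch of that idea, not the DeepSpeed implementation (which also skips parameter-free modules, containers, and oversized leaves); the example module and threshold are made up for illustration.

```python
# Rough approximation of the granularity metric; the real code in
# parameter_offload.py recurses over children and handles container modules specially.
import torch.nn as nn

def approx_granularity(module: nn.Module) -> float:
    num_params = sum(p.numel() for p in module.parameters())
    num_descendants = sum(1 for _ in module.modules()) - 1  # submodules, excluding the root
    return num_params / (1 + num_descendants)

block = nn.Sequential(nn.Linear(128, 128), nn.GELU(), nn.Linear(128, 128))
threshold = 20000  # made-up threshold for illustration
print(approx_granularity(block), approx_granularity(block) <= threshold)
```

A module scoring below the configured `stage3_module_granularity_threshold` would be flagged as a ZeRO-3 leaf and fetched as one unit instead of hook-by-hook.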
diff --git a/deepspeed/utils/__init__.py b/deepspeed/utils/__init__.py index c6a202d485eb..983e64642c69 100644 --- a/deepspeed/utils/__init__.py +++ b/deepspeed/utils/__init__.py @@ -16,7 +16,7 @@ from .tensor_fragment import safe_set_full_fp32_param, safe_set_full_optimizer_state, safe_set_full_grad from .tensor_fragment import safe_get_local_fp32_param, safe_get_local_grad, safe_get_local_optimizer_state from .tensor_fragment import safe_set_local_fp32_param, safe_set_local_grad, safe_set_local_optimizer_state -from .z3_leaf_module import set_z3_leaf_modules, unset_z3_leaf_modules, get_z3_leaf_modules, z3_leaf_module, z3_leaf_parameter +from .z3_leaf_module import set_z3_leaf_modules, unset_z3_leaf_modules, get_z3_leaf_modules, z3_leaf_module, z3_leaf_parameter, set_z3_leaf_module from .mixed_precision_linkage import link_hp_params, lazy_init_hp_params_optimizer_state from deepspeed.runtime.dataloader import RepeatingLoader from .numa import get_numactl_cmd diff --git a/deepspeed/utils/z3_leaf_module.py b/deepspeed/utils/z3_leaf_module.py index 47d9ff698f1f..14e8ae2d2823 100644 --- a/deepspeed/utils/z3_leaf_module.py +++ b/deepspeed/utils/z3_leaf_module.py @@ -4,7 +4,7 @@ # DeepSpeed Team import torch -from typing import List, Type +from typing import List, Type, Union def z3_leaf_module(model: torch.nn.Module) -> bool: @@ -40,18 +40,24 @@ def get_z3_leaf_modules(model: torch.nn.Module) -> List[torch.nn.Module]: return [module for module in model.modules() if z3_leaf_module(module)] -def _do_set_z3_leaf_modules(model: torch.nn.Module, leaf_module_classes: List[Type], +def set_z3_leaf_module(model: torch.nn.Module, flag: bool): + model._z3_leaf = flag + + +def _do_set_z3_leaf_modules(model: torch.nn.Module, leaf_module_classes: Union[List[Type], List[str]], flag: bool) -> List[torch.nn.Module]: - assert all(isinstance(module_class, type) for module_class in leaf_module_classes), \ - f'leaf_module_classes must be a list of types, got {leaf_module_classes}' + assert all(isinstance(module_class, (type, str) ) for module_class in leaf_module_classes), \ + f'leaf_module_classes must be a list of types or names, got {leaf_module_classes}' leaf_modules = [] def _set_z3_leaf_flag(model: torch.nn.Module): nonlocal leaf_modules - if model.__class__ in leaf_module_classes: - model._z3_leaf = flag - leaf_modules.append(model) + for module in leaf_module_classes: + if (isinstance(module, type) and model.__class__ == module) or \ + (isinstance(module, str) and model.__class__.__name__ == module): + model._z3_leaf = flag + leaf_modules.append(model) model.apply(_set_z3_leaf_flag) @@ -61,13 +67,14 @@ def _set_z3_leaf_flag(model: torch.nn.Module): return leaf_modules -def set_z3_leaf_modules(model: torch.nn.Module, leaf_module_classes: List[Type]) -> List[torch.nn.Module]: +def set_z3_leaf_modules(model: torch.nn.Module, leaf_module_classes: Union[List[Type], + List[str]]) -> List[torch.nn.Module]: """Sets a flag within a module in `model` to instruct ZeRO3 to stop setting hooks recursively when it encounters a module class listed in `leaf_module_classes`. This is particularly useful in the context of Mixture of Experts (MoE) models. In MoE models, the computation order of experts varies across forward passes. This variability can disrupt ZeRO3's functionality, as ZeRO3 relies on tracking the computation order of modules to prefetch parameters efficiently. By designating a module as a 'leaf' node, ZeRO3 will prefetch parameters for all child modules upon entering the module. 
Another scenario where this functionality is beneficial is in models with excessively fine-grained nested modules, where it helps to avoid the overhead associated with hooks. Args: model (torch.nn.Module): The model to which the leaf module flag will be applied. - leaf_module_classes (List[Type]): A list of module classes that should be flagged as 'leaf' modules. + leaf_module_classes (Union[List[Type], List[str]]): A list of module classes that should be flagged as 'leaf' modules. Returns: List[torch.nn.Module]: A list of modules that match the module classes in `leaf_module_classes`. """ @@ -79,7 +86,7 @@ def unset_z3_leaf_modules(model: torch.nn.Module, leaf_module_classes: List[Type See `set_z3_leaf_modules` for more details. Args: model (torch.nn.Module): The model to which the leaf module flag will be applied. - leaf_module_classes (List[Type]): A list of module classes that should be flagged as 'leaf' modules. + leaf_module_classes (Union[List[Type], List[str]]): A list of module classes that should be flagged as 'leaf' modules. Returns: List[torch.nn.Module]: A list of modules that match the module classes in `leaf_module_classes`. """ diff --git a/docs/_pages/config-json.md b/docs/_pages/config-json.md index adb2f1679ea0..51e3bbd6eaaa 100755 --- a/docs/_pages/config-json.md +++ b/docs/_pages/config-json.md @@ -489,6 +489,11 @@ Enabling and configuring ZeRO memory optimizations |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------- | | Consolidate the weights before saving the model by `save_16bit_model()`. Since the weights are partitioned across GPUs, they aren't part of `state_dict`, so this function automatically gathers the weights when this option is enabled and then saves the fp16 model weights. | `False` | +***stage3_module_granularity_threshold***: [integer] +| Description | Default | +|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------- | +| The granularity of a module is determined by the ratio of `parameter_count` / `(1 + descendant_count)`. ZeRO3 classifies modules with a granularity below the threshold as fine-grained, treating them as integral units during parameter fetching. This reduces host and communication overhead from separate hooks. 
| `0` | + ***zero_hpz_partition_size***: [integer] | Description | Default | diff --git a/tests/unit/runtime/zero/test_zero_leaf_module.py b/tests/unit/runtime/zero/test_zero_leaf_module.py index 1d3b88a04a4e..74c709883645 100644 --- a/tests/unit/runtime/zero/test_zero_leaf_module.py +++ b/tests/unit/runtime/zero/test_zero_leaf_module.py @@ -3,6 +3,7 @@ # DeepSpeed Team +import pytest import deepspeed.comm as dist import torch @@ -12,6 +13,8 @@ import deepspeed from deepspeed.utils import set_z3_leaf_modules, unset_z3_leaf_modules, get_z3_leaf_modules, z3_leaf_module from deepspeed.accelerator import get_accelerator +from torch import nn +import time class ChooseModuleByCounter(torch.nn.Module): @@ -53,6 +56,49 @@ def forward(self, x, y): return x, loss +class MLPBlock(nn.Module): + + def __init__(self, hidden_dim): + super(MLPBlock, self).__init__() + self.gate_proj = nn.Linear(hidden_dim, hidden_dim, bias=False) + self.up_proj = nn.Linear(hidden_dim, hidden_dim, bias=False) + self.down_proj = nn.Linear(hidden_dim, hidden_dim, bias=False) + self.act_fn = nn.GELU() + + def forward(self, x): + return self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) + + +class FineGrainedBlock(nn.Module): + + def __init__(self, hidden_dim, num_block): + super(FineGrainedBlock, self).__init__() + self.num_block = num_block + self.mlp_layers = torch.nn.ModuleList([MLPBlock(hidden_dim=hidden_dim) for _ in range(self.num_block)]) + + def forward(self, x): + for i in range(self.num_block): + x = self.mlp_layers[i](x) + return x + + +class modelWithFineGrainedBlock(nn.Module): + + def __init__(self, hidden_dim, num_block): + super(modelWithFineGrainedBlock, self).__init__() + self.coarse_grained_layer1 = nn.Linear(hidden_dim, 8 * hidden_dim) + self.coarse_grained_layer2 = nn.Linear(8 * hidden_dim, hidden_dim) + self.fine_grained_layer = FineGrainedBlock(hidden_dim, num_block) + self.cel = torch.nn.CrossEntropyLoss() + + def forward(self, x, y): + x = self.coarse_grained_layer1(x) + x = self.coarse_grained_layer2(x) + x = self.fine_grained_layer(x) + loss = self.cel(x, y) + return x, loss + + def run_model(model, config_dict, hidden_dim, dtype, requires_grad): model, _, _, _ = deepspeed.initialize(model=model, model_parameters=model.parameters(), config=config_dict) data_loader = random_dataloader(model=model, @@ -97,9 +143,9 @@ def _test_set_z3_leaf_modules(self, cls, requires_grad): "stage3_max_reuse_distance": 0, } } - if get_accelerator().is_fp16_supported(): + if preferred_dtype() is torch.float16: config_dict["fp16"] = {"enabled": True} - elif get_accelerator().is_bf16_supported(): + elif preferred_dtype() is torch.bfloat16: config_dict["bf16"] = {"enabled": True} model = cls(hidden_dim) @@ -143,3 +189,74 @@ def test_set_no_match_class(self): raise AssertionError("Expected error that no module is set as a leaf module") except ValueError as e: pass + + +@pytest.mark.parametrize("module_granularity_threshold", [0, 100, 12100, 10000000]) +class TestZ3LeafOptimization(DistributedTest): + world_size = 2 + reuse_dist_env = True + + def test_finegrained_optimization(self, module_granularity_threshold: int): + hidden_dim = 128 + num_block = 16 + config_dict = { + "train_micro_batch_size_per_gpu": 1, + "steps_per_print": 1, + "optimizer": { + "type": "Adam", + "params": { + "lr": 1e-6 + } + }, + "zero_optimization": { + "stage": 3, + "stage3_prefetch_bucket_size": hidden_dim**2, + "stage3_param_persistence_threshold": 0, + "stage3_max_reuse_distance": 0, + } + } + if preferred_dtype() is torch.float16: 
+ config_dict["fp16"] = {"enabled": True} + elif preferred_dtype() is torch.bfloat16: + config_dict["bf16"] = {"enabled": True} + + def bench_loss_and_time(config): + warm_up_step = 10 + model = modelWithFineGrainedBlock(hidden_dim, num_block) + model, _, _, _ = deepspeed.initialize(model=model, model_parameters=model.parameters(), config=config) + data_loader = random_dataloader(model=model, + total_samples=20, + hidden_dim=hidden_dim, + device=model.device, + dtype=preferred_dtype()) + dist.barrier() + loss_list = [] + + for i, batch in enumerate(data_loader): + if i == warm_up_step: + dist.barrier() + get_accelerator().synchronize() + start_time = time.time() + batch[0].requires_grad = True + loss = model(batch[0], batch[1]) + loss = loss[1] + loss_list.append(loss) + model.backward(loss) + model.step() + get_accelerator().synchronize() + end_time = time.time() + duration = end_time - start_time + model.destroy() + return loss_list, duration + + baseline_loss_list, baseline_exec_time = bench_loss_and_time(config_dict) + + config_dict["zero_optimization"]["stage3_module_granularity_threshold"] = module_granularity_threshold + loss, duration = bench_loss_and_time(config_dict) + + if dist.get_rank() == 0: + print(f"baseline exec time:", baseline_exec_time) + print( + f"finegrained optimziation exec time: {duration},granularity threshold:{module_granularity_threshold} " + ) + assert baseline_loss_list == loss, f"incorrect loss value with threshold:{module_granularity_threshold}" From b692cdea479fba8201584054d654f639e925a265 Mon Sep 17 00:00:00 2001 From: Joe Mayer <114769929+jomayeri@users.noreply.github.com> Date: Tue, 12 Nov 2024 08:34:17 -0800 Subject: [PATCH 09/13] AIO File Offsets (#6641) Adding the option for a file offset to the read/write functions of AIO & GDS ops. --------- Co-authored-by: jomayeri Co-authored-by: Masahiro Tanaka <81312776+tohtana@users.noreply.github.com> Co-authored-by: Olatunji Ruwase Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com> --- accelerator/cpu_accelerator.py | 2 + csrc/aio/common/deepspeed_aio_utils.cpp | 18 ++-- csrc/aio/common/deepspeed_aio_utils.h | 4 +- csrc/aio/py_lib/deepspeed_aio_op_desc.cpp | 6 +- csrc/aio/py_lib/deepspeed_aio_op_desc.h | 4 +- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 19 ++-- csrc/aio/py_lib/deepspeed_cpu_op.h | 3 +- csrc/aio/py_lib/deepspeed_py_aio.cpp | 7 +- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 65 +++++++++----- csrc/aio/py_lib/deepspeed_py_io_handle.h | 27 ++++-- csrc/aio/py_lib/py_ds_aio.cpp | 24 +++-- csrc/aio/py_test/ds_aio_handle.py | 2 +- csrc/gds/py_lib/deepspeed_gds_op.cpp | 24 +++-- csrc/gds/py_lib/deepspeed_gds_op.h | 3 +- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 15 +++- csrc/gds/py_lib/deepspeed_py_gds_handle.h | 3 +- csrc/gds/py_lib/py_ds_gds.cpp | 24 +++-- deepspeed/runtime/swap_tensor/utils.py | 4 +- deepspeed/utils/numa.py | 5 +- tests/unit/ops/aio/test_aio.py | 97 +++++++++++++++++++-- tests/unit/ops/aio/test_gds.py | 87 ++++++++++++++++-- 21 files changed, 342 insertions(+), 101 deletions(-) diff --git a/accelerator/cpu_accelerator.py b/accelerator/cpu_accelerator.py index 1e4335b19292..0e49bd9f6458 100644 --- a/accelerator/cpu_accelerator.py +++ b/accelerator/cpu_accelerator.py @@ -71,6 +71,8 @@ def device_count(self): # In flat mode, HBM is in separate NUMA node with no cores on this node. # Ignore these NUMA nodes with no cores. 
numa_core_lists = get_numa_cores() + if not numa_core_lists: + return 1 numa_count = 0 prev_core_list = [] for core_list in numa_core_lists: diff --git a/csrc/aio/common/deepspeed_aio_utils.cpp b/csrc/aio/common/deepspeed_aio_utils.cpp index 0536ff6a362e..fb269b58315f 100644 --- a/csrc/aio/common/deepspeed_aio_utils.cpp +++ b/csrc/aio/common/deepspeed_aio_utils.cpp @@ -19,9 +19,14 @@ const int c_io_queue_depth = 8; io_xfer_ctxt::io_xfer_ctxt(const int fd, const int64_t file_offset, + const int64_t buffer_offset, const int64_t num_bytes, const void* buffer) - : _fd(fd), _base_offset(file_offset), _mem_buffer(buffer), _num_bytes(num_bytes) + : _fd(fd), + _file_base_offset(file_offset), + _buffer_base_offset(buffer_offset), + _mem_buffer(buffer), + _num_bytes(num_bytes) { } @@ -41,9 +46,10 @@ void io_prep_context::prep_iocbs(const int n_iocbs, assert(static_cast(n_iocbs) <= _iocbs->size()); for (auto i = 0; i < n_iocbs; ++i) { const auto shift = i * _block_size; - const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_base_offset + shift; - const auto xfer_offset = _xfer_ctxt->_base_offset + start_offset + shift; + const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_buffer_base_offset + shift; + const auto xfer_offset = _xfer_ctxt->_file_base_offset + start_offset + shift; auto byte_count = _block_size; + if ((shift + _block_size) > num_bytes) { byte_count = num_bytes - shift; } if (_read_op) { @@ -79,10 +85,10 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector* auto actual_n_iocbs = min(static_cast(n_iocbs), _remaining_io_blocks); for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) { - const auto xfer_offset = _xfer_ctxt->_base_offset + (_next_iocb_index * _block_size); - const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + xfer_offset; + const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + _xfer_ctxt->_buffer_base_offset + + (_next_iocb_index * _block_size); + const auto xfer_offset = _xfer_ctxt->_file_base_offset + (_next_iocb_index * _block_size); const auto num_bytes = min(static_cast(_block_size), _remaining_bytes); - if (_read_op) { io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset); } else { diff --git a/csrc/aio/common/deepspeed_aio_utils.h b/csrc/aio/common/deepspeed_aio_utils.h index 20e81fe8eebd..6b7599acecb4 100644 --- a/csrc/aio/common/deepspeed_aio_utils.h +++ b/csrc/aio/common/deepspeed_aio_utils.h @@ -30,12 +30,14 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. 
struct io_xfer_ctxt { const int _fd; - const int64_t _base_offset; + const int64_t _file_base_offset; + const int64_t _buffer_base_offset; const void* _mem_buffer; const int64_t _num_bytes; io_xfer_ctxt(const int fd, const int64_t file_offset, + const int64_t buffer_offset, const int64_t num_bytes, const void* buffer); }; diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp index 6f311c5400c7..945251397225 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp @@ -13,14 +13,16 @@ io_op_desc_t::io_op_desc_t(const bool read_op, const char* filename, const int64_t file_num_bytes, const int intra_op_parallelism, - const bool validate) + const bool validate, + const int64_t file_offset) : _read_op(read_op), _buffer(buffer), _fd(fd), _filename(filename), _file_num_bytes(file_num_bytes), + _file_offset(file_offset), _intra_op_parallelism(intra_op_parallelism), - _num_bytes_per_thread(file_num_bytes / intra_op_parallelism), + _num_bytes_per_thread(static_cast(buffer.nbytes()) / intra_op_parallelism), _validate(validate) { } diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.h b/csrc/aio/py_lib/deepspeed_aio_op_desc.h index f841b8ce520a..ac1cdf90f78b 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.h +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.h @@ -19,6 +19,7 @@ struct io_op_desc_t { const int64_t _num_bytes_per_thread; torch::Tensor _contiguous_buffer; const bool _validate; + const int64_t _file_offset; io_op_desc_t(const bool read_op, const torch::Tensor& buffer, @@ -26,7 +27,8 @@ struct io_op_desc_t { const char* filename, const int64_t file_num_bytes, const int intra_op_parallelism, - const bool validate); + const bool validate, + const int64_t file_offset); virtual void run(const int tid, std::unique_ptr& aio_ctxt, diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index da2ff568d74b..56fb33fb1886 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -16,8 +16,16 @@ cpu_op_desc_t::cpu_op_desc_t( const char* filename, const int64_t file_num_bytes, const int intra_op_parallelism, - const bool validate) - : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, intra_op_parallelism, validate), + const bool validate, + const int64_t file_offset) + : io_op_desc_t(read_op, + buffer, + fd, + filename, + file_num_bytes, + intra_op_parallelism, + validate, + file_offset), _cpu_buffer(buffer), _pinned_tensor_mgr(pinned_tensor_mgr), _is_managed_bounce_buffer(false) @@ -66,10 +74,11 @@ void cpu_op_desc_t::run(const int tid, deepspeed_aio_config_t* aio_config) { assert(tid < _intra_op_parallelism); - const auto base_offset = _num_bytes_per_thread * tid; + const auto buffer_base_offset = _num_bytes_per_thread * tid; + const auto file_base_offset = _file_offset + (_num_bytes_per_thread * tid); - std::unique_ptr xfer_ctxt( - new io_xfer_ctxt(_fd, base_offset, _num_bytes_per_thread, data_ptr())); + std::unique_ptr xfer_ctxt(new io_xfer_ctxt( + _fd, file_base_offset, buffer_base_offset, _num_bytes_per_thread, data_ptr())); if (aio_config->_overlap_events) { do_aio_operation_overlap(_read_op, aio_ctxt, xfer_ctxt, aio_config, nullptr); diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.h b/csrc/aio/py_lib/deepspeed_cpu_op.h index 9de2fa254048..debaf4a90731 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.h +++ b/csrc/aio/py_lib/deepspeed_cpu_op.h @@ -20,7 +20,8 @@ struct cpu_op_desc_t : io_op_desc_t { const char* filename, const int64_t 
file_num_bytes, const int intra_op_parallelism, - const bool validate); + const bool validate, + const int64_t file_offset); void run(const int tid, std::unique_ptr& aio_ctxt, diff --git a/csrc/aio/py_lib/deepspeed_py_aio.cpp b/csrc/aio/py_lib/deepspeed_py_aio.cpp index 02b04057d1ac..1ff0397043fa 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio.cpp @@ -52,7 +52,9 @@ int deepspeed_py_aio_write(const torch::Tensor& buffer, auto write_buffer = (char*)buffer.data_ptr(); const auto num_write_bytes = static_cast(buffer.nbytes()); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer)); + + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer)); std::unique_ptr aio_ctxt(new aio_context(config._block_size, config._queue_depth)); if (config._overlap_events) { @@ -97,7 +99,8 @@ int deepspeed_py_aio_read(torch::Tensor& buffer, auto read_buffer = (char*)buffer.data_ptr(); assert(static_cast(buffer.nbytes()) == num_file_bytes); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer)); + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer)); std::unique_ptr aio_ctxt(new aio_context(config._block_size, config._queue_depth)); if (config._overlap_events) { diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index 48ea8a1339d4..64d7c2e0541e 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -58,7 +58,10 @@ const bool deepspeed_io_handle_t::get_overlap_events() const { return _overlap_e const int deepspeed_io_handle_t::get_intra_op_parallelism() const { return _intra_op_parallelism; } -int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, const bool validate) +int deepspeed_io_handle_t::read(torch::Tensor& buffer, + const char* filename, + const bool validate, + const int64_t file_offset) { const auto start_time = std::chrono::high_resolution_clock::now(); @@ -76,7 +79,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con if (fd == -1) { return -1; } auto read_buffer = (char*)buffer.data_ptr(); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer)); + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, file_offset, 0, num_file_bytes, read_buffer)); if (_aio_config._overlap_events) { do_aio_operation_overlap(true, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr); @@ -98,7 +102,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con int deepspeed_io_handle_t::write(const torch::Tensor& buffer, const char* filename, - const bool validate) + const bool validate, + const int64_t file_offset) { assert(_aio_ctxt); @@ -109,7 +114,8 @@ int deepspeed_io_handle_t::write(const torch::Tensor& buffer, auto write_buffer = (char*)buffer.data_ptr(); const auto num_write_bytes = static_cast(buffer.nbytes()); - std::unique_ptr xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer)); + std::unique_ptr xfer_ctxt( + new io_xfer_ctxt(fd, file_offset, 0, num_write_bytes, write_buffer)); if (_aio_config._overlap_events) { do_aio_operation_overlap(false, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr); @@ -206,7 +212,8 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( const int fd, const char* filename, const int64_t file_num_bytes, - const bool validate) + const bool validate, + const int64_t file_offset) { return std::make_shared(read_op, buffer, @@ 
-215,13 +222,15 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( filename, file_num_bytes, _intra_op_parallelism, - validate); + validate, + file_offset); } int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, const char* filename, const bool validate, - const bool async) + const bool async, + const int64_t file_offset) { int64_t num_file_bytes; if (-1 == get_file_size(filename, num_file_bytes)) { @@ -229,20 +238,18 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, report_file_error(filename, " fstat for read", error_code); return -1; } + + // buffer can exceed file size to enable 4k alignment const auto buffer_bytes = static_cast(buffer.nbytes()); - if (buffer_bytes != num_file_bytes) { - std::cout << filename << ": buffer nbytes != file bytes " << buffer_bytes - << " != " << num_file_bytes << std::endl; - } - assert(buffer_bytes == num_file_bytes); assert((num_file_bytes % _intra_op_parallelism) == 0); - if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; } + if (!_is_valid_parallel_aio_op(true, buffer_bytes)) { return -1; } const auto fd = open_file(filename, true); if (fd == -1) { return -1; } - auto scheduled_op = _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate); + auto scheduled_op = + _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate, file_offset); _schedule_aio_work(scheduled_op); @@ -254,7 +261,8 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, const char* filename, const bool validate, - const bool async) + const bool async, + const int64_t file_offset) { const auto num_write_bytes = static_cast(buffer.nbytes()); assert((num_write_bytes % _intra_op_parallelism) == 0); @@ -264,7 +272,8 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, const auto fd = open_file(filename, false); if (fd == -1) { return -1; } - auto scheduled_op = _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate); + auto scheduled_op = + _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate, file_offset); _schedule_aio_work(scheduled_op); @@ -273,24 +282,32 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, return wait(); } -int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { - return pread(buffer, filename, false, false); + return pread(buffer, filename, false, false, file_offset); } -int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { - return pwrite(buffer, filename, false, false); + return pwrite(buffer, filename, false, false, file_offset); } -int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { - return pread(buffer, filename, false, true); + return pread(buffer, filename, false, true, file_offset); } -int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, const char* filename) +int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, + const char* filename, + const int64_t file_offset) { - return pwrite(buffer, filename, false, true); + return pwrite(buffer, filename, false, true, 
file_offset); } at::Tensor deepspeed_io_handle_t::new_cpu_locked_tensor(const int64_t num_elem, diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index 4fedf8080818..dfcb4125ab9a 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -38,27 +38,35 @@ struct deepspeed_io_handle_t { const bool get_overlap_events() const; const int get_intra_op_parallelism() const; - int read(torch::Tensor& buffer, const char* filename, const bool validate); + int read(torch::Tensor& buffer, + const char* filename, + const bool validate, + const int64_t file_offset); - int write(const torch::Tensor& buffer, const char* filename, const bool validate); + int write(const torch::Tensor& buffer, + const char* filename, + const bool validate, + const int64_t file_offset); int pread(const torch::Tensor& buffer, const char* filename, const bool validate, - const bool async); + const bool async, + const int64_t file_offset); int pwrite(const torch::Tensor& buffer, const char* filename, const bool validate, - const bool async); + const bool async, + const int64_t file_offset); - int sync_pread(torch::Tensor& buffer, const char* filename); + int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset); - int sync_pwrite(const torch::Tensor& buffer, const char* filename); + int sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset); - int async_pread(torch::Tensor& buffer, const char* filename); + int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset); - int async_pwrite(const torch::Tensor& buffer, const char* filename); + int async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset); // TODO: Make API's args to be shape and dtype. torch::Tensor new_cpu_locked_tensor(const int64_t num_elem, @@ -81,5 +89,6 @@ struct deepspeed_io_handle_t { const int fd, const char* filename, const int64_t file_num_bytes, - const bool validate); + const bool validate, + const int64_t file_offset); }; diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index b80fa2d6c8e6..bf298b691b81 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -40,14 +40,16 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("write", &deepspeed_aio_handle_t::write, "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("pread", &deepspeed_aio_handle_t::pread, @@ -55,7 +57,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("pwrite", &deepspeed_aio_handle_t::pwrite, @@ -63,33 +66,38 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("sync_pread", &deepspeed_aio_handle_t::sync_pread, "Synchrononous parallel file read. Returns count of completed read ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("sync_pwrite", &deepspeed_aio_handle_t::sync_pwrite, "Synchronous parallel file write. 
Returns count of completed write ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("async_pread", &deepspeed_aio_handle_t::async_pread, "Asynchronous parallel file read. Returns 0 on success. Returns 0 on success, and " "following wait() returns count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("async_pwrite", &deepspeed_aio_handle_t::async_pwrite, "Asynchronous parallel file write. Returns 0 on success, and following wait() returns " "count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("new_cpu_locked_tensor", &deepspeed_aio_handle_t::new_cpu_locked_tensor, diff --git a/csrc/aio/py_test/ds_aio_handle.py b/csrc/aio/py_test/ds_aio_handle.py index f4a179deb9ec..6913e9090bf5 100755 --- a/csrc/aio/py_test/ds_aio_handle.py +++ b/csrc/aio/py_test/ds_aio_handle.py @@ -92,7 +92,7 @@ def main_parallel_read(pool_params): start_time = time.time() dest_buffer = BOUNCE_BUFFER if ctxt[BOUNCE_BUFFER] is not None else BUFFER - ret = handle.pread(ctxt[dest_buffer], ctxt['file'], args.validate, True) + ret = handle.pread(ctxt[dest_buffer], ctxt['file'], args.validate, 0, True) assert ret != -1 handle.wait() if dest_buffer == BOUNCE_BUFFER: diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index f49f74394374..b7055c8cc72b 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -95,8 +95,16 @@ gds_op_desc_t::gds_op_desc_t(const bool read_op, const char* filename, const int64_t file_num_bytes, const int intra_op_parallelism, - const bool validate) - : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, intra_op_parallelism, validate) + const bool validate, + const int64_t file_offset) + : io_op_desc_t(read_op, + buffer, + fd, + filename, + file_num_bytes, + intra_op_parallelism, + validate, + file_offset) { _contiguous_buffer = _buffer.contiguous(); const int64_t device = _buffer.get_device(); @@ -124,17 +132,17 @@ void gds_op_desc_t::run(const int tid, { assert(tid < _intra_op_parallelism); check_cudaruntimecall(cudaSetDevice(_buffer.get_device())); - int64_t buf_offset = data_ptr() + (_num_bytes_per_thread * tid) - (char*)_base_ptr; - const auto file_offset = _num_bytes_per_thread * tid; + const auto buf_offset = data_ptr() + (_num_bytes_per_thread * tid) - (char*)_base_ptr; + const auto tid_file_offset = _file_offset + (_num_bytes_per_thread * tid); if (_read_op) { auto ret = - cuFileRead(_cf_handle, _base_ptr, _num_bytes_per_thread, file_offset, buf_offset); - if (ret < 0) { _report_error(ret, errno, buf_offset); } + cuFileRead(_cf_handle, _base_ptr, _num_bytes_per_thread, tid_file_offset, buf_offset); + if (ret < 0) { _report_error(ret, errno, tid_file_offset); } } else { auto ret = - cuFileWrite(_cf_handle, _base_ptr, _num_bytes_per_thread, file_offset, buf_offset); - if (ret < 0) { _report_error(ret, errno, buf_offset); } + cuFileWrite(_cf_handle, _base_ptr, _num_bytes_per_thread, tid_file_offset, buf_offset); + if (ret < 0) { _report_error(ret, errno, tid_file_offset); } } } diff --git a/csrc/gds/py_lib/deepspeed_gds_op.h b/csrc/gds/py_lib/deepspeed_gds_op.h index 380bb0b9b6ae..d955527b1ba3 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.h +++ b/csrc/gds/py_lib/deepspeed_gds_op.h @@ -24,7 +24,8 @@ struct gds_op_desc_t : io_op_desc_t { const char* filename, const int64_t file_num_bytes, const int intra_op_parallelism, - const bool validate); + const bool validate, + const int64_t file_offset); void 
run(const int tid, std::unique_ptr& aio_ctxt, diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index c052144a0190..f11245c75a5e 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -107,12 +107,19 @@ std::shared_ptr deepspeed_gds_handle_t::_create_io_op_desc( const int fd, const char* filename, const int64_t file_num_bytes, - const bool validate) + const bool validate, + const int64_t file_offset) { if (buffer.is_cuda()) { - return std::make_shared( - read_op, buffer, fd, filename, file_num_bytes, _intra_op_parallelism, validate); + return std::make_shared(read_op, + buffer, + fd, + filename, + file_num_bytes, + _intra_op_parallelism, + validate, + file_offset); } return deepspeed_io_handle_t::_create_io_op_desc( - read_op, buffer, fd, filename, file_num_bytes, validate); + read_op, buffer, fd, filename, file_num_bytes, validate, file_offset); } diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.h b/csrc/gds/py_lib/deepspeed_py_gds_handle.h index 131e83e7b838..25f68e177b2c 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.h +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.h @@ -42,7 +42,8 @@ struct deepspeed_gds_handle_t : deepspeed_io_handle_t { const int fd, const char* filename, const int64_t file_num_bytes, - const bool validate); + const bool validate, + const int64_t file_offset); static int s_cuFile_init; }; diff --git a/csrc/gds/py_lib/py_ds_gds.cpp b/csrc/gds/py_lib/py_ds_gds.cpp index 57bf8d2207c4..2f165ee2c32a 100644 --- a/csrc/gds/py_lib/py_ds_gds.cpp +++ b/csrc/gds/py_lib/py_ds_gds.cpp @@ -33,14 +33,16 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "Synchronous and non-parallel file read. Returns count of completed read ops", "buffer"_a, "filename"_a, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("write", &deepspeed_gds_handle_t::write, "Synchronous and non-parallel file write. Returns count of completed write ops", "buffer"_a, "filename"_a, - "validate"_a) + "validate"_a, + "file_offset"_a = 0) .def("pread", &deepspeed_gds_handle_t::pread, @@ -48,7 +50,8 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("pwrite", &deepspeed_gds_handle_t::pwrite, @@ -56,33 +59,38 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "buffer"_a, "filename"_a, "validate"_a, - "async"_a) + "async"_a, + "file_offset"_a = 0) .def("sync_pread", &deepspeed_gds_handle_t::sync_pread, "Synchrononous parallel file read. Returns count of completed read ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("sync_pwrite", &deepspeed_gds_handle_t::sync_pwrite, "Synchronous parallel file write. Returns count of completed write ops", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("async_pread", &deepspeed_gds_handle_t::async_pread, "Asynchronous parallel file read. Returns 0 on success. Returns 0 on success, and " "following wait() returns count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("async_pwrite", &deepspeed_gds_handle_t::async_pwrite, "Asynchronous parallel file write. 
Returns 0 on success, and following wait() returns " "count of completed ops.", "buffer"_a, - "filename"_a) + "filename"_a, + "file_offset"_a = 0) .def("new_cpu_locked_tensor", &deepspeed_gds_handle_t::new_cpu_locked_tensor, diff --git a/deepspeed/runtime/swap_tensor/utils.py b/deepspeed/runtime/swap_tensor/utils.py index 90b2d9b8bd31..1f9825c34638 100644 --- a/deepspeed/runtime/swap_tensor/utils.py +++ b/deepspeed/runtime/swap_tensor/utils.py @@ -18,12 +18,12 @@ def swap_in_tensors(swap_handle, tensor_buffers, swap_paths): for buffer, path in zip(tensor_buffers, swap_paths): - assert (swap_handle.async_pread(buffer, path) == 0) + assert (swap_handle.async_pread(buffer, path, 0) == 0) def swap_out_tensors(swap_handle, tensor_buffers, swap_paths): for buffer, path in zip(tensor_buffers, swap_paths): - assert (swap_handle.async_pwrite(buffer, path) == 0) + assert (swap_handle.async_pwrite(buffer, path, 0) == 0) def print_object(obj, name, exclude_list=[]): diff --git a/deepspeed/utils/numa.py b/deepspeed/utils/numa.py index 4fe7cbba90ae..aba3b5179d41 100644 --- a/deepspeed/utils/numa.py +++ b/deepspeed/utils/numa.py @@ -23,7 +23,10 @@ # ] def get_numa_cores(): ret = [] - output = subprocess.check_output(['numactl', '--hardware']).decode("utf-8") + try: + output = subprocess.check_output(['numactl', '--hardware']).decode("utf-8") + except: + return [] lines = output.split('\n') for line in lines: if line.startswith('available:'): diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py index a074cfca317f..1aa5f647a8aa 100644 --- a/tests/unit/ops/aio/test_aio.py +++ b/tests/unit/ops/aio/test_aio.py @@ -35,16 +35,21 @@ def _get_local_rank(): return 0 -def _do_ref_write(tmpdir, index=0): +def _do_ref_write(tmpdir, index=0, file_size=IO_SIZE): file_suffix = f'{_get_local_rank()}_{index}' ref_file = os.path.join(tmpdir, f'_py_random_{file_suffix}.pt') - ref_buffer = os.urandom(IO_SIZE) + ref_buffer = os.urandom(file_size) with open(ref_file, 'wb') as f: f.write(ref_buffer) return ref_file, ref_buffer +def _get_file_path(tmpdir, file_prefix, index=0): + file_suffix = f'{_get_local_rank()}_{index}' + return os.path.join(tmpdir, f'{file_prefix}_{file_suffix}.pt') + + def _get_test_write_file(tmpdir, index): file_suffix = f'{_get_local_rank()}_{index}' return os.path.join(tmpdir, f'_aio_write_random_{file_suffix}.pt') @@ -103,7 +108,7 @@ def test_parallel_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, over _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.sync_pread(aio_buffer, ref_file) + read_status = h.sync_pread(aio_buffer, ref_file, 0) assert read_status == 1 with open(ref_file, 'rb') as f: @@ -131,7 +136,7 @@ def test_async_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, overlap _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.async_pread(aio_buffer, ref_file) + read_status = h.async_pread(aio_buffer, ref_file, 0) assert read_status == 0 wait_status = h.wait() @@ -172,7 +177,7 @@ def test_parallel_write(self, tmpdir, use_cuda_pinned_tensor, single_submit, ove _validate_handle_state(h, single_submit, overlap_events) - write_status = h.sync_pwrite(aio_buffer, aio_file) + write_status = h.sync_pwrite(aio_buffer, aio_file, 0) assert write_status == 1 if not use_cuda_pinned_tensor: @@ -201,7 +206,7 @@ def test_async_write(self, tmpdir, use_cuda_pinned_tensor, single_submit, overla _validate_handle_state(h, single_submit, overlap_events) - 
write_status = h.async_pwrite(aio_buffer, aio_file) + write_status = h.async_pwrite(aio_buffer, aio_file, 0) assert write_status == 0 wait_status = h.wait() @@ -258,7 +263,7 @@ def test_read(self, tmpdir, async_queue, use_cuda_pinned_tensor, use_unpinned_te _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pread(aio_buffers[i], ref_files[i]) + read_status = h.async_pread(aio_buffers[i], ref_files[i], 0) assert read_status == 0 wait_status = h.wait() @@ -305,7 +310,7 @@ def test_write(self, tmpdir, use_cuda_pinned_tensor, async_queue, use_unpinned_t _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pwrite(aio_buffers[i], aio_files[i]) + read_status = h.async_pwrite(aio_buffers[i], aio_files[i], 0) assert read_status == 0 wait_status = h.wait() @@ -320,3 +325,79 @@ def test_write(self, tmpdir, use_cuda_pinned_tensor, async_queue, use_unpinned_t filecmp.clear_cache() assert filecmp.cmp(ref_files[i], aio_files[i], shallow=False) + + +@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False]) +@pytest.mark.parametrize('file_partitions', [[1, 1, 1], [1, 1, 2], [1, 2, 1], [2, 1, 1]]) +class TestAsyncFileOffset(DistributedTest): + world_size = 1 + + def test_offset_write(self, tmpdir, file_partitions, use_cuda_pinned_tensor): + + _skip_for_invalid_environment(use_cuda_pinned_tensor=use_cuda_pinned_tensor) + ref_file = _get_file_path(tmpdir, '_py_random') + aio_file = _get_file_path(tmpdir, '_aio_random') + partition_unit_size = BLOCK_SIZE + file_size = sum(file_partitions) * partition_unit_size + + h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) + + if use_cuda_pinned_tensor: + data_buffer = torch.ByteTensor(list(os.urandom(file_size))).pin_memory() + else: + data_buffer = h.new_cpu_locked_tensor(file_size, torch.empty(0, dtype=torch.uint8)) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + ref_fd = open(ref_file, 'wb') + for i in range(len(file_partitions)): + src_buffer = torch.narrow(data_buffer, 0, file_offsets[i], file_partitions[i] * partition_unit_size) + + ref_fd.write(src_buffer.numpy().tobytes()) + ref_fd.flush() + + assert 1 == h.sync_pwrite(buffer=src_buffer, filename=aio_file, file_offset=file_offsets[i]) + + filecmp.clear_cache() + assert filecmp.cmp(ref_file, aio_file, shallow=False) + + ref_fd.close() + + if not use_cuda_pinned_tensor: + h.free_cpu_locked_tensor(data_buffer) + + def test_offset_read(self, tmpdir, file_partitions, use_cuda_pinned_tensor): + + _skip_for_invalid_environment(use_cuda_pinned_tensor=use_cuda_pinned_tensor) + partition_unit_size = BLOCK_SIZE + file_size = sum(file_partitions) * partition_unit_size + ref_file, _ = _do_ref_write(tmpdir, 0, file_size) + h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) + + if use_cuda_pinned_tensor: + data_buffer = torch.zeros(file_size, dtype=torch.uint8, device='cpu').pin_memory() + else: + data_buffer = h.new_cpu_locked_tensor(file_size, torch.empty(0, dtype=torch.uint8)) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + with open(ref_file, 'rb') as ref_fd: + for i in range(len(file_partitions)): + ref_fd.seek(file_offsets[i]) + bytes_to_read = file_partitions[i] * 
partition_unit_size + ref_buf = list(ref_fd.read(bytes_to_read)) + + dst_tensor = torch.narrow(data_buffer, 0, 0, bytes_to_read) + assert 1 == h.sync_pread(dst_tensor, ref_file, file_offsets[i]) + assert dst_tensor.tolist() == ref_buf + + if not use_cuda_pinned_tensor: + h.free_cpu_locked_tensor(data_buffer) diff --git a/tests/unit/ops/aio/test_gds.py b/tests/unit/ops/aio/test_gds.py index e94d42cd22af..d97eff452eb5 100644 --- a/tests/unit/ops/aio/test_gds.py +++ b/tests/unit/ops/aio/test_gds.py @@ -29,16 +29,21 @@ def _get_local_rank(): return 0 -def _do_ref_write(tmpdir, index=0): +def _do_ref_write(tmpdir, index=0, file_size=IO_SIZE): file_suffix = f'{_get_local_rank()}_{index}' ref_file = os.path.join(tmpdir, f'_py_random_{file_suffix}.pt') - ref_buffer = os.urandom(IO_SIZE) + ref_buffer = os.urandom(file_size) with open(ref_file, 'wb') as f: f.write(ref_buffer) return ref_file, ref_buffer +def _get_file_path(tmpdir, file_prefix, index=0): + file_suffix = f'{_get_local_rank()}_{index}' + return os.path.join(tmpdir, f'{file_prefix}_{file_suffix}.pt') + + def _get_test_write_file(tmpdir, index): file_suffix = f'{_get_local_rank()}_{index}' return os.path.join(tmpdir, f'_gds_write_random_{file_suffix}.pt') @@ -78,7 +83,7 @@ def test_parallel_read(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.sync_pread(gds_buffer, ref_file) + read_status = h.sync_pread(gds_buffer, ref_file, 0) assert read_status == 1 with open(ref_file, 'rb') as f: @@ -97,7 +102,7 @@ def test_async_read(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) ref_file, _ = _do_ref_write(tmpdir) - read_status = h.async_pread(gds_buffer, ref_file) + read_status = h.async_pread(gds_buffer, ref_file, 0) assert read_status == 0 wait_status = h.wait() @@ -128,7 +133,7 @@ def test_parallel_write(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) - write_status = h.sync_pwrite(gds_buffer, gds_file) + write_status = h.sync_pwrite(gds_buffer, gds_file, 0) assert write_status == 1 h.unpin_device_tensor(gds_buffer) @@ -146,7 +151,7 @@ def test_async_write(self, tmpdir, single_submit, overlap_events): _validate_handle_state(h, single_submit, overlap_events) - write_status = h.async_pwrite(gds_buffer, gds_file) + write_status = h.async_pwrite(gds_buffer, gds_file, 0) assert write_status == 0 wait_status = h.wait() @@ -188,7 +193,7 @@ def test_read(self, tmpdir, async_queue): _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pread(gds_buffers[i], ref_files[i]) + read_status = h.async_pread(gds_buffers[i], ref_files[i], 0) assert read_status == 0 wait_status = h.wait() @@ -225,7 +230,7 @@ def test_write(self, tmpdir, async_queue): _validate_handle_state(h, single_submit, overlap_events) for i in range(async_queue): - read_status = h.async_pwrite(gds_buffers[i], gds_files[i]) + read_status = h.async_pwrite(gds_buffers[i], gds_files[i], 0) assert read_status == 0 wait_status = h.wait() @@ -268,3 +273,69 @@ def test_pin_device_tensor(self, use_new_api): h.free_pinned_device_tensor(pinned_buffer) else: h.unpin_device_tensor(pinned_buffer) + + +@pytest.mark.parametrize('file_partitions', [[1, 1, 1], [1, 1, 2], [1, 2, 1], [2, 1, 1]]) +class TestAsyncFileOffset(DistributedTest): + world_size = 1 + + def test_offset_write(self, tmpdir, file_partitions): + ref_file = 
_get_file_path(tmpdir, '_py_random') + aio_file = _get_file_path(tmpdir, '_aio_random') + partition_unit_size = IO_SIZE + file_size = sum(file_partitions) * partition_unit_size + + h = GDSBuilder().load().gds_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) + + gds_buffer = torch.empty(file_size, dtype=torch.uint8, device=get_accelerator().device_name()) + h.pin_device_tensor(gds_buffer) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + ref_fd = open(ref_file, 'wb') + for i in range(len(file_partitions)): + src_buffer = torch.narrow(gds_buffer, 0, file_offsets[i], + file_partitions[i] * partition_unit_size).to(device='cpu') + + ref_fd.write(src_buffer.numpy().tobytes()) + ref_fd.flush() + + assert 1 == h.sync_pwrite(buffer=src_buffer, filename=aio_file, file_offset=file_offsets[i]) + + filecmp.clear_cache() + assert filecmp.cmp(ref_file, aio_file, shallow=False) + + ref_fd.close() + + h.unpin_device_tensor(gds_buffer) + + def test_offset_read(self, tmpdir, file_partitions): + partition_unit_size = BLOCK_SIZE + file_size = sum(file_partitions) * partition_unit_size + ref_file, _ = _do_ref_write(tmpdir, 0, file_size) + h = GDSBuilder().load().gds_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL) + + gds_buffer = torch.empty(file_size, dtype=torch.uint8, device=get_accelerator().device_name()) + h.pin_device_tensor(gds_buffer) + + file_offsets = [] + next_offset = 0 + for i in range(len(file_partitions)): + file_offsets.append(next_offset) + next_offset += file_partitions[i] * partition_unit_size + + with open(ref_file, 'rb') as ref_fd: + for i in range(len(file_partitions)): + ref_fd.seek(file_offsets[i]) + bytes_to_read = file_partitions[i] * partition_unit_size + ref_buf = list(ref_fd.read(bytes_to_read)) + + dst_tensor = torch.narrow(gds_buffer, 0, 0, bytes_to_read) + assert 1 == h.sync_pread(dst_tensor, ref_file, file_offsets[i]) + assert dst_tensor.tolist() == ref_buf + + h.unpin_device_tensor(gds_buffer) From 877aa0dba673c2aa2157029c28363b804d6ee03d Mon Sep 17 00:00:00 2001 From: Logan Adams <114770087+loadams@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:50:02 -0800 Subject: [PATCH 10/13] Update path for BingBertSquad from DeepSpeedExamples (#6746) In https://github.com/microsoft/DeepSpeedExamples/pull/245, the DeepSpeedExamples directory structure was refactored, this updates the DeepSpeed examples from those changes. --- docs/_tutorials/bert-finetuning.md | 4 ++-- docs/_tutorials/onebit-adam.md | 4 ++-- tests/model/BingBertSquad/run_BingBertSquad.sh | 2 +- tests/model/BingBertSquad/run_BingBertSquad_sanity.sh | 2 +- tests/model/BingBertSquad/run_tests.sh | 2 +- tests/model/BingBertSquad/test_e2e_squad.py | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/_tutorials/bert-finetuning.md b/docs/_tutorials/bert-finetuning.md index 3014be18d682..f833acebde9a 100755 --- a/docs/_tutorials/bert-finetuning.md +++ b/docs/_tutorials/bert-finetuning.md @@ -10,14 +10,14 @@ In this tutorial we will be adding DeepSpeed to the BingBert model for the SQuAD If you don't already have a copy of the DeepSpeed repository, please clone in now and checkout the DeepSpeedExamples submodule the contains the BingBertSquad -example (DeepSpeedExamples/BingBertSquad) we will be going over in the rest of +example (DeepSpeedExamples/training/BingBertSquad) we will be going over in the rest of this tutorial. 
```shell git clone https://github.com/microsoft/DeepSpeed cd DeepSpeed git submodule update --init --recursive -cd DeepSpeedExamples/BingBertSquad +cd DeepSpeedExamples/training/BingBertSquad ``` ### Pre-requisites diff --git a/docs/_tutorials/onebit-adam.md b/docs/_tutorials/onebit-adam.md index b1a8b5369761..e66bba3f818b 100644 --- a/docs/_tutorials/onebit-adam.md +++ b/docs/_tutorials/onebit-adam.md @@ -136,7 +136,7 @@ You can also use a pre-trained BERT model checkpoint from either DeepSpeed, [Hug ### 2.1 Running BingBertSQuAD with DeepSpeed and 1-bit Adam -We provide example scripts under [DeepSpeedExamples/BingBertSquad/1-bit_adam/](https://github.com/microsoft/DeepSpeedExamples/tree/master/BingBertSquad/1-bit_adam). There are 3 sets of scripts corresponding to NCCL-based implementation, MPI-based implementation on Ethernet systems, and MPI-based implementation on InfiniBand systems. For MPI-based implementation, we provide both example scripts when launching with deepspeed or mpirun. +We provide example scripts under [DeepSpeedExamples/training/BingBertSquad/1-bit_adam/](https://github.com/microsoft/DeepSpeedExamples/tree/master/training/BingBertSquad/1-bit_adam). There are 3 sets of scripts corresponding to NCCL-based implementation, MPI-based implementation on Ethernet systems, and MPI-based implementation on InfiniBand systems. For MPI-based implementation, we provide both example scripts when launching with deepspeed or mpirun.
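Editor's note on the ZeRO-3 leaf-module patch above: the new `TestZ3LeafOptimization` case in `tests/unit/runtime/zero/test_zero_leaf_module.py` drives the optimization through a single config key, `stage3_module_granularity_threshold`. The sketch below is not part of the patch series; it only restates the test's config in isolation so the knob is easy to spot. The threshold value is just one of the values swept in the test parametrization, and the precision section is chosen by `preferred_dtype()` in the test rather than hard-coded as here.

```python
# Minimal ZeRO-3 config sketch mirroring TestZ3LeafOptimization (illustrative values).
hidden_dim = 128

ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "optimizer": {"type": "Adam", "params": {"lr": 1e-6}},
    "zero_optimization": {
        "stage": 3,
        "stage3_prefetch_bucket_size": hidden_dim**2,
        "stage3_param_persistence_threshold": 0,
        "stage3_max_reuse_distance": 0,
        # New knob exercised by the test; 12100 is one of the swept values, not a recommendation.
        "stage3_module_granularity_threshold": 12100,
    },
    # The test enables fp16 or bf16 depending on the accelerator; bf16 is assumed here.
    "bf16": {"enabled": True},
}

# The test then builds the engine roughly as:
# engine, _, _, _ = deepspeed.initialize(model=model,
#                                        model_parameters=model.parameters(),
#                                        config=ds_config)
```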
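Editor's note on the AIO file-offset patch (#6641) above: the I/O handle read/write methods now accept a trailing `file_offset` argument that defaults to 0. The sketch below is not part of the patches; it is pieced together from the pybind signatures and the new `TestAsyncFileOffset` unit tests shown above. The tuning constants and the scratch-file path are illustrative assumptions, and the standard `AsyncIOBuilder` import is assumed to match the test environment.

```python
import os
import torch
from deepspeed.ops.op_builder import AsyncIOBuilder

# Illustrative values; the unit tests define their own BLOCK_SIZE/QUEUE_DEPTH/IO_PARALLEL.
BLOCK_SIZE = 1024 * 1024
QUEUE_DEPTH = 8
IO_PARALLEL = 1

# aio_handle(block_size, queue_depth, single_submit, overlap_events, intra_op_parallelism)
h = AsyncIOBuilder().load().aio_handle(BLOCK_SIZE, QUEUE_DEPTH, True, True, IO_PARALLEL)

# One pinned CPU buffer holding two shards; each shard is written to its own
# offset of the same file, mirroring TestAsyncFileOffset in test_aio.py.
buffer = h.new_cpu_locked_tensor(2 * BLOCK_SIZE, torch.empty(0, dtype=torch.uint8))
path = "/tmp/ds_aio_offset_demo.bin"  # hypothetical scratch file

for i in range(2):
    shard = torch.narrow(buffer, 0, i * BLOCK_SIZE, BLOCK_SIZE)
    assert h.sync_pwrite(buffer=shard, filename=path, file_offset=i * BLOCK_SIZE) == 1

# Read the second shard back from its file offset into the front of the buffer.
dst = torch.narrow(buffer, 0, 0, BLOCK_SIZE)
assert h.sync_pread(dst, path, file_offset=BLOCK_SIZE) == 1

h.free_cpu_locked_tensor(buffer)
os.remove(path)
```

The GDS handle extended in the same patch mirrors these signatures for device tensors registered via `pin_device_tensor`, as exercised in `tests/unit/ops/aio/test_gds.py`.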