Commit
Merge branch 'master' into torch_compile_micro_offset_fix
tohtana authored Sep 12, 2024
2 parents 9c5bd48 + 170b46e commit ebdfc12
Showing 41 changed files with 420 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -76,7 +76,7 @@ repos:
       name: check-torchcuda
       entry: ./scripts/check-torchcuda.py
       language: python
-      exclude: ^(.github/workflows/|scripts/check-torchcuda.py|docs/_tutorials/accelerator-abstraction-interface.md|accelerator/cuda_accelerator.py|deepspeed/inference/engine.py|deepspeed/model_implementations/transformers/clip_encoder.py|deepspeed/model_implementations/diffusers/vae.py|deepspeed/model_implementations/diffusers/unet.py|op_builder/spatial_inference.py|op_builder/transformer_inference.py|op_builder/builder.py|setup.py|tests/unit/ops/sparse_attention/test_sparse_attention.py)
+      exclude: ^(.github/workflows/|scripts/check-torchcuda.py|docs/_tutorials/accelerator-abstraction-interface.md|docs/_tutorials/deepnvme.md|accelerator/cuda_accelerator.py|deepspeed/inference/engine.py|deepspeed/model_implementations/transformers/clip_encoder.py|deepspeed/model_implementations/diffusers/vae.py|deepspeed/model_implementations/diffusers/unet.py|op_builder/spatial_inference.py|op_builder/transformer_inference.py|op_builder/builder.py|setup.py|tests/unit/ops/sparse_attention/test_sparse_attention.py)
       # Specific deepspeed/ files are excluded for now until we wrap ProcessGroup in deepspeed.comm

   - repo: local
10 changes: 10 additions & 0 deletions accelerator/hpu_accelerator.py
@@ -18,6 +18,7 @@ def __init__(self):
         self._name = 'hpu'
         self._communication_backend_name = 'hccl'
         self._compile_backend = "hpu_backend"
+        self.apply_hpu_workarounds()
        try:
            import habana_frameworks.torch.hpu as hpu
            hpu.setDeterministic(True)
@@ -28,6 +29,15 @@

         self.fp16_supported = None

+    def apply_hpu_workarounds(self):
+
+        def update_wa_env_var(key, value):
+            if key not in os.environ.keys():
+                os.environ[key] = value
+
+        update_wa_env_var("PT_HPU_LAZY_ACC_PAR_MODE", "0")
+        update_wa_env_var("PT_HPU_ENABLE_REFINE_DYNAMIC_SHAPES", "0")
+
     # Device APIs
     def is_synchronized_device(self):
         return False
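The `update_wa_env_var` helper only sets a workaround variable when the user has not already set it, so user overrides always win. A minimal sketch of the same pattern using the standard `os.environ.setdefault` (equivalent behavior, not the code in the commit):

```python
import os

# setdefault writes the key only if it is absent, so values exported by the
# user before launch take precedence over these workaround defaults.
os.environ.setdefault("PT_HPU_LAZY_ACC_PAR_MODE", "0")
os.environ.setdefault("PT_HPU_ENABLE_REFINE_DYNAMIC_SHAPES", "0")
```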
5 changes: 4 additions & 1 deletion bin/ds_bench
@@ -10,7 +10,10 @@ import sys
 required_env = ["RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT", "LOCAL_RANK"]
 if not all(map(lambda v: v in os.environ, required_env)):
     import subprocess
-    subprocess.run("deepspeed $(which ds_bench) " + " ".join(sys.argv[1:]), shell=True)
+    r = subprocess.check_output(["which", "ds_bench"])
+    ds_bench_bin = r.decode('utf-8').strip()
+    safe_cmd = ["deepspeed", ds_bench_bin] + sys.argv[1:]
+    subprocess.run(safe_cmd)
 else:
     args = benchmark_parser().parse_args()
     rank = args.local_rank
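The rewrite drops `shell=True` and the `$(which ds_bench)` command substitution in favor of an explicit argument list, so `sys.argv[1:]` can no longer be re-parsed or expanded by a shell. A sketch of the same pattern built on `shutil.which` (an alternative to shelling out to `which`, not what the commit itself uses):

```python
import shutil
import subprocess
import sys

# Resolve the launcher binary in-process instead of via `$(which ...)`.
ds_bench_bin = shutil.which("ds_bench")
if ds_bench_bin is None:
    sys.exit("ds_bench not found on PATH")

# List-form argv: each element is passed to the child verbatim, with no
# shell word-splitting, globbing, or command substitution.
subprocess.run(["deepspeed", ds_bench_bin] + sys.argv[1:], check=True)
```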
2 changes: 1 addition & 1 deletion blogs/deepspeed-gds/README.md
@@ -17,7 +17,7 @@ this problem, DeepSpeed has created a suite of I/O optimizations collectively ca

 DeepNVMe improves the performance and efficiency of I/O-bound DL applications by accelerating I/O operations
 and reducing hardware requirements. It achieves this by leveraging storage innovations such as Non-Volatile
-Memory Express (NVMe) Solid Storage Devices (SSDs) and NVIDIA Magnum IO<sup>TM</sup> GPUDirect® Storage (GDS). In this
+Memory Express (NVMe) Solid State Drives (SSDs) and NVIDIA Magnum IO<sup>TM</sup> GPUDirect® Storage (GDS). In this
 blog we show the benefits of DeepNVMe using microbenchmarks and an inference application. In experiments
 conducted on an Azure NC96ads\_A100\_v4 VM, we observed that DeepNVMe saturates available NVMe bandwidth for
 data transfers with GPU or CPU memory, achieving up to 10GB/sec reads and 5 GB/secs writes.
5 changes: 3 additions & 2 deletions csrc/aio/py_test/ds_aio_job.py
@@ -6,6 +6,7 @@
 Functionality of swapping tensors to/from (NVMe) storage devices.
 """
 import subprocess
+import shlex


 class Job(object):
@@ -39,10 +40,10 @@ def close_output_file(self):


 def run_job(job):
-    args = ' '.join(job.cmd())
+    args = shlex.split(' '.join(job.cmd()))
     print(f'args = {args}')
     job.open_output_file()
-    proc = subprocess.run(args=args, shell=True, stdout=job.get_stdout(), stderr=job.get_stderr(), cwd=job.get_cwd())
+    proc = subprocess.run(args=args, stdout=job.get_stdout(), stderr=job.get_stderr(), cwd=job.get_cwd())
     job.close_output_file()
     assert proc.returncode == 0, \
         f"This command failed: {job.cmd()}"
2 changes: 2 additions & 0 deletions csrc/fp_quantizer/fp_quantize.cu
@@ -15,7 +15,9 @@
 #include <cuda_fp16.h>
 #include <curand_kernel.h>

+#ifdef BF16_AVAILABLE
 #include <cuda_bf16.h>
+#endif
 #include <cuda_runtime_api.h>

 using ROp = reduce::ROpType;
2 changes: 2 additions & 0 deletions csrc/fp_quantizer/includes/fp_quantize.h
@@ -10,7 +10,9 @@

 #include <cuda_fp16.h>

+#ifdef BF16_AVAILABLE
 #include <cuda_bf16.h>
+#endif
 #include <cuda_runtime_api.h>
 #include <stdio.h>

5 changes: 3 additions & 2 deletions deepspeed/comm/comm.py
@@ -697,8 +697,9 @@ def mpi_discovery(distributed_port=TORCH_DISTRIBUTED_DEFAULT_PORT, verbose=True)

     master_addr = None
     if rank == 0:
-        hostname_cmd = ["hostname -I"]
-        result = subprocess.check_output(hostname_cmd, shell=True)
+        import shlex
+        hostname_cmd = shlex.split("hostname -I")
+        result = subprocess.check_output(hostname_cmd)
         master_addr = result.decode('utf-8').split()[0]
     master_addr = comm.bcast(master_addr, root=0)

4 changes: 3 additions & 1 deletion deepspeed/elasticity/elastic_agent.py
@@ -54,7 +54,9 @@ def _set_master_addr_port(store: Store,

     if master_addr is None:
         # master_addr = _get_fq_hostname()
-        result = subprocess.check_output("hostname -I", shell=True)
+        import shlex
+        safe_cmd = shlex.split("hostname -I")
+        result = subprocess.check_output(safe_cmd)
         master_addr = result.decode('utf-8').split()[0]

     store.set("MASTER_ADDR", master_addr.encode(encoding="UTF-8"))
4 changes: 2 additions & 2 deletions deepspeed/launcher/multinode_runner.py
@@ -34,7 +34,7 @@ def get_cmd(self, environment, active_resources):
         """Return the command to execute on node"""

     def add_export(self, key, var):
-        self.exports[key.strip()] = var.strip()
+        self.exports[key.strip()] = f"\"{var.strip()}\""

     def parse_user_args(self):
         return self.args.user_args
@@ -406,7 +406,7 @@ def backend_exists(self):
         if not mpiname_exists:
             warnings.warn("mpiname does not exist, mvapich is not installed properly")
         else:
-            results = subprocess.check_output('mpiname', shell=True)
+            results = subprocess.check_output(['mpiname'])
             mpiname_results = results.decode('utf-8').strip()
             if "MVAPICH2-GDR" in mpiname_results:
                 exists = True
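The quoting added in `add_export` matters because the launcher later flattens these exports into an `export KEY=VALUE;` prefix executed by a shell on each node; an unquoted value containing spaces or commas could be word-split there. A toy illustration of the effect (the flattening loop and the environment variable are illustrative assumptions, not lines from this diff):

```python
exports = {}

def add_export(key, var):
    # Mirrors the patched method: values are wrapped in double quotes.
    exports[key.strip()] = f"\"{var.strip()}\""

add_export("NCCL_SOCKET_IFNAME", "eth0,eth1")

# Flattened into a remote shell prefix, the quotes keep the value one token.
prefix = "".join(f"export {k}={v}; " for k, v in exports.items())
print(prefix)  # export NCCL_SOCKET_IFNAME="eth0,eth1";
```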
8 changes: 5 additions & 3 deletions deepspeed/launcher/runner.py
@@ -20,6 +20,7 @@
 from copy import deepcopy
 import signal
 import time
+import shlex

 from .multinode_runner import PDSHRunner, OpenMPIRunner, MVAPICHRunner, SlurmRunner, MPICHRunner, IMPIRunner
 from .constants import PDSH_LAUNCHER, OPENMPI_LAUNCHER, MVAPICH_LAUNCHER, SLURM_LAUNCHER, MPICH_LAUNCHER, IMPI_LAUNCHER
@@ -445,7 +446,8 @@ def main(args=None):
             if args.ssh_port is not None:
                 ssh_check_cmd += f"-p {args.ssh_port} "
             ssh_check_cmd += f"{first_host} hostname"
-            subprocess.check_call(ssh_check_cmd, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, shell=True)
+            safe_ssh_cmd = shlex.split(ssh_check_cmd)
+            subprocess.check_call(safe_ssh_cmd, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
         except subprocess.CalledProcessError:
             raise RuntimeError(
                 f"Using hostfile at {args.hostfile} but host={first_host} was not reachable via ssh. If you are running with a single node please remove {args.hostfile} or setup passwordless ssh."
@@ -458,9 +460,9 @@
         if args.ssh_port is not None:
             ssh_check_cmd += f" -p {args.ssh_port}"
         ssh_check_cmd += f" {first_host} hostname -I"
-        hostname_cmd = [ssh_check_cmd]
+        hostname_cmd = shlex.split(ssh_check_cmd)
         try:
-            result = subprocess.check_output(hostname_cmd, shell=True)
+            result = subprocess.check_output(hostname_cmd)
         except subprocess.CalledProcessError as err:
             logger.error(
                 "Unable to detect suitable master address via `hostname -I`, please manually specify one via --master_addr"
14 changes: 9 additions & 5 deletions deepspeed/ops/transformer/inference/triton/matmul_ext.py
@@ -40,13 +40,17 @@ class TritonCacheDir:
     _warning_printed = False

     @staticmethod
-    def default_cache_dir():
-        tmp_path = os.path.join(Path.home(), ".triton", "autotune")
-        if is_nfs_path(tmp_path) and not TritonCacheDir._warning_printed:
+    def warn_if_nfs(cache_dir):
+        if is_nfs_path(cache_dir) and not TritonCacheDir._warning_printed:
             print(
-                f"Warning: The default cache directory for DeepSpeed Triton autotune, {tmp_path}, appears to be on an NFS system. While this is generally acceptable, if you experience slowdowns or hanging when DeepSpeed exits, it is recommended to set the TRITON_CACHE_DIR environment variable to a non-NFS path."
+                f"Warning: The cache directory for DeepSpeed Triton autotune, {cache_dir}, appears to be on an NFS system. While this is generally acceptable, if you experience slowdowns or hanging when DeepSpeed exits, it is recommended to set the TRITON_CACHE_DIR environment variable to a non-NFS path."
             )
             TritonCacheDir._warning_printed = True
+        return
+
+    @staticmethod
+    def default_cache_dir():
+        tmp_path = os.path.join(Path.home(), ".triton", "autotune")
         return tmp_path


@@ -80,9 +84,9 @@ def __init__(self, key):
         self.lock_path = None
         # if caching is enabled, get the lock and bin path
         self.cache_dir = os.environ.get('TRITON_CACHE_DIR', TritonCacheDir.default_cache_dir())
+        TritonCacheDir.warn_if_nfs(self.cache_dir)
         if self.cache_dir:
             os.makedirs(self.cache_dir, exist_ok=True)
-        if self.cache_dir:
             self.file_path = os.path.join(self.cache_dir, self.key + ".pickle")
             self.lock_path = self.file_path + ".lock"
6 changes: 4 additions & 2 deletions deepspeed/profiling/flops_profiler/profiler.py
@@ -15,6 +15,7 @@
 from deepspeed.utils import logger
 from deepspeed.moe.layer import MoE
 from deepspeed.utils.timer import FORWARD_GLOBAL_TIMER, BACKWARD_GLOBAL_TIMER, STEP_GLOBAL_TIMER
+from deepspeed.utils.torch import required_torch_version

 Tensor = torch.Tensor

@@ -908,8 +909,9 @@ def _patch_functionals():
     # embedding
     F.embedding = wrapFunc(F.embedding, _embedding_flops_compute)

-    # attn
-    F.scaled_dot_product_attention = wrapFunc(F.scaled_dot_product_attention, _attn_flops_compute)
+    # attn - scaled_dot_product_attention added in torch 2.0+
+    if required_torch_version(min_version=2.0):
+        F.scaled_dot_product_attention = wrapFunc(F.scaled_dot_product_attention, _attn_flops_compute)


 def _patch_tensor_methods():
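`F.scaled_dot_product_attention` only exists in PyTorch 2.0+, so wrapping it unconditionally raises `AttributeError` on older installs; the version gate avoids that. A hypothetical, feature-detecting variant of the same guard (a sketch, not the profiler's actual code):

```python
import functools
import torch.nn.functional as F

def wrap_func(func, flops_compute):
    """Stand-in for the profiler's wrapFunc: record FLOPs, then call through."""
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        flops_compute(*args, **kwargs)  # record an estimate for this call
        return func(*args, **kwargs)
    return wrapped

# Probe for the symbol instead of the version: patch only when it exists.
if hasattr(F, "scaled_dot_product_attention"):
    F.scaled_dot_product_attention = wrap_func(
        F.scaled_dot_product_attention,
        lambda *args, **kwargs: None)  # no-op counter for this sketch
```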
37 changes: 12 additions & 25 deletions deepspeed/runtime/bf16_optimizer.py
@@ -197,10 +197,6 @@ def _setup_for_real_optimizer(self):

             see_memory_usage(f'after initializing group {i}', force=True)

-        see_memory_usage('before initialize_optimizer', force=True)
-        self.initialize_optimizer_states()
-        see_memory_usage('end initialize_optimizer', force=True)
-
         self._grad_acc_hooks = []
         if self.immediate_grad_update:
             self.create_grad_acc_hooks()
@@ -252,25 +248,6 @@ def _lazy_init_hp_params_optimizer_state(self):
                                                             self.optimizer.state)
             self._hp_optimizer_states_linked = True

-    def initialize_optimizer_states(self):
-        """Take an optimizer step with zero-valued gradients to allocate internal
-        optimizer state.
-        This helps prevent memory fragmentation by allocating optimizer state at the
-        beginning of training instead of after activations have been allocated.
-        """
-        for param_partition, grad_partition in zip(self.fp32_groups_flat_partition,
-                                                   self.fp32_groups_gradient_flat_partition):
-            # In case of grad acc dtype different than FP32, need to cast to high precision.
-            param_partition.grad = grad_partition.to(
-                param_partition.dtype) if grad_partition.dtype != param_partition.dtype else grad_partition
-
-        if self.grad_acc_dtype is not torch.float32:
-            for param_partition in self.fp32_groups_flat_partition:
-                param_partition.grad = None
-
-        self.clear_hp_grads()
-
     def _split_flat_tensor(self, flat_tensor, num_elem_list):
         assert sum(num_elem_list) <= flat_tensor.numel()
         tensor_list = []
@@ -317,8 +294,18 @@ def step(self, closure=None):
                                         mpu=self.mpu,
                                         use_graph=self.graph_harvesting)

+        for param_partition, grad_partition in zip(self.fp32_groups_flat_partition,
+                                                   self.fp32_groups_gradient_flat_partition):
+            # In case of grad acc dtype different than FP32, need to cast to high precision.
+            param_partition.grad = grad_partition.to(
+                param_partition.dtype) if grad_partition.dtype != param_partition.dtype else grad_partition
+
         self.optimizer.step()

+        if self.grad_acc_dtype is not torch.float32:
+            for param_partition in self.fp32_groups_flat_partition:
+                param_partition.grad = None
+
         # We need to link optimizer state after the first step() call
         self._lazy_init_hp_params_optimizer_state()
@@ -495,7 +482,7 @@ def refresh_fp32_params(self):

     def load_state_dict(self,
                         state_dict_list,
-                        checkpoint_folder,
+                        checkpoint_folder=None,
                         load_optimizer_states=True,
                         load_from_fp32_weights=False,
                         load_serial=None,
@@ -547,7 +534,7 @@ def state(self):

     def accumulate_hp_grads_and_remove_lp(self, lp_param, group_idx, param_idx):
         assert self.immediate_grad_update
-        self._update_hp_grad(lp_param, group_idx, param_idx, clear_lp_grads=True)
+        self._update_hp_grad(lp_param, group_idx, param_idx, clear_lp_grads=False)

     def create_grad_acc_hooks(self):
         self.grad_accs = []
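Taken together, these hunks replace the eager `initialize_optimizer_states()` (which attached zero-valued gradients up front) with just-in-time handling inside `step()`: the FP32 gradient partitions are attached right before the real `optimizer.step()`, whose first call now allocates optimizer state lazily, and are released afterwards when gradients accumulate in a non-FP32 dtype. A condensed sketch of the resulting pattern (attribute names follow the diff; the enclosing class is assumed):

```python
import torch

def step(self):
    # Attach gradients just-in-time, casting to the FP32 master dtype when
    # the accumulation dtype differs (e.g. bf16 gradient accumulation).
    for param, grad in zip(self.fp32_groups_flat_partition,
                           self.fp32_groups_gradient_flat_partition):
        param.grad = grad.to(param.dtype) if grad.dtype != param.dtype else grad

    self.optimizer.step()  # the first call materializes optimizer state

    # Drop the temporary FP32 gradient references so they can free memory.
    if self.grad_acc_dtype is not torch.float32:
        for param in self.fp32_groups_flat_partition:
            param.grad = None
```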
4 changes: 1 addition & 3 deletions deepspeed/runtime/pipe/engine.py
@@ -482,9 +482,7 @@ def eval_batch(self,
         micro_batches = self.micro_batches if num_micro_batches is None else num_micro_batches

         # Do the work
-        sched = schedule.InferenceSchedule(micro_batches=self.micro_batches,
-                                           stages=self.num_stages,
-                                           stage_id=self.stage_id)
+        sched = schedule.InferenceSchedule(micro_batches=micro_batches, stages=self.num_stages, stage_id=self.stage_id)

         # prevent dead-lock with multiple evals sequence
         dist.barrier()
28 changes: 18 additions & 10 deletions deepspeed/runtime/zero/partition_parameters.py
@@ -262,7 +262,7 @@ def new_tensor(cls, *args, **kwargs) -> Tensor:


 # https://stackoverflow.com/a/63851681/9201239
-def get_all_subclasses(cls):
+def get_all_subclasses(cls, include_root=True):
     subclass_list = []

     def recurse(cl):
@@ -272,7 +272,10 @@ def recurse(cl):

     recurse(cls)

-    return set(subclass_list)
+    ret = set(subclass_list)
+    if include_root:
+        ret.add(cls)
+    return ret


 @instrument_w_nvtx
@@ -465,11 +468,13 @@ def wrapper(*args, **kwargs):
             return wrapper

         def _enable_class_apply(cls):
-            cls._old_apply_of_skip_init_hook = cls._apply
-            cls._apply = partition_after_empty_init(cls._apply)
+            if '_apply' in cls.__dict__:
+                cls._old_apply_of_skip_init_hook = cls._apply
+                cls._apply = partition_after_empty_init(cls._apply)

         def _disable_class_apply(cls):
-            cls._apply = cls._old_apply_of_skip_init_hook
+            if hasattr(cls, '_old_apply_of_skip_init_hook'):
+                cls._apply = cls._old_apply_of_skip_init_hook

         # add hooks for to_empty: apply_(empty_like)
         for subclass in get_all_subclasses(torch.nn.modules.module.Module):
@@ -522,12 +527,14 @@ def wrapper(module, *args, **kwargs):
             return wrapper

         def _enable_class(cls):
-            cls._old_init = cls.__init__
-            cls.__init__ = partition_after(cls.__init__)
+            if '__init__' in cls.__dict__:
+                cls._old_init = cls.__init__
+                cls.__init__ = partition_after(cls.__init__)

         def _init_subclass(cls, **kwargs):
-            cls._old_init = cls.__init__
-            cls.__init__ = partition_after(cls.__init__)
+            if '__init__' in cls.__dict__:
+                cls._old_init = cls.__init__
+                cls.__init__ = partition_after(cls.__init__)

         # Replace .__init__() for all existing subclasses of torch.nn.Module recursively
         for subclass in get_all_subclasses(torch.nn.modules.module.Module):
@@ -567,7 +574,8 @@ def unpatch_init_and_builtins(self):
         if self.patched:

             def _disable_class(cls):
-                cls.__init__ = cls._old_init
+                if hasattr(cls, '_old_init'):
+                    cls.__init__ = cls._old_init

             for subclass in get_all_subclasses(torch.nn.modules.module.Module):
                 _disable_class(subclass)
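The `'__init__' in cls.__dict__` and `hasattr(cls, '_old_init')` guards restrict patching to classes that define the method in their own namespace: a subclass that merely inherits `__init__` would otherwise capture its parent's already-wrapped method, wrap it a second time, and leave unpatching to restore the still-wrapped function. A toy demonstration of the patch-time guard (class names and the stub wrapper are illustrative):

```python
import functools

class Parent:
    def __init__(self):
        pass

class Child(Parent):
    pass  # inherits Parent.__init__, defines none of its own

def partition_after_stub(fn):
    # Hypothetical stand-in for the real partition_after wrapper.
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        fn(self, *args, **kwargs)
    return wrapper

for cls in (Parent, Child):  # Parent visited first, like the recursive walk
    if '__init__' in cls.__dict__:  # True for Parent, False for Child
        cls._old_init = cls.__init__
        cls.__init__ = partition_after_stub(cls.__init__)

# Child was skipped: it still dispatches to Parent's (wrapped) __init__
# instead of receiving a double-wrapped copy of its own.
print('__init__' in Child.__dict__)  # False
```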
4 changes: 2 additions & 2 deletions deepspeed/utils/numa.py
@@ -49,8 +49,8 @@ def check_for_numactl_pkg():
         flag, lib, tool = data
         path = distutils.spawn.find_executable(pkgmgr)
         if path is not None:
-            cmd = f"{pkgmgr} {flag} {lib}"
-            result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+            cmd = [pkgmgr, flag, lib]
+            result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
             if result.wait() == 0:
                 found = True
             else: