diff --git a/.github/workflows/nv-inference.yml b/.github/workflows/nv-inference.yml
index cc55136d1a4b..0cbe212666de 100644
--- a/.github/workflows/nv-inference.yml
+++ b/.github/workflows/nv-inference.yml
@@ -32,7 +32,7 @@ jobs:
       - name: Install pytorch
         run: |
-          pip install -U --cache-dir $TORCH_CACHE torch==1.13.1 torchvision --extra-index-url https://download.pytorch.org/whl/cu116
+          pip install -U --cache-dir $TORCH_CACHE torch==2.1.2 torchvision==0.16.2 --extra-index-url https://download.pytorch.org/whl/cu118
           python -c "import torch; print('torch:', torch.__version__, torch)"
           python -c "import torch; print('CUDA available:', torch.cuda.is_available())"
@@ -57,6 +57,6 @@ jobs:
         run: |
           unset TORCH_CUDA_ARCH_LIST # only jit compile for current arch
           cd tests
-          #pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
-          pytest $PYTEST_OPTS -m 'inference_ops' unit/ --torch_ver="1.13" --cuda_ver="11.6"
-          pytest $PYTEST_OPTS --forked -n 4 -m 'inference' unit/ --torch_ver="1.13" --cuda_ver="11.6"
+          #pytest $PYTEST_OPTS -m 'seq_inference' unit/ --torch_ver="2.1" --cuda_ver="11.8"
+          pytest $PYTEST_OPTS -m 'inference_ops' unit/ --torch_ver="2.1" --cuda_ver="11.8"
+          pytest $PYTEST_OPTS --forked -n 4 -m 'inference' unit/ --torch_ver="2.1" --cuda_ver="11.8"
diff --git a/accelerator/cuda_accelerator.py b/accelerator/cuda_accelerator.py
index 2030f36631e9..3d5e9c168c16 100644
--- a/accelerator/cuda_accelerator.py
+++ b/accelerator/cuda_accelerator.py
@@ -3,6 +3,7 @@
 
 # DeepSpeed Team
 
+import functools
 import os
 import pkgutil
 import importlib
@@ -260,31 +261,31 @@ def replay_graph(self, graph):
 
     @property
     def BFloat16Tensor(self):
-        return torch.cuda.BFloat16Tensor
+        return functools.partial(torch.tensor, dtype=torch.bfloat16, device='cuda')
 
     @property
     def ByteTensor(self):
-        return torch.cuda.ByteTensor
+        return functools.partial(torch.tensor, dtype=torch.uint8, device='cuda')
 
     @property
     def DoubleTensor(self):
-        return torch.cuda.DoubleTensor
+        return functools.partial(torch.tensor, dtype=torch.double, device='cuda')
 
     @property
     def FloatTensor(self):
-        return torch.cuda.FloatTensor
+        return functools.partial(torch.tensor, dtype=torch.float, device='cuda')
 
     @property
     def HalfTensor(self):
-        return torch.cuda.HalfTensor
+        return functools.partial(torch.tensor, dtype=torch.half, device='cuda')
 
     @property
     def IntTensor(self):
-        return torch.cuda.IntTensor
+        return functools.partial(torch.tensor, dtype=torch.int, device='cuda')
 
     @property
     def LongTensor(self):
-        return torch.cuda.LongTensor
+        return functools.partial(torch.tensor, dtype=torch.long, device='cuda')
 
     def pin_memory(self, tensor, align_bytes=1):
         return tensor.pin_memory()
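The property rewrite above swaps the deprecated `torch.cuda.*Tensor` type constructors for `functools.partial` factories over `torch.tensor`. A minimal sketch of the pattern (not DeepSpeed code; `device='cpu'` is used so it runs without CUDA):

```python
# Sketch of the tensor-factory pattern: a partial pins dtype/device on torch.tensor.
import functools
import torch

ByteTensor = functools.partial(torch.tensor, dtype=torch.uint8, device='cpu')

flags = ByteTensor([0, 1, 1])  # builds from data, like torch.cuda.ByteTensor([...]) did
assert flags.dtype == torch.uint8

# Caveat: the legacy constructors also accepted a shape, e.g. torch.ByteTensor(3)
# made an uninitialized length-3 tensor, while torch.tensor(3) makes a scalar.
# Call sites that relied on the shape form need torch.empty(3, dtype=..., device=...).
```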
diff --git a/deepspeed/inference/v2/checkpoint/huggingface_engine.py b/deepspeed/inference/v2/checkpoint/huggingface_engine.py
index bec53a742a84..46a84c61f884 100644
--- a/deepspeed/inference/v2/checkpoint/huggingface_engine.py
+++ b/deepspeed/inference/v2/checkpoint/huggingface_engine.py
@@ -40,16 +40,13 @@ def _fetch_checkpoint_files(self):
         # currently coming from the ckpt engine init but maybe a catch all kwargs for other
         # snapshot download parameters would be more flexible.
 
-        # NOTE(jeff): allow_patterns here are explicitly not using safetensors or other
-        # checkpoint files that may be present. Example of all files in the llama-2-7b
-        # repo here: https://huggingface.co/meta-llama/Llama-2-7b-hf/tree/main
-        from huggingface_hub import snapshot_download, list_files_info
+        from huggingface_hub import snapshot_download, list_repo_tree
 
         def model_has_safetensors(model_name_or_path: str) -> bool:
             if os.path.isdir(model_name_or_path):
                 file_list = os.listdir(model_name_or_path)
             else:
-                file_list = [rf.rfilename for rf in list_files_info(model_name_or_path)]
+                file_list = [rf.path for rf in list_repo_tree(model_name_or_path)]
             for f in file_list:
                 if f.endswith(".safetensors"):
                     return True
diff --git a/deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation.cu b/deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation.cu
rename to deepspeed/inference/v2/kernels/core_ops/bias_activations/bias_activation_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cu b/deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cu
rename to deepspeed/inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cu b/deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cu
rename to deepspeed/inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cu b/deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cu
rename to deepspeed/inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/ragged_ops/embed/embed.cu b/deepspeed/inference/v2/kernels/ragged_ops/embed/embed_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/ragged_ops/embed/embed.cu
rename to deepspeed/inference/v2/kernels/ragged_ops/embed/embed_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cu b/deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cu
rename to deepspeed/inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cu b/deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cu
rename to deepspeed/inference/v2/kernels/ragged_ops/logits_gather/logits_gather_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cu b/deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cu
rename to deepspeed/inference/v2/kernels/ragged_ops/moe_gather/moe_gather_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cu b/deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cu
rename to deepspeed/inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter_cuda.cu
diff --git a/deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cu b/deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating_cuda.cu
similarity index 100%
rename from deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cu
rename to deepspeed/inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating_cuda.cu
diff --git a/deepspeed/profiling/flops_profiler/profiler.py b/deepspeed/profiling/flops_profiler/profiler.py
index 79e682a73b90..de847e59e82e 100644
--- a/deepspeed/profiling/flops_profiler/profiler.py
+++ b/deepspeed/profiling/flops_profiler/profiler.py
@@ -827,6 +827,15 @@ def _elementwise_flops_compute(input, other):
     return flops, 0
 
 
+def _attn_flops_compute(q, k, v, *args, **kwargs):
+    """
+    Count flops for the scaled_dot_product_attention operation.
+    """
+    macs = _prod(q.shape) * k.shape[-2]
+    macs += _prod(q.shape[:-1]) * k.shape[-2] * v.shape[-1]
+    return 2 * macs, macs
+
+
 def wrapFunc(func, funcFlopCompute):
     oldFunc = func
     name = func.__str__
@@ -899,10 +908,14 @@ def _patch_functionals():
     # embedding
     F.embedding = wrapFunc(F.embedding, _embedding_flops_compute)
 
+    # attn
+    F.scaled_dot_product_attention = wrapFunc(F.scaled_dot_product_attention, _attn_flops_compute)
+
 
 def _patch_tensor_methods():
     torch.matmul = wrapFunc(torch.matmul, _matmul_flops_compute)
     torch.Tensor.matmul = wrapFunc(torch.Tensor.matmul, _matmul_flops_compute)
+    torch.Tensor.__matmul__ = wrapFunc(torch.Tensor.__matmul__, _matmul_flops_compute)
     torch.mm = wrapFunc(torch.mm, _matmul_flops_compute)
     torch.Tensor.mm = wrapFunc(torch.Tensor.mm, _matmul_flops_compute)
     torch.bmm = wrapFunc(torch.bmm, _matmul_flops_compute)
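`_attn_flops_compute` above counts only the two attention matmuls: with `q` of shape `(b, h, s_q, d)` and `k`/`v` of shape `(b, h, s_k, d)`/`(b, h, s_k, d_v)`, `q @ k^T` costs `b*h*s_q*d*s_k` MACs and `attn @ v` costs `b*h*s_q*s_k*d_v` MACs, at 2 flops per MAC; softmax and scaling are ignored. A quick numeric check of the formula under those assumed shape conventions (`math.prod` standing in for the profiler's `_prod`):

```python
# Verify the SDPA flop count on concrete shapes.
from math import prod

b, h, s_q, s_k, d, d_v = 2, 12, 128, 128, 64, 64
q_shape, k_shape, v_shape = (b, h, s_q, d), (b, h, s_k, d), (b, h, s_k, d_v)

macs = prod(q_shape) * k_shape[-2]                      # q @ k^T: b*h*s_q*d*s_k MACs
macs += prod(q_shape[:-1]) * k_shape[-2] * v_shape[-1]  # attn @ v: b*h*s_q*s_k*d_v MACs
flops = 2 * macs                                        # one multiply + one add per MAC

assert macs == b * h * s_q * s_k * (d + d_v)
print(f"{flops / 1e9:.2f} GFLOPs per scaled_dot_product_attention call")
```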
diff --git a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
index f8ce0e3fa2bf..7088df223bd8 100644
--- a/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
+++ b/deepspeed/runtime/data_pipeline/data_sampling/data_analyzer.py
@@ -13,8 +13,9 @@
 from torch.utils.data import BatchSampler, SequentialSampler, DataLoader, Subset
 
 from deepspeed.utils import logger
-from .indexed_dataset import MMapIndexedDataset, valid_dtypes
-from .utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype
+import deepspeed.comm as dist
+from deepspeed.runtime.data_pipeline.data_sampling.indexed_dataset import MMapIndexedDataset, valid_dtypes
+from deepspeed.runtime.data_pipeline.data_sampling.utils import split_dataset, split_index, create_mmap_dataset_builder, close_mmap_dataset_builder, find_fit_int_dtype
 
 
 class DataAnalyzer(object):
@@ -97,16 +98,16 @@ def update_metric_results(self,
                 metric_values = metric_function(data)
 
                 assert torch.is_tensor(metric_values) or isinstance(metric_values, np.ndarray), \
-                        "metric_function must return a tensor or array"
+                    "metric_function must return a tensor or array"
                 assert metric_values.dtype == metric_dtype, \
-                        f"metric_function result dtype {metric_values.dtype} does not match metric_dtype {metric_dtype}"
+                    f"metric_function result dtype {metric_values.dtype} does not match metric_dtype {metric_dtype}"
                 if isinstance(metric_values, np.ndarray):
                     metric_values = torch.from_numpy(metric_values)
 
                 if metric_type == 'single_value_per_sample':
                     for row in range(metric_values.size()[0]):
                         sample_idx = batch_start_idx + row  # sample idx following dataset iteration order
-                        if 'index' in data:  # Megatron use case, sample idx provided in 'index' field
+                        if isinstance(data, dict) and 'index' in data:  # Megatron use case, idx provided in 'index' field
                             sample_idx = data['index'][row][0].item()
                         elif self.sample_indices is not None:  # user defined shuffling of indices
                             sample_idx = self.sample_indices[sample_idx]
@@ -178,7 +179,7 @@ def run_map_helper(self, thread_id):
             else:
                 self.custom_map_update(data, self.metric_types, self.metric_dtypes, self.metric_functions,
                                        metric_results, batch_start_idx)
-            processed_sample += self.batch_size
+            processed_sample += len(data)
             duration = (time.time() - start) / 3600.0
             remain_duration = duration * total_sample / processed_sample - duration
             logger.info(
@@ -384,26 +385,10 @@ def merge_map_results(self, dataset, metric_names, metric_types, save_path, num_
                     index_to_metric_builder.merge_file_(chunk_im_fname)
                 close_mmap_dataset_builder(index_to_sample_builder, index_to_sample_fname)
                 close_mmap_dataset_builder(index_to_metric_builder, index_to_metric_fname)
-                num_sample_per_value = {}
-                index_to_sample = MMapIndexedDataset(index_to_sample_fname, skip_warmup=True)
-                index_to_metric = MMapIndexedDataset(index_to_metric_fname, skip_warmup=True)
-                index_to_sample_merged_fname = f"{metric_save_path}/{metric_name}_index_to_sample_percentile_merged"
-                index_to_sample_merged_builder = create_mmap_dataset_builder(index_to_sample_merged_fname,
-                                                                             sample_idx_dtype)
-                for v_idx in range(len(index_to_sample)):
-                    if v_idx > 0:
-                        assert index_to_metric[v_idx] > index_to_metric[v_idx - 1]
-                    num_sample_per_value[index_to_metric[v_idx][0]] = len(index_to_sample[v_idx])
-                assert sum(num_sample_per_value.values()) == total_num_samples
-                merge_step = max(1, len(index_to_sample) // 100)
-                for v_idx in range(0, len(index_to_sample), merge_step):
-                    merged_samples = np.copy(
-                        np.concatenate(index_to_sample[v_idx:min(len(index_to_sample), (v_idx + merge_step))],
-                                       axis=None))
-                    index_to_sample_merged_builder.add_item(
-                        torch.tensor(merged_samples.astype(np.int64), dtype=torch.long))
-                    logger.info(f"Finished merging index_to_sample {v_idx} to {v_idx+merge_step}.")
-                close_mmap_dataset_builder(index_to_sample_merged_builder, index_to_sample_merged_fname)
+
+                num_sample_per_value = DataAnalyzer.output_index_to_sample_percentile(
+                    index_to_sample_fname, index_to_metric_fname, metric_name, metric_save_path, total_num_samples,
+                    sample_idx_dtype)
                 self.get_metric_value_percentiles(metric_name, num_sample_per_value, total_num_samples)
             elif metric_type == 'accumulate_value_over_samples':
                 metric_save_path = f"{save_path}/{metric_name}/"
@@ -425,6 +410,29 @@ def merge_map_results(self, dataset, metric_names, metric_types, save_path, num_
                     metric_value_builder.add_item(torch.tensor(metric_value.astype(np.int64), dtype=torch.long))
                 close_mmap_dataset_builder(metric_value_builder, metric_value_fname)
 
+    @staticmethod
+    def output_index_to_sample_percentile(index_to_sample_fname, index_to_metric_fname, metric_name, metric_save_path,
+                                          total_num_samples, sample_idx_dtype):
+        """ read index_to_metric and index_to_sample files and write distribution to index_to_sample_percentile_merged """
+        num_sample_per_value = {}
+        index_to_sample = MMapIndexedDataset(index_to_sample_fname, skip_warmup=True)
+        index_to_metric = MMapIndexedDataset(index_to_metric_fname, skip_warmup=True)
+        index_to_sample_merged_fname = f"{metric_save_path}/{metric_name}_index_to_sample_percentile_merged"
+        index_to_sample_merged_builder = create_mmap_dataset_builder(index_to_sample_merged_fname, sample_idx_dtype)
+        for v_idx in range(len(index_to_sample)):
+            if v_idx > 0:
+                assert index_to_metric[v_idx] > index_to_metric[v_idx - 1]
+            num_sample_per_value[index_to_metric[v_idx][0]] = len(index_to_sample[v_idx])
+        assert sum(list(num_sample_per_value.values())) == total_num_samples
+        merge_step = max(1, len(index_to_sample) // 100)
+        for v_idx in range(0, len(index_to_sample), merge_step):
+            merged_samples = np.copy(
+                np.concatenate(index_to_sample[v_idx:min(len(index_to_sample), (v_idx + merge_step))], axis=None))
+            index_to_sample_merged_builder.add_item(torch.tensor(merged_samples.astype(np.int64), dtype=torch.long))
+            logger.info(f"Finished merging index_to_sample {v_idx} to {v_idx+merge_step}.")
+        close_mmap_dataset_builder(index_to_sample_merged_builder, index_to_sample_merged_fname)
+        return num_sample_per_value
+
     def run_reduce(self):
         if self.custom_reduce is None:
             self.merge_map_results(self.dataset, self.metric_names, self.metric_types, self.save_path,
@@ -432,3 +440,389 @@ def run_reduce(self):
         else:
             self.custom_reduce(self.dataset, self.metric_names, self.metric_types, self.save_path, self.num_workers,
                                self.num_threads, self.num_threads_reduce)
+
+    def run_map_reduce(self, comm_group=None):
+        self.run_map()
+        # wait for the mapping operation, where all nodes output their own (partial) result files
+        dist.barrier(group=comm_group)
+        if self.worker_id == 0:
+            self.run_reduce()
+        # wait for the reduce, where rank 0 merges all (partial) files. Dataset can then be used by all nodes.
+        dist.barrier(group=comm_group)
+
+
+class DistributedDataAnalyzer(object):
+
+    def __init__(
+        self,
+        dataset,
+        num_workers=1,
+        worker_id=0,
+        batch_size=1,
+        metric_names=[],
+        metric_functions=[],
+        metric_types=[],
+        save_path="./",
+        collate_fn=None,
+        device='cuda',
+        comm_group=None,
+        sample_indices=None,
+    ) -> None:
+        self.dataset = dataset
+        self.batch_size = batch_size
+        self.metric_names = metric_names
+        self.metric_functions = metric_functions
+        self.metric_types = metric_types
+        self.save_path = save_path
+        self.collate_fn = collate_fn
+        self.device = device
+        self.sample_indices = sample_indices
+
+        if not dist.is_initialized():
+            dist.init_distributed()
+
+        # comm_group and worker_id+num_workers are mutually exclusive
+        self.comm_group = comm_group
+        if self.comm_group is None:
+            # self.comm_group = deepspeed.utils.groups._clone_world_group()
+            self.num_workers = num_workers
+            self.worker_id = worker_id
+        else:
+            self.num_workers = self.comm_group.size()
+            self.worker_id = self.comm_group.rank()
+
+        if self.worker_id == 0:
+            logger.info(f"Distributed data analyzer initialized with {self.num_workers} workers.")
+
+    def run_map_reduce(self):
+
+        # setup individual dataloaders
+        worker_splits, _ = split_dataset(self.dataset, self.num_workers, self.worker_id, num_threads=1)
+        start_idx, end_idx = worker_splits[self.worker_id]
+        logger.info(f"worker {self.worker_id}: start working on data subset {start_idx} to {end_idx}")
+        worker_dataset = Subset(self.dataset, list(range(start_idx, end_idx)))
+        sampler = BatchSampler(SequentialSampler(worker_dataset), batch_size=self.batch_size, drop_last=False)
+        dataloader = DataLoader(dataset=worker_dataset,
+                                batch_sampler=sampler,
+                                num_workers=0,
+                                collate_fn=self.collate_fn,
+                                pin_memory=False)
+
+        # set initial results list
+        metric_results = []
+        for metric_type in self.metric_types:
+            assert metric_type in ['single_value_per_sample', 'accumulate_value_over_samples'], \
+                f"metric_type {metric_type} not implemented."
+            metric_results.append([] if metric_type == 'single_value_per_sample' else None)
+
+        # iterate dataloader and store metric results
+        batch_start_idx = start_idx
+        for data in dataloader:
+            for m_idx in range(len(self.metric_names)):
+                metric_type, metric_function = self.metric_types[m_idx], self.metric_functions[m_idx]
+                metric_values = metric_function(data)
+                assert torch.is_tensor(metric_values) or isinstance(metric_values, np.ndarray), \
+                    "metric_function must return a tensor or array"
+                if isinstance(metric_values, np.ndarray):
+                    metric_values = torch.from_numpy(metric_values)
+                assert metric_values.dtype in valid_dtypes, \
+                    f"metric_function result dtype {metric_values.dtype} not supported. Supported dtypes {valid_dtypes}"
+
+                if metric_type == 'single_value_per_sample':
+                    for row in range(metric_values.size()[0]):
+                        value = metric_values[row].item()
+                        sample_idx = batch_start_idx + row  # sample idx following dataset iteration order
+                        if isinstance(data, dict) and 'index' in data:  # Megatron use case
+                            sample_idx = data['index'][row][0].item()
+                        elif self.sample_indices is not None:  # user defined shuffling of indices
+                            sample_idx = self.sample_indices[sample_idx]
+                        metric_results[m_idx].append((value, sample_idx))
+                elif metric_type == 'accumulate_value_over_samples':
+                    if metric_results[m_idx] is None:
+                        metric_results[m_idx] = metric_values
+                    else:
+                        metric_results[m_idx].add_(metric_values)
+            batch_start_idx += len(data)
+
+        # compute dtype for sample ids
+        total_num_samples = len(self.dataset)
+        sample_idx_dtype = find_fit_int_dtype(0, total_num_samples - 1)
+        logger.info(f"Total number of data samples: {total_num_samples}.")
+        logger.info(f"Will use {sample_idx_dtype} to store the sample indexes.")
+
+        # convert to list of tensors
+        metric_results = [torch.tensor(m).to(self.device) for m in metric_results]
+
+        for m_idx in range(len(self.metric_names)):
+            metric_values, metric_name, metric_type = \
+                metric_results[m_idx], self.metric_names[m_idx], self.metric_types[m_idx]
+            metric_save_path = f"{self.save_path}/{metric_name}/"
+            os.makedirs(metric_save_path, exist_ok=True)
+
+            if metric_type == 'single_value_per_sample':
+
+                # Compute sample and metric value dtypes based on range
+                values, samples = metric_values[:, 0], metric_values[:, 1]
+                value_min, value_max = Dist.min_max(values, self.comm_group)
+                sample_min, sample_max = Dist.min_max(samples, self.comm_group)
+                metric_value_dtype = find_fit_int_dtype(value_min, value_max)
+                sample_value_dtype = find_fit_int_dtype(sample_min, sample_max)
+
+                # sample_to_metric maps sample ids to metric values, as a list of metric values
+                sample_to_metric_fname = f"{metric_save_path}/{metric_name}_sample_to_metric"
+                values = [torch.tensor([x]) for x in metric_values[:, 0]]
+                self.file_write_ordered(values, sample_to_metric_fname, metric_value_dtype)
+
+                # distributed sorting by values, gives an ordered disjoint subset of keys on nodes
+                metric_values = Dist.sample_sort(metric_values, self.comm_group, self.num_workers)
+                metric_to_samples_dict = {}
+                if len(metric_values) > 0:
+                    for value, sample in metric_values:
+                        if value.item() not in metric_to_samples_dict:
+                            metric_to_samples_dict[value.item()] = []
+                        metric_to_samples_dict[value.item()].append(sample.item())
+
+                # index_to_metric and index_to_sample serialize a dictionary from metric to samples
+                # index_to_metric stores a key per row, index_to_sample stores the values per row
+                values = [torch.tensor([x]) for x in metric_to_samples_dict.keys()]
+                samples = [torch.tensor(metric_to_samples_dict[x]) for x in metric_to_samples_dict.keys()]
+                index_to_metric_fname = f"{metric_save_path}/{metric_name}_index_to_metric"  #dict keys
+                index_to_sample_fname = f"{metric_save_path}/{metric_name}_index_to_sample"  #dict values
+                self.file_write_ordered(values, index_to_metric_fname, metric_value_dtype)
+                self.file_write_ordered(samples, index_to_sample_fname, sample_value_dtype)
+
+                if self.worker_id == 0:
+                    DataAnalyzer.output_index_to_sample_percentile(index_to_sample_fname, index_to_metric_fname,
+                                                                   metric_name, metric_save_path, total_num_samples,
+                                                                   sample_idx_dtype)
+                dist.barrier(self.comm_group)
+
+            elif metric_type == 'accumulate_value_over_samples':
+                metric_value_fname = f"{metric_save_path}/{metric_name}_metric_value"
+                dist.reduce(metric_values, dst=0, op=dist.ReduceOp.SUM, group=self.comm_group)
+                metric_value_dtype = find_fit_int_dtype(metric_values.min(), metric_values.max())
+
+                if self.worker_id == 0:
+                    builder = create_mmap_dataset_builder(metric_value_fname, metric_value_dtype)
+                    builder.add_item(metric_values.cpu())
+                    close_mmap_dataset_builder(builder, metric_value_fname)
+                dist.barrier(self.comm_group)
+
+    def file_write_ordered(self, tensor_list, fname, numpy_dtype):
+        """ MPI_file_write_ordered extended to write a list of tensors, by one rank, iteratively """
+
+        # each node has a list of rows (tensors) to be written to the file.
+        # we will serialize it to communicate it in one comm step.
+
+        tkwargs = dict(dtype=torch.int64, device=self.device)
+
+        # 1. gather on rank 0 the number of rows to be sent/recv
+        row_count = torch.tensor([len(tensor_list)], **tkwargs)
+        row_counts = torch.zeros(self.num_workers, **tkwargs)
+        dist.all_gather_into_tensor(row_counts, row_count, group=self.comm_group)
+        assert row_counts[self.worker_id] == row_count == len(tensor_list), "all_gather failed"
+
+        # 2. gather on rank 0 the sizes of the rows to be sent/recv
+        row_len = torch.tensor([len(l) for l in tensor_list], **tkwargs)
+        row_lens = Dist.gather_v(row_len, 0, self.comm_group, self.num_workers, self.worker_id)
+
+        # 3. gather on rank 0 the total size (sum of all row lengths) to be received
+        size = torch.tensor([sum(row_len).item()], **tkwargs)
+        sizes = torch.zeros(self.num_workers, **tkwargs)
+        dist.all_gather_into_tensor(sizes, size, group=self.comm_group)
+        assert sizes[self.worker_id] == size.item(), "all_gather did not return the same sizes"  #sanity check
+
+        # method to deserialize a buffer into rows of different lengths and write them to file
+        def write_buffer_to_file(buff, src, builder):
+            assert self.worker_id == 0, "only rank 0 can write to file"
+
+            # # write one buffer at a time
+            # for row_len in row_lens[src]:
+            #     builder.add_item(buff[:row_len].cpu())
+            #     buff = buff[row_len:]
+
+            # collect all buffers and write them all at once
+            buffer_list = []
+            for row_len in row_lens[src]:
+                buffer_list.append(buff[:row_len].cpu())
+                buff = buff[row_len:]
+            builder.add_items(buffer_list)
+
+        # 4. 
rank 0 prepares output folder and file
+        if self.worker_id == 0:
+            os.makedirs(os.path.dirname(fname), exist_ok=True)
+            builder = create_mmap_dataset_builder(fname, numpy_dtype)
+
+        # iterate through ranks that have data to be sent/recv/written
+        for src in [rank for rank, count in enumerate(row_counts) if count > 0]:
+
+            dist.barrier(group=self.comm_group)
+            if self.worker_id == 0 and src == 0:  # rank 0 writes its own data
+                buffer = torch.cat(tensor_list, dim=0).to(self.device)
+                write_buffer_to_file(buffer, 0, builder)
+            elif self.worker_id == 0 and src > 0:  # rank 0 receives other rank's data and writes it
+                buffer = torch.empty(sizes[src].item(), dtype=buffer.dtype, device=buffer.device)
+                err = dist.recv(buffer, src=src, group=self.comm_group, tag=src)
+                assert err == src and len(buffer) > 0, "recv failed"
+                write_buffer_to_file(buffer, src, builder)
+            elif self.worker_id == src:  # current rank sends data to rank 0
+                buffer = torch.cat(tensor_list, dim=0).to(self.device)
+                dist.send(buffer, 0, group=self.comm_group, tag=src)
+
+        # rank 0 closes the file
+        if self.worker_id == 0:
+            close_mmap_dataset_builder(builder, fname)  # close file
+        dist.barrier(self.comm_group)
+
+
+class Dist:
+    """ auxiliary class to perform distributed operations on tensors"""
+
+    @staticmethod
+    def min_max(tensor, comm_group):
+        """ given a distributed tensor, return the min/max values across all ranks"""
+
+        value_min, value_max = tensor.min(), tensor.max()
+        dist.reduce(value_min, 0, op=dist.ReduceOp.MIN, group=comm_group)
+        dist.reduce(value_max, 0, op=dist.ReduceOp.MAX, group=comm_group)
+        return value_min.item(), value_max.item()
+
+    @staticmethod
+    def gather_v(tensor, dst, comm_group, num_workers, worker_id):
+        """ MPI_Gatherv. gather tensors of variable sizes on a single rank """
+
+        # gather the number of rows to be sent/recv
+        size = torch.tensor([len(tensor)], dtype=torch.int64, device=tensor.device)
+        sizes = torch.zeros(num_workers, dtype=torch.int64, device=tensor.device)
+        dist.all_gather_into_tensor(sizes, size, group=comm_group)
+        assert sizes[worker_id] == size, "all_gather failed"
+
+        # all_gather requires all tensors to be of same size so we need to pad them
+        max_size = max(sizes).item()
+        buffer = torch.empty(max_size, dtype=tensor.dtype, device=tensor.device)
+        buffer[0:size] = torch.tensor(tensor, dtype=tensor.dtype, device=tensor.device)
+        buffer_list = None
+        if worker_id == 0:  # create padded recv buffers
+            buffer_list = [torch.empty(max_size, dtype=tensor.dtype, device=tensor.device) for _ in range(num_workers)]
+        dist.gather(buffer, buffer_list, dst=dst, group=comm_group)
+
+        # revert padding and return value
+        if worker_id == 0:
+            buffer_list = [r[:s.item()] for r, s in zip(buffer_list, sizes)]
+        return buffer_list
+
+    @staticmethod
+    def sample_sort(tensor, comm_group, num_workers, n_samples=100):
+        """ perform a distributed random sort of a tensor, and returns the sorted partial tensor"""
+        device, dims = tensor.device, tensor.size()[1]
+
+        # 1 - sort rows by first column, then second column, then third, etc...
+        tensor = torch.tensor(sorted(tensor.tolist()), dtype=tensor.dtype, device=tensor.device)
+
+        # 2 - collect a few samples per rank
+        idx = torch.round(torch.linspace(0, len(tensor) - 1, n_samples)).to(int)
+        samples = tensor[idx][:, 0].contiguous().to(device)  # only first column
+
+        # 3 - allgather samples
+        all_samples = [torch.zeros(n_samples, dtype=samples.dtype, device=device) for _ in range(num_workers)]
+        dist.all_gather(all_samples, samples, group=comm_group)
+        all_samples = torch.cat(all_samples, dim=0).to(device)
+
+        # 4 - sort all samples and collect the ranges of each rank as equidistant
+        all_samples = all_samples.sort()[0]
+        idx = torch.round(torch.linspace(0, len(all_samples) - 1, num_workers + 1)).to(int)
+        ranges = all_samples[idx]  # range of each rank r as ranges[r] <= x < ranges[r+1]
+        ranges[-1] += 1  # increase upper limit of last rank so that x < ranges[r+1].
+
+        # 5 - collect elements to send to each rank, based on the rank ranges
+        send = []
+        for rank in range(num_workers):
+            mask = (tensor[:, 0] >= ranges[rank]) & (tensor[:, 0] < ranges[rank + 1])
+            send.append(tensor[mask])
+
+        # 6 - all-to-all to communicate the sizes to be sent/recv
+        send_count = [torch.tensor([len(s) * dims], dtype=torch.int64, device=device) for s in send]
+        recv_count = list(torch.empty([num_workers], dtype=torch.int64, device=device).chunk(num_workers))
+        dist.all_to_all(recv_count, send_count, group=comm_group)
+
+        # 7 - all-to-all-v to communicate the elements to be sent/recv as a single tensor
+        send = torch.cat(send, dim=0).flatten().to(device)
+        recv = torch.zeros(sum(recv_count), dtype=send.dtype).to(device)
+        send_count = [s.item() for s in send_count]  # convert to list of ints
+        recv_count = [r.item() for r in recv_count]
+        dist.all_to_all_single(recv, send, recv_count, send_count, group=comm_group)
+        del send
+
+        # 8 - the received tensor is the 1D disjoint subset of the distributed tensor.
+        # We will recover the original dimensionality and sort it by columns again.
+        recv = recv.view(-1, dims)
+        recv = torch.tensor(sorted(recv.tolist()), dtype=recv.dtype, device=recv.device)
+        return recv
+
+
+def test_compare_both_data_analyzers(dataset):
+    """ given a dataset, compare file- and memory-based data analyzers """
+
+    id = lambda t: torch.tensor(t).to(torch.int64)  # identity
+    batch_sum = lambda t: id(t).sum()  # sum batch
+    kwargs = dict(
+        dataset=dataset,
+        batch_size=3,
+        worker_id=int(os.environ['RANK']),
+        num_workers=int(os.environ['WORLD_SIZE']),
+        metric_names=["mod", "batch_sum"],
+        metric_functions=[id, batch_sum],
+        metric_types=['single_value_per_sample', 'accumulate_value_over_samples'],
+    )
+
+    dda = DistributedDataAnalyzer(
+        save_path="./output_dist",
+        device=f"cuda:{int(os.environ['LOCAL_RANK'])}",
+        **kwargs,
+    )
+    start_time = time.time()
+    dda.run_map_reduce()
+    if dda.worker_id == 0:
+        print("DistributedDataAnalyzer runtime: %s seconds " % (time.time() - start_time))
+
+    da = DataAnalyzer(num_threads=2,
+                      num_threads_reduce=2,
+                      metric_dtypes=[torch.int64, torch.int64],
+                      save_path="./output_disk",
+                      **kwargs)
+    start_time = time.time()
+    da.run_map_reduce()
+    if da.worker_id == 0:
+        print("DataAnalyzer runtime: %s seconds " % (time.time() - start_time))
+
+    output_paths = [
+        "batch_sum/batch_sum_metric_value.bin", "batch_sum/batch_sum_metric_value.idx", \
+        "mod/mod_index_to_metric.bin", "mod/mod_index_to_metric.idx", \
+        "mod/mod_index_to_sample.bin", "mod/mod_index_to_sample.idx", \
+        "mod/mod_index_to_sample_percentile_merged.bin", "mod/mod_index_to_sample_percentile_merged.idx", \
+        "mod/mod_sample_to_metric.bin", "mod/mod_sample_to_metric.idx" \
+    ]
+
+    if dda.worker_id == 0:
+        for path in output_paths:
+            with open(os.path.join(da.save_path, path), 'rb') as f1, \
+                 open(os.path.join(dda.save_path, path), 'rb') as f2:
+                if f1.read() != f2.read():
+                    print(f"files {path} are not identical.")
+
+
+if __name__ == "__main__":
+
+    class TestDataset(torch.utils.data.Dataset):
+
+        def __init__(self, size=20):
+            self.values = [1001 + x % 6 for x in range(size)]
+            self.size = size
+
+        def __len__(self):
+            return self.size
+
+        def __getitem__(self, idx):
+            return self.values[idx]
+
+    test_compare_both_data_analyzers(TestDataset())
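`Dist.sample_sort` above is a classic sample sort: each rank sorts its rows locally, contributes equidistant samples, and the sorted union of samples defines per-rank value ranges used to bucket rows. A single-process sketch of the partitioning step on hypothetical toy data (no `torch.distributed` involved):

```python
# Single-process illustration of sample-sort range partitioning.
import torch

num_workers, n_samples = 2, 4
# pretend these are the locally sorted (value, sample_id) rows on each rank
rank_tensors = [
    torch.tensor([[1, 0], [5, 1], [9, 2], [13, 3]]),
    torch.tensor([[2, 4], [3, 5], [4, 6], [20, 7]]),
]

# every rank contributes equidistant samples of its first column
samples = torch.cat([t[torch.round(torch.linspace(0, len(t) - 1, n_samples)).to(int), 0]
                     for t in rank_tensors]).sort()[0]

# equidistant cut points over the gathered samples define each rank's value range
idx = torch.round(torch.linspace(0, len(samples) - 1, num_workers + 1)).to(int)
ranges = samples[idx]
ranges[-1] += 1  # make the last interval cover the max value

# each rank keeps the rows whose value falls in its range: disjoint, roughly balanced buckets
for rank in range(num_workers):
    bucket = torch.cat([t[(t[:, 0] >= ranges[rank]) & (t[:, 0] < ranges[rank + 1])]
                        for t in rank_tensors])
    print(f"rank {rank} range [{ranges[rank]}, {ranges[rank + 1]}): {bucket.tolist()}")
```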
diff --git a/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py b/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py
index 7a6963bc27eb..453e6ba6039d 100644
--- a/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py
+++ b/deepspeed/runtime/data_pipeline/data_sampling/indexed_dataset.py
@@ -581,10 +581,18 @@ def __init__(self, out_file, dtype=np.int64):
         self._doc_idx = [0]
 
     def add_item(self, tensor):
+        """ write the tensor to the file and update its size in the index"""
         np_array = np.array(tensor.numpy(), dtype=self._dtype)
         self._data_file.write(np_array.tobytes(order='C'))
         self._sizes.append(np_array.size)
 
+    def add_items(self, tensor_list):
+        """ write a list of tensors to the file and update their sizes in the index"""
+        np_arrays = [np.array(t.numpy(), dtype=self._dtype) for t in tensor_list]
+        self._data_file.writelines([arr.tobytes(order='C') for arr in np_arrays])
+        for arr in np_arrays:
+            self._sizes.append(arr.size)
+
     def add_item_numpy(self, np_array):
         if np_array.dtype != self._dtype:
             np_array = np_array.astype(self._dtype)
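`add_items` batches the per-tensor writes of `add_item` into a single `writelines` call. A small self-contained sketch of that pattern, using an in-memory file as a stand-in for the builder's data file (names here are illustrative, not the builder's actual members):

```python
# Batched binary writes: one writelines call instead of N write calls.
import io
import numpy as np

arrays = [np.arange(3, dtype=np.int64), np.arange(5, dtype=np.int64)]
data_file = io.BytesIO()  # stand-in for the builder's open output file
sizes = []

data_file.writelines([arr.tobytes(order='C') for arr in arrays])  # single I/O call
sizes.extend(arr.size for arr in arrays)                          # index bookkeeping

assert sizes == [3, 5] and len(data_file.getvalue()) == (3 + 5) * 8
```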
diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py
index fe0fa8ad793d..5c1202ba06ae 100644
--- a/deepspeed/runtime/engine.py
+++ b/deepspeed/runtime/engine.py
@@ -230,7 +230,7 @@ def __init__(self,
         self._is_gradient_accumulation_boundary = None
         self.scale_wrt_gas = None
-        self.losses = 0.0
+        self.losses = None
 
         # for debug purposes - can then debug print: debug_get_module_name(module)
         debug_extract_module_and_param_names(model)
@@ -1911,9 +1911,6 @@ def print_forward_breakdown(self, fwd_time):
 
     @instrument_w_nvtx
     def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE):
-        assert not (self.bfloat16_enabled() and self.pipeline_parallelism), \
-            f'allreduce_gradients() is not valid when bfloat+pipeline_parallelism is enabled'
-
         # Pass (PP) gas boundary flag to optimizer (required for zero)
         self.optimizer.is_gradient_accumulation_boundary = self.is_gradient_accumulation_boundary()
         # ZeRO stage >= 2 communicates during non gradient accumulation boundaries as well
@@ -1926,7 +1923,11 @@ def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE):
                 self.optimizer, 'reduce_gradients'):
             self.optimizer.reduce_gradients(pipeline_parallel=self.pipeline_parallelism)
         else:
-            self.buffered_allreduce_fallback(elements_per_buffer=bucket_size)
+            grads = None
+            if hasattr(self.optimizer, "get_grads_for_reduction"):
+                # This is currently for BF16 optimizer
+                grads = self.optimizer.get_grads_for_reduction()
+            self.buffered_allreduce_fallback(grads=grads, elements_per_buffer=bucket_size)
 
     @instrument_w_nvtx
     def backward(self, loss, allreduce_gradients=True, release_loss=False, retain_graph=False, scale_wrt_gas=True):
@@ -1951,13 +1952,14 @@ def backward(self, loss, allreduce_gradients=True, release_loss=False, retain_gr
             loss = self._scale_loss_by_gas(loss.float())
 
         # Log training loss
-        self.losses += loss.mean().item()
+        mean_loss = loss.mean().detach()
+        self.losses = mean_loss if self.losses is None else self.losses + mean_loss
         if self.monitor.enabled:
             if self.is_gradient_accumulation_boundary():
                 if self.global_rank == 0:
                     self.summary_events = [(
                         f"Train/Samples/train_loss",
-                        self.losses,
+                        self.losses.item(),
                         self.global_samples,
                     )]
                     self.monitor.write_events(self.summary_events)
@@ -2123,7 +2125,7 @@ def _take_model_step(self, lr_kwargs, block_eigenvalue={}):
                 if report_progress and (self.global_steps + 1) % self.steps_per_print() == 0:
                     self._report_progress(self.global_steps + 1)
 
-        self.losses = 0.0
+        self.losses = None
         self.global_steps += 1
         self.global_samples += self.train_batch_size()
@@ -3223,22 +3225,21 @@ def _save_moe_checkpoint(self, save_dir, tag, client_state={}, exclude_frozen_pa
         # In the case of E + D parallelism, only the
         # first expert parallel group should save the expert weights
         # since each expert parallel group is a copy of the model's experts
-        if exp_dp_rank != 0:
-            return
-
-        # Save optimizer states. They are different across each exp parallel rank.
-        optimizer_state = {
-            'optimizer': self.optimizer.state_dict() if self.optimizer and not self.zero_optimization() else None
-        }
-        # TODO: why use BufferedWriter not the path
-        file_path = self._get_optimizer_ckpt_name(save_dir, tag, expp_rank)
-        self.checkpoint_engine.save(optimizer_state, file_path)
-
-        # get non-moe parameters
-        model_state_dict = self._get_non_moe_state_dict(
-            self.module_state_dict(exclude_frozen_parameters=exclude_frozen_parameters))
-
-        if expp_rank == 0:
+        if exp_dp_rank == 0:
+            # Save optimizer states. They are different across each exp parallel rank.
+            optimizer_state = {
+                'optimizer': self.optimizer.state_dict() if self.optimizer and not self.zero_optimization() else None
+            }
+            # TODO: why use BufferedWriter not the path
+            file_path = self._get_optimizer_ckpt_name(save_dir, tag, expp_rank)
+            self.checkpoint_engine.save(optimizer_state, file_path)
+
+        # Load flow uses the file saved below for model parameters, RNG and more
+        if groups._get_data_parallel_rank() == 0:
+            # get non-moe parameters
+            model_state_dict = self._get_non_moe_state_dict(
+                self.module_state_dict(exclude_frozen_parameters=exclude_frozen_parameters))
+
             # TODO: update num experts info,.. in checkpoint
             state = {
                 'module':
diff --git a/deepspeed/runtime/utils.py b/deepspeed/runtime/utils.py
index d7a35b7dbbe9..d1ebe4b2f83d 100755
--- a/deepspeed/runtime/utils.py
+++ b/deepspeed/runtime/utils.py
@@ -407,7 +407,7 @@ def clip_grad_norm_(parameters, max_norm, norm_type=2, mpu=None):
     max_norm = torch.tensor([float(max_norm)], device=parameters[0].device)
     clip_coef = max_norm / (total_norm + 1e-6)
     tmp_tensor = torch.tensor([1.0], device=parameters[0].device)
-    clip_coef = torch.max(tmp_tensor, clip_coef)
+    clip_coef = torch.min(tmp_tensor, clip_coef)
    for p in parameters:
         p.grad.data.mul_(clip_coef)
     return total_norm
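The `torch.max` to `torch.min` change above is the entire fix: the coefficient `max_norm / total_norm` must be capped at 1, otherwise gradients whose norm is already below `max_norm` would be scaled *up* rather than left alone. A toy check (omitting the `1e-6` stabilizer so the arithmetic is exact):

```python
# Why min, not max: clipping should never amplify gradients.
import torch

max_norm, total_norm = torch.tensor([1.0]), torch.tensor([0.5])
clip_coef = max_norm / total_norm  # 2.0: gradients are already small enough

one = torch.tensor([1.0])
assert torch.max(one, clip_coef).item() == 2.0  # old code: would double the gradients
assert torch.min(one, clip_coef).item() == 1.0  # fixed code: leaves them untouched
```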
diff --git a/deepspeed/runtime/zero/stage3.py b/deepspeed/runtime/zero/stage3.py
index 42008236a9ea..4132a4527d6a 100644
--- a/deepspeed/runtime/zero/stage3.py
+++ b/deepspeed/runtime/zero/stage3.py
@@ -21,7 +21,7 @@
 from deepspeed.runtime.zero.offload_config import OffloadDeviceEnum
 from deepspeed.runtime.zero.parameter_offload import DeepSpeedZeRoOffload
 from deepspeed.runtime.zero.utils import apply_to_tensors_only
-from deepspeed.ops.adam import DeepSpeedCPUAdam, FusedAdam
+from deepspeed.ops.adam import DeepSpeedCPUAdam
 from deepspeed.runtime.swap_tensor.partitioned_param_swapper import PartitionedParamStatus
 from deepspeed.runtime.swap_tensor.optimizer_utils import OptimizerSwapper
 from deepspeed.runtime.swap_tensor.partitioned_optimizer_swapper import PartitionedOptimizerSwapper
@@ -201,13 +201,12 @@ def __init__(
             backup_gpu_tensor = torch.randn(1, device=get_accelerator().device_name()).to(self.dtype)
             backup_gpu_param = torch.nn.Parameter(backup_gpu_tensor)
             assert type(init_optimizer) == DeepSpeedCPUAdam, 'Hybrid Optimizer Only Supports DeepSpeedCPUAdam'
-            self.backup_optimizer = FusedAdam([backup_gpu_param],
-                                              lr=self.optimizer.param_groups[0]["lr"],
-                                              bias_correction=self.optimizer.param_groups[0]["bias_correction"],
-                                              betas=self.optimizer.param_groups[0]["betas"],
-                                              eps=self.optimizer.param_groups[0]["eps"],
-                                              weight_decay=self.optimizer.param_groups[0]["weight_decay"],
-                                              amsgrad=self.optimizer.param_groups[0]["amsgrad"])
+            self.backup_optimizer = torch.optim.AdamW([backup_gpu_param],
+                                                      lr=self.optimizer.param_groups[0]["lr"],
+                                                      betas=self.optimizer.param_groups[0]["betas"],
+                                                      eps=self.optimizer.param_groups[0]["eps"],
+                                                      weight_decay=self.optimizer.param_groups[0]["weight_decay"],
+                                                      amsgrad=self.optimizer.param_groups[0]["amsgrad"])
             # Multiple param_groups configs for back-up optimizer
             if len(self.optimizer.param_groups) > 1:
                 for i in range(1, len(self.optimizer.param_groups)):
diff --git a/deepspeed/runtime/zero/stage_1_and_2.py b/deepspeed/runtime/zero/stage_1_and_2.py
index 18b58403f1d7..e4009f6ac883 100755
--- a/deepspeed/runtime/zero/stage_1_and_2.py
+++ b/deepspeed/runtime/zero/stage_1_and_2.py
@@ -1686,7 +1686,7 @@ def get_grad_norm_direct(self, gradients, params, norm_type=2):
         if len(all_norms) > 0:
             total_norm = torch.stack(all_norms).square().sum().float()
         else:
-            total_norm = torch.FloatTensor([0.0]).to(self.device)
+            total_norm = torch.tensor(0.0, dtype=torch.float32).to(self.device)
 
         # Sum across all model parallel Device.
         dist.all_reduce(total_norm, op=dist.ReduceOp.SUM, group=self.dp_process_group)
@@ -1970,24 +1970,26 @@ def _check_overflow(self, partition_gradients=True):
         self.overflow = self.has_overflow(partition_gradients)
 
     # `params` is a list / generator of torch.Variable
-    def has_overflow_serial(self, params, is_grad_list=False):
+    def has_overflow_serial(self, params):
+        invalid_grad_count = torch.zeros([1], dtype=torch.float, device=get_accelerator().current_device_name())
         for p in params:
-            if p.grad is not None and self._has_inf_or_nan(p.grad.data):
-                return True
-
-        return False
+            if p.grad is not None:
+                invalid_grad_count += self._has_inf_or_nan(p.grad)
+        return invalid_grad_count.bool()
 
     def has_overflow_partitioned_grads_serial(self):
+        invalid_grad_count = torch.zeros([1], dtype=torch.float, device=get_accelerator().current_device_name())
         for i in range(len(self.bit16_groups)):
             for j, grad in enumerate(self.averaged_gradients[i]):
-                if grad is not None and self._has_inf_or_nan(grad.data, j):
-                    return True
-        return False
+                if grad is not None:
+                    invalid_grad_count += self._has_inf_or_nan(grad)
+        return invalid_grad_count.bool()
 
     def has_overflow(self, partition_gradients=True):
         if partition_gradients:
             overflow = self.local_overflow if self.cpu_offload else self.has_overflow_partitioned_grads_serial()
-            overflow_gpu = get_accelerator().ByteTensor([overflow])
+            overflow_gpu = get_accelerator().ByteTensor([overflow]) if self.cpu_offload else overflow.byte().to(
+                get_accelerator().current_device_name())
             '''This will capture overflow across all data parallel and expert parallel process
             Since expert parallel process are a subset of data parallel process'''
             dist.all_reduce(overflow_gpu, op=dist.ReduceOp.MAX, group=self.dp_process_group)
@@ -1997,9 +1999,7 @@ def has_overflow(self, partition_gradients=True):
             for group in self.bit16_groups:
                 for param in group:
                     params.append(param)
-
-            overflow = self.has_overflow_serial(params, is_grad_list=partition_gradients)
-            overflow_gpu = get_accelerator().ByteTensor([overflow])
+            overflow_gpu = self.has_overflow_serial(params).byte().to(get_accelerator().current_device_name())
 
             # Since each model parallel GPU carries only part of the model,
             # make sure overflow flag is synced across all the model parallel GPUs
@@ -2011,24 +2011,11 @@ def has_overflow(self, partition_gradients=True):
     # `x` is a torch.Tensor
     @staticmethod
     def _has_inf_or_nan(x, j=None):
-        try:
-            # if x is half, the .float() incurs an additional deep copy, but it's necessary if
-            # Pytorch's .sum() creates a one-element tensor of the same type as x
-            # (which is true for some recent version of pytorch).
-            cpu_sum = float(x.float().sum())
-            # More efficient version that can be used if .sum() returns a Python scalar
-            # cpu_sum = float(x.sum())
-        except RuntimeError as instance:
-            # We want to check if inst is actually an overflow exception.
-            # RuntimeError could come from a different error.
-            # If so, we still want the exception to propagate.
-            if "value cannot be converted" not in instance.args[0]:
-                raise
-            return True
-        else:
-            if cpu_sum == float('inf') or cpu_sum == -float('inf') or cpu_sum != cpu_sum:
-                return True
-            return False
+        float_x = x.float()
+        nan = float_x.isnan()
+        inf = float_x.isinf()
+        inf_or_nan = nan.logical_or(inf)
+        return inf_or_nan.float().max()
 
     def backward(self, loss, retain_graph=False):
         """
diff --git a/op_builder/builder.py b/op_builder/builder.py
index fec39f2b4feb..dd77f967cc60 100644
--- a/op_builder/builder.py
+++ b/op_builder/builder.py
@@ -453,9 +453,10 @@ def deepspeed_src_path(self, code_path):
 
     def builder(self):
         from torch.utils.cpp_extension import CppExtension
+        include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())]
         return CppExtension(name=self.absolute_name(),
                             sources=self.strip_empty_entries(self.sources()),
-                            include_dirs=self.strip_empty_entries(self.include_paths()),
+                            include_dirs=include_dirs,
                             extra_compile_args={'cxx': self.strip_empty_entries(self.cxx_args())},
                             extra_link_args=self.strip_empty_entries(self.extra_ldflags()))
@@ -638,7 +639,7 @@ def builder(self):
             from torch.utils.cpp_extension import CppExtension as ExtensionBuilder
         else:
             from torch.utils.cpp_extension import CUDAExtension as ExtensionBuilder
-
+        include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())]
         compile_args = {'cxx': self.strip_empty_entries(self.cxx_args())} if self.build_for_cpu else \
             {'cxx': self.strip_empty_entries(self.cxx_args()), \
             'nvcc': self.strip_empty_entries(self.nvcc_args())}
@@ -651,7 +652,7 @@ def builder(self):
 
         cuda_ext = ExtensionBuilder(name=self.absolute_name(),
                                     sources=self.strip_empty_entries(self.sources()),
-                                    include_dirs=self.strip_empty_entries(self.include_paths()),
+                                    include_dirs=include_dirs,
                                     libraries=self.strip_empty_entries(self.libraries_args()),
                                     extra_compile_args=compile_args,
                                     extra_link_args=self.strip_empty_entries(self.extra_ldflags()))
@@ -702,11 +703,18 @@ def nvcc_args(self):
                 '-DROCM_VERSION_MINOR=%s' % ROCM_MINOR
             ]
         else:
+            try:
+                nvcc_threads = int(os.getenv("DS_NVCC_THREADS", ""))
+                if nvcc_threads <= 0:
+                    raise ValueError("")
+            except ValueError:
+                nvcc_threads = min(os.cpu_count(), 8)
+
             cuda_major, _ = installed_cuda_version()
             args += [
                 '-allow-unsupported-compiler' if sys.platform == "win32" else '', '--use_fast_math',
                 '-std=c++17' if cuda_major > 10 else '-std=c++14', '-U__CUDA_NO_HALF_OPERATORS__',
-                '-U__CUDA_NO_HALF_CONVERSIONS__', '-U__CUDA_NO_HALF2_OPERATORS__'
+                '-U__CUDA_NO_HALF_CONVERSIONS__', '-U__CUDA_NO_HALF2_OPERATORS__', f'--threads={nvcc_threads}'
             ]
             if os.environ.get('DS_DEBUG_CUDA_BUILD', '0') == '1':
                 args.append('--ptxas-options=-v')
diff --git a/op_builder/cpu/builder.py b/op_builder/cpu/builder.py
index f6a71c7d1971..d2bc8eacfa25 100644
--- a/op_builder/cpu/builder.py
+++ b/op_builder/cpu/builder.py
@@ -3,6 +3,8 @@
 
 # DeepSpeed Team
 
+import os
+
 try:
     # is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed
     # if successful this also means we're doing a local install and not JIT compile path
@@ -16,12 +18,12 @@ class CPUOpBuilder(OpBuilder):
 
     def builder(self):
         from torch.utils.cpp_extension import CppExtension as ExtensionBuilder
-
+        include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())]
         compile_args = {'cxx': self.strip_empty_entries(self.cxx_args())}
 
         cpp_ext = ExtensionBuilder(name=self.absolute_name(),
                                    sources=self.strip_empty_entries(self.sources()),
-                                   include_dirs=self.strip_empty_entries(self.include_paths()),
+                                   include_dirs=include_dirs,
                                    libraries=self.strip_empty_entries(self.libraries_args()),
                                    extra_compile_args=compile_args)
diff --git a/op_builder/hpu/builder.py b/op_builder/hpu/builder.py
index 5a538c84040c..3c86128fffd6 100644
--- a/op_builder/hpu/builder.py
+++ b/op_builder/hpu/builder.py
@@ -4,6 +4,8 @@
 
 # DeepSpeed Team
 
+import os
+
 try:
     # is op_builder from deepspeed or a 3p version? this should only succeed if it's deepspeed
     # if successful this also means we're doing a local install and not JIT compile path
@@ -17,12 +19,12 @@ class CPUOpBuilder(OpBuilder):
 
     def builder(self):
         from torch.utils.cpp_extension import CppExtension as ExtensionBuilder
-
+        include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())]
         compile_args = {'cxx': self.strip_empty_entries(self.cxx_args())}
 
         cpp_ext = ExtensionBuilder(name=self.absolute_name(),
                                    sources=self.strip_empty_entries(self.sources()),
-                                   include_dirs=self.strip_empty_entries(self.include_paths()),
+                                   include_dirs=include_dirs,
                                    libraries=self.strip_empty_entries(self.libraries_args()),
                                    extra_compile_args=compile_args)
diff --git a/op_builder/inference_core_ops.py b/op_builder/inference_core_ops.py
index 229b500bebda..8073b63ad16b 100755
--- a/op_builder/inference_core_ops.py
+++ b/op_builder/inference_core_ops.py
@@ -60,13 +60,13 @@ def sources(self):
         sources = [
             "inference/v2/kernels/core_ops/core_ops.cpp",
             "inference/v2/kernels/core_ops/bias_activations/bias_activation.cpp",
-            "inference/v2/kernels/core_ops/bias_activations/bias_activation.cu",
+            "inference/v2/kernels/core_ops/bias_activations/bias_activation_cuda.cu",
             "inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cpp",
-            "inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm.cu",
+            "inference/v2/kernels/core_ops/cuda_layer_norm/layer_norm_cuda.cu",
             "inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cpp",
-            "inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm.cu",
+            "inference/v2/kernels/core_ops/cuda_rms_norm/rms_norm_cuda.cu",
             "inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cpp",
-            "inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels.cu",
+            "inference/v2/kernels/core_ops/gated_activations/gated_activation_kernels_cuda.cu",
         ]
 
         prefix = self.get_prefix()
diff --git a/op_builder/ragged_ops.py b/op_builder/ragged_ops.py
index 8cb372e96c37..ec7cab91885f 100644
--- a/op_builder/ragged_ops.py
+++ b/op_builder/ragged_ops.py
@@ -63,18 +63,18 @@ def sources(self):
             "inference/v2/kernels/ragged_ops/atom_builder/atom_builder.cpp",
             "inference/v2/kernels/ragged_ops/blocked_flash/blocked_flash.cpp",
             "inference/v2/kernels/ragged_ops/embed/embed.cpp",
-            "inference/v2/kernels/ragged_ops/embed/embed.cu",
+            "inference/v2/kernels/ragged_ops/embed/embed_cuda.cu",
             "inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cpp",
-            "inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary.cu",
+            "inference/v2/kernels/ragged_ops/linear_blocked_kv_rotary/blocked_kv_rotary_cuda.cu",
             "inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cpp",
-            "inference/v2/kernels/ragged_ops/logits_gather/logits_gather.cu",
+            "inference/v2/kernels/ragged_ops/logits_gather/logits_gather_cuda.cu",
             "inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cpp",
-            "inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter.cu",
+            "inference/v2/kernels/ragged_ops/moe_scatter/moe_scatter_cuda.cu",
             "inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cpp",
-            "inference/v2/kernels/ragged_ops/moe_gather/moe_gather.cu",
+            "inference/v2/kernels/ragged_ops/moe_gather/moe_gather_cuda.cu",
             "inference/v2/kernels/ragged_ops/ragged_helpers/ragged_kernel_helpers.cpp",
             "inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cpp",
-            "inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating.cu",
+            "inference/v2/kernels/ragged_ops/top_k_gating/top_k_gating_cuda.cu",
         ]
 
         prefix = self.get_prefix()
diff --git a/op_builder/xpu/builder.py b/op_builder/xpu/builder.py
index 2f0feba1d4e1..459dcce6bfae 100644
--- a/op_builder/xpu/builder.py
+++ b/op_builder/xpu/builder.py
@@ -23,11 +23,11 @@ def builder(self):
             from intel_extension_for_pytorch.xpu.cpp_extension import DPCPPExtension
         except ImportError:
             from intel_extension_for_pytorch.xpu.utils import DPCPPExtension
-
+        include_dirs = [os.path.abspath(x) for x in self.strip_empty_entries(self.include_paths())]
         print("dpcpp sources = {}".format(self.sources()))
         dpcpp_ext = DPCPPExtension(name=self.absolute_name(),
                                    sources=self.strip_empty_entries(self.sources()),
-                                   include_dirs=self.strip_empty_entries(self.include_paths()),
+                                   include_dirs=include_dirs,
                                    extra_compile_args={
                                        'cxx': self.strip_empty_entries(self.cxx_args()),
                                    },
diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt
index 7ae81d14aa18..f28c1ecb165c 100644
--- a/requirements/requirements-dev.txt
+++ b/requirements/requirements-dev.txt
@@ -6,7 +6,7 @@ future
 importlib-metadata>=4
 mup
 pre-commit>=2.20.0
-pytest
+pytest<=8.0.0
 pytest-forked
 pytest-randomly
 pytest-xdist
diff --git a/setup.py b/setup.py
index d9aed9b47bd8..418c1f11a0e0 100755
--- a/setup.py
+++ b/setup.py
@@ -119,7 +119,8 @@ def get_env_if_set(key, default: typing.Any = ""):
 # For any pre-installed ops force disable ninja.
 if torch_available:
     from accelerator import get_accelerator
-    cmdclass['build_ext'] = get_accelerator().build_extension().with_options(use_ninja=False)
+    use_ninja = not is_env_set("DS_DISABLE_NINJA")
+    cmdclass['build_ext'] = get_accelerator().build_extension().with_options(use_ninja=use_ninja)
 
 if torch_available:
     TORCH_MAJOR = torch.__version__.split('.')[0]
diff --git a/tests/unit/runtime/test_runtime_utils.py b/tests/unit/runtime/test_runtime_utils.py
index 5d8478b249be..6fdeb2074246 100644
--- a/tests/unit/runtime/test_runtime_utils.py
+++ b/tests/unit/runtime/test_runtime_utils.py
@@ -26,10 +26,10 @@ def test_call_to_str():
     assert c2s('hello', 1138, val=3) == 'hello(1138, val=3)'
 
 
-class TestClibGradNorm(DistributedTest):
+class TestClipGradNorm(DistributedTest):
     world_size = 2
 
-    def test(self):
+    def test_gather(self):
         param1 = torch.nn.Parameter(torch.Tensor([0]))
         param1.grad = torch.Tensor([1])
         param2 = torch.nn.Parameter(torch.Tensor([0]))
@@ -50,6 +50,27 @@ def test(self):
 
         assert gathered_norm[0] == gathered_norm[1], "norm at rank 0 does not match the norm at rank 1"
 
+    def test_clipped_val(self):
+        max_norm = 0.1
+
+        def test_params():
+            param1 = torch.nn.Parameter(torch.Tensor([0]))
+            param1.grad = torch.Tensor([1])
+            param2 = torch.nn.Parameter(torch.Tensor([0]))
+            param2.grad = torch.Tensor([1])
+            return [param1, param2]
+
+        # This assumes gradients are the same on all ranks, so no cross-rank reduction is exercised
+        params_expected = test_params()
+        torch.nn.utils.clip_grad_norm_(params_expected, max_norm)
+
+        params_actual = test_params()
+        ds_utils.clip_grad_norm_(params_actual, max_norm=max_norm)
+
+        # exact equality holds here; torch.allclose would also be acceptable
+        assert torch.equal(params_expected[0].grad, params_actual[0].grad)
+        assert torch.equal(params_expected[1].grad, params_actual[1].grad)
+
 
 @pytest.mark.parametrize("check_using_norm", [(False), (True)])
 class TestCheckOverflow(DistributedTest):
diff --git a/tests/unit/runtime/zero/test_zero.py b/tests/unit/runtime/zero/test_zero.py
index 2594d910acff..5a8af95bb0f8 100644
--- a/tests/unit/runtime/zero/test_zero.py
+++ b/tests/unit/runtime/zero/test_zero.py
@@ -1604,7 +1604,7 @@ def test_empty_param_groups(self, dtype, use_client_optimizer, empty_weight_grou
         }
 
         if use_client_optimizer:
-            optimizer = deepspeed.ops.adam.FusedAdam(param_groups, lr=0.1)
+            optimizer = torch.optim.AdamW(param_groups, lr=0.1)
             model_parameters = model.parameters()
         else:
             config_dict["optimizer"] = {"type": "adamw"}