Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PowerSGDCompressor #27 #47

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
98 changes: 98 additions & 0 deletions autodist/kernel/synchronization/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,104 @@ class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works becaus
"""Horovod's Compression but with Error Feedback."""


class PowerSGDCompressor(CompressorEF):
sonicxml marked this conversation as resolved.
Show resolved Hide resolved
"""An implementation of the PowerSGD compression algorithm (arxiv.org/abs/1905.13727)."""
def __init__(self, var_op_name, rank=1):
self.rank = rank
self.og_shape, self.ndims, self.compressor = None, None, None # compressor is the Q in paper
self.var_op_name = var_op_name
sonicxml marked this conversation as resolved.
Show resolved Hide resolved
super.__init__(var_op_name)

def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig):
"""
Compress, reduce, and decompress a given tensor.

Args:
tensor (Tensor): the Tensor to reduce.
conf (CollectiveOpsConfig): the config for Collective Ops.

Returns:
Reduced Tensor
"""
if self.og_shape is None:
self.og_shape = tensor.shape
self.ndims = len(self.og_shape)

# rank <= 1
if self.ndims <= 1 or (self.ndims==2 and any([d == 1 for d in self.og_shape])):
return self._all_reduce(tensor, conf)

# compressor init
if self.compressor is None:
self.compressor = random_ops.random_normal([array_ops.shape_v2(tensor)[1], self.rank])
sonicxml marked this conversation as resolved.
Show resolved Hide resolved

if self.error is not None:
tensor += self.error

compressed_tensor = self._compress(tensor)
self.error = tensor - self._decompress(compressed_tensor)

reduced_tensor = self._all_reduce(compressed_tensor, conf)

orthonormal_reduced_tensor = self._modified_gram_schmidt(reduced_tensor)

self.compressor = math_ops.matmul(tensor, orthonormal_reduced_tensor, transpose_a=True) # mxn * nxr => mxr

# all reduce mean compressor
instance_key = conf.instance_key
conf.instance_key = get_collective_keys().get_instance_key(self.var_op_name + '/compressor')
self.compressor = self._all_reduce(self.compressor, conf)
conf.instance_key = instance_key

return self._decompress(orthonormal_reduced_tensor)

def _compress(self, tensor: Tensor):
"""
Compress a given tensor.

Args:
tensor (Tensor): the Tensor to compress.

Returns:
Tensor
"""
return math_ops.matmul(tensor, self.compressor) # nxm * mxr => nxr

def _decompress(self, compressed_tensor: Tensor):
"""
Decompress a given tensor.

Args:
compressed_tensor (Tensor): the Tensor to decompress.

Returns:
Tensor, Context
"""
return math_ops.matmul(compressed_tensor, self.compressor, transpose_b=True) # nxr * rxm = nxm

@staticmethod
def _modified_gram_schmidt(matrix):
'''
sonicxml marked this conversation as resolved.
Show resolved Hide resolved
apply modified Gram-Schmidt procedure to orthogonalize a matrix in columns

Args:
matrix (Tensor): the Tensor to orthogonalize.

Returns:
matrix (Tensor)
'''
n, m = matrix.shape

for i in range(m):
v = matrix[:, i:i+1]
v /= linalg_ops.norm_v2(v, axis=0)

rest = matrix[:,i+1:]
rest -= math_ops.reduce_sum_v1(v * rest, axis=0, keepdims=True) * v
matrix = array_ops.concat([matrix[:,:i], v, rest],axis=1)
return matrix


# class PowerSGDCompressor(CompressorEF):
# """An implementation of the PowerSGD compression algorithm (arxiv.org/abs/1905.13727)."""

Expand Down