diff --git a/autodist/graph_item.py b/autodist/graph_item.py index e581447..0406ce8 100644 --- a/autodist/graph_item.py +++ b/autodist/graph_item.py @@ -71,7 +71,6 @@ def get_default_graph_item(): def wrap_optimizer_init(fn: Callable): """Wraps the __init__ function of OptimizerV2 objects and stores the info in the default GraphItem.""" - def wrapper(*args, **kwargs): # args[0] should be `self`, which is an object of type == optimizer class containing_class = type(args[0]) diff --git a/autodist/kernel/graph_transformer.py b/autodist/kernel/graph_transformer.py index 4f6c2ee..6f28938 100644 --- a/autodist/kernel/graph_transformer.py +++ b/autodist/kernel/graph_transformer.py @@ -99,11 +99,11 @@ def _initialize_synchronizers(self): for part in node.part_config: self._synchronizers[part.var_name] = \ Synchronizer.create(part.WhichOneof('synchronizer'), - getattr(part, part.WhichOneof('synchronizer'))) + part) else: self._synchronizers[node.var_name] = \ Synchronizer.create(node.WhichOneof('synchronizer'), - getattr(node, node.WhichOneof('synchronizer'))) + node) config = self._strategy.graph_config.replicas replica_devices = {device_spec.DeviceSpecV2.from_string(s) for s in config} diff --git a/autodist/kernel/synchronization/all_reduce_synchronizer.py b/autodist/kernel/synchronization/all_reduce_synchronizer.py index b186f51..f79679b 100644 --- a/autodist/kernel/synchronization/all_reduce_synchronizer.py +++ b/autodist/kernel/synchronization/all_reduce_synchronizer.py @@ -18,6 +18,7 @@ from tensorflow.python import ops from tensorflow.python.framework import device_spec from tensorflow.python.ops import collective_ops +from tensorflow.python.framework.ops import Tensor import autodist from autodist.const import ENV @@ -25,12 +26,23 @@ replica_prefix, get_control_consumers, update_control_consumers from autodist.kernel.common.utils import get_op_name from autodist.kernel.synchronization.collective_key import get_collective_keys -from 
autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig +# from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig +from autodist.kernel.synchronization.compressor import Compressor from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.proto import synchronizers_pb2 +from autodist.proto import synchronizers_pb2, compressor_pb2, strategy_pb2 from autodist.utils import logging +class CollectiveOpsConfig: + """Config for using Collective Ops.""" + + group_size: int + group_key: str + instance_key: str + merge_op: str + final_op: str + + class AllReduceSynchronizer(Synchronizer): """ AllReduce Synchronizer. @@ -50,21 +62,39 @@ class AllReduceSynchronizer(Synchronizer): 2. any other types of hybrid reduction of PS and AllReduce. """ - def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer): - self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(config.spec) + def __init__(self, config: strategy_pb2.Strategy.Node): + # compressor_value = getattr(config, 'compressor') + compressor_value = getattr(config.compressor, 'type') + syncer_config = getattr(config, config.WhichOneof('synchronizer')) + self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(syncer_config.spec) if autodist.float_major_minor_tf_version < 1.15 or autodist.float_major_minor_tf_version < 2.1: logging.warning('Collective synchronizer spec "{}" a.k.a communication_hint has no effect ' 'until tensorflow-gpu 1.x>= 1.15 or 2.x>=2.1. It may cause error currently.' .format(self._spec)) self._spec = None - self._compressor_type = synchronizers_pb2.AllReduceSynchronizer.Compressor.Name(config.compressor) - # Collective ops within the same group will be merged by the scoped optimizer. # Normally the group index shall be smaller than the number of variables in the graph; this kernel assumes # the strategy will validate the group assignments are legitimate. 
- self._group = config.group + self._group = syncer_config.group super().__init__() + if compressor_value is not None: + self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) + logging.debug('Compressor type: {}'.format(self._compressor_type)) + + @staticmethod + def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): + """ + Using CollectiveOps, AllReduce the given tensor. + + Args: + tensor (Tensor): the tensor to all-reduce + conf (CollectiveOpsConfig): the config for CollectiveOps + + Returns: + The All-Reduced Tensor + """ + return collective_ops.all_reduce(tensor, **conf.__dict__) def in_graph_apply(self, graph_item, var_name): """ @@ -121,8 +151,12 @@ def _collect_dense_gradients(self, graph_item, var_op_name): # "\/" is added for name scope reuse with ops.name_scope(replica_prefix(i) + "/collective-group-{}/".format(self._group)): + # compressed_grad = compressors[i].compress(grad) with ops.colocate_with(grad.op): - reduced_grad = compressors[i].reduce(grad, conf) + compressed_grad = compressors[i].compress(grad) + reduced = self._all_reduce(compressed_grad, conf) + reduced_grad = compressors[i].decompress(reduced) + # reduced_grad = compressors[i].decompress(reduced) update_consumers(grad_consumers, grad, reduced_grad) # TODO(Hao): update grad, target pair here or not? 
diff --git a/autodist/kernel/synchronization/compressor.py b/autodist/kernel/synchronization/compressor.py index 4878d48..44a980d 100644 --- a/autodist/kernel/synchronization/compressor.py +++ b/autodist/kernel/synchronization/compressor.py @@ -16,21 +16,21 @@ from abc import ABC, abstractmethod from tensorflow.python.framework import dtypes from tensorflow.python.framework.ops import Tensor -from tensorflow.python.ops import collective_ops, math_ops - +# from tensorflow.python.ops import collective_ops, math_ops +from tensorflow.python.ops import math_ops #from tensorflow.python.ops import array_ops, collective_ops, linalg_ops, math_ops, random_ops #from autodist.kernel.synchronization.collective_key import get_collective_keys #from autodist.utils import logging -class CollectiveOpsConfig: - """Config for using Collective Ops.""" +# class CollectiveOpsConfig: +# """Config for using Collective Ops.""" - group_size: int - group_key: str - instance_key: str - merge_op: str - final_op: str +# group_size: int +# group_key: str +# instance_key: str +# merge_op: str +# final_op: str class Compressor(ABC): @@ -44,21 +44,21 @@ class Compressor(ABC): def __init__(self, var_op_name): self.var_op_name = var_op_name - @abstractmethod - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. + # @abstractmethod + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. - Returns: - Reduced Tensor - """ + # Returns: + # Reduced Tensor + # """ @abstractmethod - def _compress(self, tensor: Tensor): + def compress(self, tensor: Tensor): """ Compress a given tensor. 
@@ -70,7 +70,7 @@ def _compress(self, tensor: Tensor): """ @abstractmethod - def _decompress(self, compressed_tensor: Tensor): + def decompress(self, compressed_tensor: Tensor): """ Decompress a given tensor. @@ -81,19 +81,19 @@ def _decompress(self, compressed_tensor: Tensor): Tensor, Context """ - @staticmethod - def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): - """ - Using CollectiveOps, AllReduce the given tensor. + # @staticmethod + # def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Using CollectiveOps, AllReduce the given tensor. - Args: - tensor (Tensor): the tensor to all-reduce - conf (CollectiveOpsConfig): the config for CollectiveOps + # Args: + # tensor (Tensor): the tensor to all-reduce + # conf (CollectiveOpsConfig): the config for CollectiveOps - Returns: - The All-Reduced Tensor - """ - return collective_ops.all_reduce(tensor, **conf.__dict__) + # Returns: + # The All-Reduced Tensor + # """ + # return collective_ops.all_reduce(tensor, **conf.__dict__) @classmethod def create(cls, name, *args, **kwargs): @@ -124,45 +124,69 @@ def __init__(self, var_op_name): self.error = None super().__init__(var_op_name) - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. - - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. - - Returns: - Reduced Tensor - """ + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. + + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. 
+ + # Returns: + # Reduced Tensor + # """ + # if self.error is not None: + # tensor += self.error + # compressed_tensor = self._compress(tensor) + # self.error = tensor - self._decompress(compressed_tensor) + # reduced = self._all_reduce(compressed_tensor, conf) + # return self._decompress(reduced) + + def _compute_error(self, tensor: Tensor): if self.error is not None: tensor += self.error - compressed_tensor = self._compress(tensor) - self.error = tensor - self._decompress(compressed_tensor) - reduced = self._all_reduce(compressed_tensor, conf) - return self._decompress(reduced) + compressed_tensor = self.compress(tensor) + self.error = tensor - self.decompress(compressed_tensor) class NoneCompressor(Compressor): """An identity Compressor.""" - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. + + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. + + # Returns: + # Reduced Tensor + # """ + # return self._all_reduce(tensor, conf) + + def compress(self, tensor: Tensor): """ - Compress, reduce, and decompress a given tensor. + Compress a given tensor. Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. + tensor (Tensor): the Tensor to compress. Returns: - Reduced Tensor + Tensor """ - return self._all_reduce(tensor, conf) - - def _compress(self, tensor: Tensor): return tensor - def _decompress(self, compressed_tensor: Tensor): + def decompress(self, compressed_tensor: Tensor): + """ + Decompress a given tensor. + + Args: + compressed_tensor (Tensor): the Tensor to decompress. 
+ + Returns: + Tensor, Context + """ return compressed_tensor @@ -173,22 +197,31 @@ def __init__(self, var_op_name): self.dtype = None super().__init__(var_op_name) - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. + + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. + + # Returns: + # Reduced Tensor + # """ + # compressed_tensor = self._compress(tensor) + # reduced = self._all_reduce(compressed_tensor, conf) + # return self._decompress(reduced) + + def compress(self, tensor: Tensor): """ - Compress, reduce, and decompress a given tensor. + Compress a given tensor. Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. + tensor (Tensor): the Tensor to compress. Returns: - Reduced Tensor + Tensor """ - compressed_tensor = self._compress(tensor) - reduced = self._all_reduce(compressed_tensor, conf) - return self._decompress(reduced) - - def _compress(self, tensor: Tensor): self.dtype = tensor.dtype tensor_compressed = tensor if tensor.dtype.is_floating: @@ -197,12 +230,21 @@ def _compress(self, tensor: Tensor): tensor_compressed = math_ops.cast(tensor, dtypes.float32) return tensor_compressed - def _decompress(self, compressed_tensor: Tensor): + def decompress(self, compressed_tensor: Tensor): + """ + Decompress a given tensor. + + Args: + compressed_tensor (Tensor): the Tensor to decompress. 
+ + Returns: + Tensor, Context + """ return math_ops.cast(compressed_tensor, self.dtype) -class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order - """Horovod's Compression but with Error Feedback.""" +# class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order +# """Horovod's Compression but with Error Feedback.""" # class PowerSGDCompressor(CompressorEF): diff --git a/autodist/kernel/synchronization/ps_synchronizer.py b/autodist/kernel/synchronization/ps_synchronizer.py index 560e45f..3c1d587 100644 --- a/autodist/kernel/synchronization/ps_synchronizer.py +++ b/autodist/kernel/synchronization/ps_synchronizer.py @@ -35,7 +35,8 @@ remove_from_control_consumers, get_index_from_tensor_name, update_colocation_group from autodist.kernel.common.variable_utils import get_read_var_ops from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.proto import synchronizers_pb2 +# from autodist.proto import synchronizers_pb2, strategy_pb2 +from autodist.proto import strategy_pb2, compressor_pb2 class PSSynchronizer(Synchronizer): @@ -53,15 +54,20 @@ class PSSynchronizer(Synchronizer): for each variable for the workers to mark when their variable update is complete. 
""" - def __init__(self, config: synchronizers_pb2.PSSynchronizer): - self.target_device = config.reduction_destination if config.reduction_destination else "" - self._local_replication = config.local_replication - self._sync = config.sync - self._staleness = config.staleness + def __init__(self, config: strategy_pb2.Strategy.Node): + syncer_config = getattr(config, config.WhichOneof('synchronizer')) + # compressor_value = getattr(config, 'compressor') + compressor_value = getattr(config.compressor, 'type') + self.target_device = syncer_config.reduction_destination if syncer_config.reduction_destination else "" + self._local_replication = syncer_config.local_replication + self._sync = syncer_config.sync + self._staleness = syncer_config.staleness self._var_op_to_agg_grad = {} self._var_op_to_accum_apply_op = {} super().__init__() + if compressor_value: + self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) def in_graph_apply(self, graph_item, var_name): """ diff --git a/autodist/kernel/synchronization/synchronizer.py b/autodist/kernel/synchronization/synchronizer.py index bfaa5a5..e9a38d7 100644 --- a/autodist/kernel/synchronization/synchronizer.py +++ b/autodist/kernel/synchronization/synchronizer.py @@ -41,6 +41,7 @@ def __init__(self): self.var_op_to_accum_apply_op = None self.is_chief = None self.all_canonical_replica_devices = None + self._compressor_type = None # pylint: disable=too-many-arguments def assign_cluster_information(self, diff --git a/autodist/patch.py b/autodist/patch.py index d53a08f..7335c24 100644 --- a/autodist/patch.py +++ b/autodist/patch.py @@ -55,7 +55,6 @@ class PatchTensorFlow: @staticmethod def patch_var_reading(): """It only works with tf.gradients but not tape.gradients.""" - def value(self): """A cached operation which reads the value of this variable.""" if self._cached_value is not None: diff --git a/autodist/proto/compressor.proto b/autodist/proto/compressor.proto new file mode 100644 index 
0000000..c350989 --- /dev/null +++ b/autodist/proto/compressor.proto @@ -0,0 +1,33 @@ +// Copyright 2020 Petuum +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * AutoDist compressor messages. + */ + +syntax = "proto3"; + +package autodist.proto; + +message Compressor { + + /** Which compressor to use */ + enum Type { + NoneCompressor = 0; // No compression + HorovodCompressor = 1; // Horovod's Compression + HorovodCompressorEF = 2; // Horovod's Compression but with Error Feedback. + } + + Type type = 1; +} \ No newline at end of file diff --git a/autodist/proto/strategy.proto b/autodist/proto/strategy.proto index 8b11e7d..b03cfc0 100644 --- a/autodist/proto/strategy.proto +++ b/autodist/proto/strategy.proto @@ -23,6 +23,8 @@ package autodist.proto; import "autodist/proto/synchronizers.proto"; +import "autodist/proto/compressor.proto"; + /** * Represents the strategy the AutoDist backend will implement. @@ -48,6 +50,8 @@ message Strategy { } string partitioner = 4; // Optional partitioner configuration, e.g. 
`1, 2, 1` repeated Node part_config = 5; // Optional node configs for each node partition (if partitioned) + + autodist.proto.Compressor compressor = 6; } /** configuration of some individual nodes of the computational graph */ diff --git a/autodist/proto/synchronizers.proto b/autodist/proto/synchronizers.proto index f70996f..88e5d09 100644 --- a/autodist/proto/synchronizers.proto +++ b/autodist/proto/synchronizers.proto @@ -42,16 +42,6 @@ message AllReduceSynchronizer { Spec spec = 1; // Specification for collective communication - /** Which gradient compression method to use */ - enum Compressor { - NoneCompressor = 0; // No compression - HorovodCompressor = 1; // Horovod's Compression - HorovodCompressorEF = 2; // Horovod's Compression but with Error Feedback. - // PowerSGDCompressor = 3; // PowerSGD compression algorithm (arxiv.org/abs/1905.13727) - } - - Compressor compressor = 2; // One of the compressors to choose - /** The allreduce group to merge with. The group index should be less than the number of variables */ int32 group = 3; } diff --git a/autodist/strategy/all_reduce_strategy.py b/autodist/strategy/all_reduce_strategy.py index 4069984..ce39507 100644 --- a/autodist/strategy/all_reduce_strategy.py +++ b/autodist/strategy/all_reduce_strategy.py @@ -15,7 +15,7 @@ """AllReduce StrategyBuilder.""" from autodist.strategy.base import Strategy, StrategyBuilder -from autodist.proto import strategy_pb2, synchronizers_pb2 +from autodist.proto import strategy_pb2, synchronizers_pb2, compressor_pb2 class AllReduce(StrategyBuilder): @@ -85,6 +85,6 @@ def _gen_all_reduce_node_config(var_name, group=0, all_reduce_spec="NCCL", compr node = strategy_pb2.Strategy.Node() node.var_name = var_name node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value(all_reduce_spec) - node.AllReduceSynchronizer.compressor = synchronizers_pb2.AllReduceSynchronizer.Compressor.Value(compressor) + node.compressor.type = 
compressor_pb2.Compressor.Type.Value(compressor) node.AllReduceSynchronizer.group = group return node diff --git a/autodist/strategy/partitioned_all_reduce_strategy.py b/autodist/strategy/partitioned_all_reduce_strategy.py index 543fb79..0ebeda2 100644 --- a/autodist/strategy/partitioned_all_reduce_strategy.py +++ b/autodist/strategy/partitioned_all_reduce_strategy.py @@ -18,7 +18,7 @@ from autodist.kernel.common.utils import get_op_name from autodist.kernel.partitioner import PartitionerConfig -from autodist.proto import strategy_pb2, synchronizers_pb2 +from autodist.proto import strategy_pb2, synchronizers_pb2, compressor_pb2 from autodist.strategy.base import Strategy, StrategyBuilder @@ -86,10 +86,8 @@ def _gen_node_config(self, var, var_counter): if num_shards <= 1: node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - node.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # node.AllReduceSynchronizer.compressor = \ - # synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + node.compressor.type = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # node.compressor.type = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") node.AllReduceSynchronizer.group = var_counter // self.chunk_size return node, num_shards @@ -109,10 +107,8 @@ def _gen_node_config(self, var, var_counter): # Here let's just make it consistent part.var_name = '{}/part_{}:0'.format(get_op_name(var.name), i) part.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - part.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # part.AllReduceSynchronizer.compressor = \ - # synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + part.compressor.type = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # part.compressor.type = 
compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") part.AllReduceSynchronizer.group = (var_counter + i) // self.chunk_size node.part_config.extend([part]) return node, num_shards diff --git a/autodist/strategy/random_axis_partition_all_reduce_strategy.py b/autodist/strategy/random_axis_partition_all_reduce_strategy.py index 5e90dd4..9dd37ef 100644 --- a/autodist/strategy/random_axis_partition_all_reduce_strategy.py +++ b/autodist/strategy/random_axis_partition_all_reduce_strategy.py @@ -19,7 +19,7 @@ from autodist.kernel.common.utils import get_op_name from autodist.kernel.partitioner import PartitionerConfig -from autodist.proto import strategy_pb2, synchronizers_pb2 +from autodist.proto import strategy_pb2, synchronizers_pb2, compressor_pb2 from autodist.strategy.base import Strategy, StrategyBuilder @@ -87,10 +87,8 @@ def _gen_node_config(self, var, var_counter, grad): if num_shards <= 1: node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - node.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # node.AllReduceSynchronizer.compressor = \ - # synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + node.compressor.type = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # node.compressor.type = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") + node.AllReduceSynchronizer.group = var_counter // self.chunk_size return node, num_shards @@ -107,10 +105,8 @@ def _gen_node_config(self, var, var_counter, grad): # Here let's just make it consistent part.var_name = '{}/part_{}:0'.format(get_op_name(var.name), i) part.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - part.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # part.AllReduceSynchronizer.compressor = \ - # 
synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + part.compressor.type = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # part.compressor = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") part.AllReduceSynchronizer.group = (var_counter + i) // self.chunk_size node.part_config.extend([part]) return node, num_shards diff --git a/tests/integration/test_all.py b/tests/integration/test_all.py index 24bd863..2470ada 100644 --- a/tests/integration/test_all.py +++ b/tests/integration/test_all.py @@ -37,7 +37,7 @@ PartitionedPS(local_proxy_variable=True), AllReduce(chunk_size=1, all_reduce_spec='NCCL', compressor='NoneCompressor'), AllReduce(chunk_size=1, all_reduce_spec='NCCL', compressor='HorovodCompressor'), - AllReduce(chunk_size=1, all_reduce_spec='RING', compressor='HorovodCompressorEF'), + # AllReduce(chunk_size=1, all_reduce_spec='RING', compressor='HorovodCompressorEF'), PSLoadBalancing(local_proxy_variable=True), Parallax(local_proxy_variable=True), PartitionedAR(),