From d48effbd633d5a350e4bbc9a9d6ddbf45a874025 Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Thu, 22 Oct 2020 11:19:19 -0400 Subject: [PATCH 1/7] decouple compressor before test --- autodist/kernel/graph_transformer.py | 6 ++-- .../all_reduce_synchronizer.py | 6 ++-- autodist/proto/compressor.proto | 33 +++++++++++++++++++ autodist/proto/strategy.proto | 4 +++ autodist/proto/synchronizers.proto | 10 ------ autodist/strategy/all_reduce_strategy.py | 4 +-- .../partitioned_all_reduce_strategy.py | 14 +++----- ...ndom_axis_partition_all_reduce_strategy.py | 14 +++----- 8 files changed, 56 insertions(+), 35 deletions(-) create mode 100644 autodist/proto/compressor.proto diff --git a/autodist/kernel/graph_transformer.py b/autodist/kernel/graph_transformer.py index 4f6c2ee..e2ce5b4 100644 --- a/autodist/kernel/graph_transformer.py +++ b/autodist/kernel/graph_transformer.py @@ -99,11 +99,13 @@ def _initialize_synchronizers(self): for part in node.part_config: self._synchronizers[part.var_name] = \ Synchronizer.create(part.WhichOneof('synchronizer'), - getattr(part, part.WhichOneof('synchronizer'))) + getattr(part, part.WhichOneof('synchronizer')), + part.compressor) else: self._synchronizers[node.var_name] = \ Synchronizer.create(node.WhichOneof('synchronizer'), - getattr(node, node.WhichOneof('synchronizer'))) + getattr(node, node.WhichOneof('synchronizer')). + node.compressor) config = self._strategy.graph_config.replicas replica_devices = {device_spec.DeviceSpecV2.from_string(s) for s in config} diff --git a/autodist/kernel/synchronization/all_reduce_synchronizer.py b/autodist/kernel/synchronization/all_reduce_synchronizer.py index b186f51..98a3ac3 100644 --- a/autodist/kernel/synchronization/all_reduce_synchronizer.py +++ b/autodist/kernel/synchronization/all_reduce_synchronizer.py @@ -27,7 +27,7 @@ from autodist.kernel.synchronization.collective_key import get_collective_keys from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.proto import synchronizers_pb2 +from autodist.proto import synchronizers_pb2, compressor_pb2 from autodist.utils import logging @@ -50,7 +50,7 @@ class AllReduceSynchronizer(Synchronizer): 2. any other types of hybrid reduction of PS and AllReduce. """ - def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer): + def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer, compressor_value): self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(config.spec) if autodist.float_major_minor_tf_version < 1.15 or autodist.float_major_minor_tf_version < 2.1: logging.warning('Collective synchronizer spec "{}" a.k.a communication_hint has no effect ' @@ -58,7 +58,7 @@ def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer): .format(self._spec)) self._spec = None - self._compressor_type = synchronizers_pb2.AllReduceSynchronizer.Compressor.Name(config.compressor) + self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) # Collective ops within the same group will be merged by the scoped optimizer. 
# Normally the group index shall be smaller than the number of variables in the graph; this kernel assumes diff --git a/autodist/proto/compressor.proto b/autodist/proto/compressor.proto new file mode 100644 index 0000000..c350989 --- /dev/null +++ b/autodist/proto/compressor.proto @@ -0,0 +1,33 @@ +// Copyright 2020 Petuum +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * AutoDist compressor messages. + */ + +syntax = "proto3"; + +package autodist.proto; + +message Compressor { + + /** Which compressor to use */ + enum Type { + NoneCompressor = 0; // No compression + HorovodCompressor = 1; // Horovod's Compression + HorovodCompressorEF = 2; // Horovod's Compression but with Error Feedback. + } + + Type type = 1; +} \ No newline at end of file diff --git a/autodist/proto/strategy.proto b/autodist/proto/strategy.proto index 8b11e7d..b03cfc0 100644 --- a/autodist/proto/strategy.proto +++ b/autodist/proto/strategy.proto @@ -23,6 +23,8 @@ package autodist.proto; import "autodist/proto/synchronizers.proto"; +import "autodist/proto/compressor.proto"; + /** * Represents the strategy the AutoDist backend will implement. @@ -48,6 +50,8 @@ message Strategy { } string partitioner = 4; // Optional partitioner configuration, e.g. `1, 2, 1` repeated Node part_config = 5; // Optional node configs for each node partition (if partitioned) + + autodist.proto.Compressor compressor = 6; } /** configuration of some individual nodes of the computational graph */ diff --git a/autodist/proto/synchronizers.proto b/autodist/proto/synchronizers.proto index f70996f..88e5d09 100644 --- a/autodist/proto/synchronizers.proto +++ b/autodist/proto/synchronizers.proto @@ -42,16 +42,6 @@ message AllReduceSynchronizer { Spec spec = 1; // Specification for collective communication - /** Which gradient compression method to use */ - enum Compressor { - NoneCompressor = 0; // No compression - HorovodCompressor = 1; // Horovod's Compression - HorovodCompressorEF = 2; // Horovod's Compression but with Error Feedback. - // PowerSGDCompressor = 3; // PowerSGD compression algorithm (arxiv.org/abs/1905.13727) - } - - Compressor compressor = 2; // One of the compressors to choose - /** The allreduce group to merge with. 
The group index should be less than the number of variables */ int32 group = 3; } diff --git a/autodist/strategy/all_reduce_strategy.py b/autodist/strategy/all_reduce_strategy.py index 4069984..dc0abe8 100644 --- a/autodist/strategy/all_reduce_strategy.py +++ b/autodist/strategy/all_reduce_strategy.py @@ -15,7 +15,7 @@ """AllReduce StrategyBuilder.""" from autodist.strategy.base import Strategy, StrategyBuilder -from autodist.proto import strategy_pb2, synchronizers_pb2 +from autodist.proto import strategy_pb2, synchronizers_pb2, compressor_pb2 class AllReduce(StrategyBuilder): @@ -85,6 +85,6 @@ def _gen_all_reduce_node_config(var_name, group=0, all_reduce_spec="NCCL", compr node = strategy_pb2.Strategy.Node() node.var_name = var_name node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value(all_reduce_spec) - node.AllReduceSynchronizer.compressor = synchronizers_pb2.AllReduceSynchronizer.Compressor.Value(compressor) + node.compressor = compressor_pb2.Compressor.Type.Value(compressor) node.AllReduceSynchronizer.group = group return node diff --git a/autodist/strategy/partitioned_all_reduce_strategy.py b/autodist/strategy/partitioned_all_reduce_strategy.py index 543fb79..e19cdae 100644 --- a/autodist/strategy/partitioned_all_reduce_strategy.py +++ b/autodist/strategy/partitioned_all_reduce_strategy.py @@ -18,7 +18,7 @@ from autodist.kernel.common.utils import get_op_name from autodist.kernel.partitioner import PartitionerConfig -from autodist.proto import strategy_pb2, synchronizers_pb2 +from autodist.proto import strategy_pb2, synchronizers_pb2, compressor_pb2 from autodist.strategy.base import Strategy, StrategyBuilder @@ -86,10 +86,8 @@ def _gen_node_config(self, var, var_counter): if num_shards <= 1: node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - node.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # node.AllReduceSynchronizer.compressor = \ - # synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + node.compressor = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # node.compressor = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") node.AllReduceSynchronizer.group = var_counter // self.chunk_size return node, num_shards @@ -109,10 +107,8 @@ def _gen_node_config(self, var, var_counter): # Here let's just make it consistent part.var_name = '{}/part_{}:0'.format(get_op_name(var.name), i) part.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - part.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # part.AllReduceSynchronizer.compressor = \ - # synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + part.compressor = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # part.compressor = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") part.AllReduceSynchronizer.group = (var_counter + i) // self.chunk_size node.part_config.extend([part]) return node, num_shards diff --git a/autodist/strategy/random_axis_partition_all_reduce_strategy.py b/autodist/strategy/random_axis_partition_all_reduce_strategy.py index 5e90dd4..53b6cdc 100644 --- a/autodist/strategy/random_axis_partition_all_reduce_strategy.py +++ b/autodist/strategy/random_axis_partition_all_reduce_strategy.py @@ -19,7 +19,7 @@ from autodist.kernel.common.utils import get_op_name from 
autodist.kernel.partitioner import PartitionerConfig -from autodist.proto import strategy_pb2, synchronizers_pb2 +from autodist.proto import strategy_pb2, synchronizers_pb2, compressor_pb2 from autodist.strategy.base import Strategy, StrategyBuilder @@ -87,10 +87,8 @@ def _gen_node_config(self, var, var_counter, grad): if num_shards <= 1: node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - node.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # node.AllReduceSynchronizer.compressor = \ - # synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + node.compressor = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # node.compressor = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") node.AllReduceSynchronizer.group = var_counter // self.chunk_size return node, num_shards @@ -107,10 +105,8 @@ def _gen_node_config(self, var, var_counter, grad): # Here let's just make it consistent part.var_name = '{}/part_{}:0'.format(get_op_name(var.name), i) part.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - part.AllReduceSynchronizer.compressor = \ - synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("NoneCompressor") - # part.AllReduceSynchronizer.compressor = \ - # synchronizers_pb2.AllReduceSynchronizer.Compressor.Value("PowerSGDCompressor") + part.compressor = compressor_pb2.Compressor.Type.Value("NoneCompressor") + # part.compressor = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") part.AllReduceSynchronizer.group = (var_counter + i) // self.chunk_size node.part_config.extend([part]) return node, num_shards From 5f3071397775d77df238fe81ec212847a726f595 Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Tue, 27 Oct 2020 17:22:16 -0400 Subject: [PATCH 2/7] for discussion --- autodist/kernel/new_graph_transformer.py | 202 ++++++++++ .../new_all_reduce_synchronizer.py | 197 ++++++++++ .../kernel/synchronization/new_compressor.py | 356 ++++++++++++++++++ 3 files changed, 755 insertions(+) create mode 100644 autodist/kernel/new_graph_transformer.py create mode 100644 autodist/kernel/synchronization/new_all_reduce_synchronizer.py create mode 100644 autodist/kernel/synchronization/new_compressor.py diff --git a/autodist/kernel/new_graph_transformer.py b/autodist/kernel/new_graph_transformer.py new file mode 100644 index 0000000..15430c4 --- /dev/null +++ b/autodist/kernel/new_graph_transformer.py @@ -0,0 +1,202 @@ +# Copyright 2020 Petuum, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
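# For orientation, the node config this transformer consumes is the Strategy.Node from
# PATCH 1/7, where each node now carries a `compressor` enum next to its synchronizer.
# A builder fills it as in the updated all_reduce_strategy.py of that patch:
#
#     node = strategy_pb2.Strategy.Node()
#     node.var_name = var_name
#     node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value(all_reduce_spec)
#     node.compressor = compressor_pb2.Compressor.Type.Value(compressor)
#     node.AllReduceSynchronizer.group = group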
+ +"""Graph Transformer.""" + +from tensorflow.core.framework.attr_value_pb2 import AttrValue as pb2_AttrValue +from tensorflow.python.eager import context +from tensorflow.python.framework import device_spec + +from autodist.graph_item import GraphItem +from autodist.kernel.partitioner import VariablePartitioner +from autodist.kernel.replicator import Replicator +from autodist.kernel.synchronization.synchronizer import Synchronizer +from autodist.kernel.synchronization.compressor import Comopressor +from autodist.utils import logging, visualization_util + + +class GraphTransformer: + """ + Graph Transformer. + + This is the bulk of the AutoDist backend logic, taking a single-node, + single-GPU graph and transforming it into a distributed + graph. This all happens based on the `Strategy` provided. + + The transformation occurs over several steps: + + 1. Partitions the necessary variables + 2. Replicates the graph the desired number of times + 3. Within the graph, synchronizes gradients with in-graph logic + 4. Adds between-graph gradient synchronization logic + + """ + + def __init__(self, compiled_strategy, cluster, graph_item): + self._strategy = compiled_strategy + self._cluster = cluster + self.graph_item = graph_item + + # Set in _initialize_synchronizers + self._num_local_replicas = 0 + self._num_workers = 0 + self._synchronizers = {} + self._compressors = {} + + def transform(self): + """Call graph transformer to transform a graph item based on strategy and cluster.""" + logging.info('Transforming the original graph to a distributed graph...') + with context.graph_mode(): + graph_item = self.graph_item + # Ensure the transformation happens under graph mode, no matter the outer mode is under eager or graph. + + visualization_util.log_graph(graph=graph_item.graph, name='0-original') + + graph_item, self._strategy.node_config = VariablePartitioner.apply(self._strategy.node_config, graph_item) + + visualization_util.log_graph(graph=graph_item.graph, name='1-after-partition') + + # Create Synchronizers for each node in the strategy + self._initialize_synchronizers_and_copmpressors() + + # Replicate the graph (both in-graph and between-graph) + new_graph_item = Replicator.apply( + config=self._strategy.graph_config.replicas, + cluster=self._cluster, + graph_item=graph_item + ) + + # Apply synchronizers + if self._num_local_replicas >= 1: + new_graph_item = self._in_graph_apply(new_graph_item) + logging.debug('Successfully applied local in-graph replication') + visualization_util.log_graph(new_graph_item.graph, '2-after-in-graph') + + if self._num_workers >= 1: + new_graph_item = self._between_graph_apply(new_graph_item) + logging.debug('Successfully applied between-graph replication') + + final_item = new_graph_item + logging.info('Successfully built the distributed graph.') + visualization_util.log_graph(graph=final_item.graph, name='3-transformed') + + return final_item + + def _initialize_synchronizers_and_copmpressors(self): + self._synchronizers = {} + for node in self._strategy.node_config: + partitioner = getattr(node, 'partitioner') + if partitioner: + for part in node.part_config: + self._synchronizers[part.var_name] = \ + Synchronizer.create(part.WhichOneof('synchronizer'), + getattr(part, part.WhichOneof('synchronizer'))) + self._compressors[part.var_name] = Compressor.create(part.compressor, part.var_name) + else: + self._synchronizers[node.var_name] = \ + Synchronizer.create(node.WhichOneof('synchronizer'), + getattr(node, node.WhichOneof('synchronizer'))) + 
self._compressors[node.var_name] = Compressor.create(node.compressor, node.var_name) + + config = self._strategy.graph_config.replicas + replica_devices = {device_spec.DeviceSpecV2.from_string(s) for s in config} + replica_hosts = {self._cluster.get_address_from_task(d.job, d.task) for d in replica_devices} + self._num_workers = len(replica_hosts) + + local_canonical_replica_devices = sorted({ + d.to_string() for d in replica_devices + if self._cluster.get_local_address() == self._cluster.get_address_from_task(d.job, d.task) + }) + logging.debug('Local replica devices: {}'.format(local_canonical_replica_devices)) + self._num_local_replicas = len(local_canonical_replica_devices) + + local_worker_id = self._cluster.get_local_worker_task_index() + local_worker_device = '/job:worker/task:{}'.format(local_worker_id) + + for synchronizer in self._synchronizers.values(): + synchronizer.assign_cluster_information( + num_workers=self._num_workers, + num_replicas=self._num_local_replicas, + worker_device=local_worker_device, + worker_id=local_worker_id, + canonical_replica_devices=sorted({d.to_string() for d in replica_devices}), + is_chief=self._cluster.is_chief()) + + for compressor in self._compressors.values(): + compressor.assign_cluster_information( + num_workers=self._num_workers, + num_replicas=self._num_local_replicas, + worker_device=local_worker_device, + worker_id=local_worker_id, + canonical_replica_devices=sorted({d.to_string() for d in replica_devices}), + is_chief=self._cluster.is_chief()) + + @property + def num_local_replicas(self): + """Return the number of local replicas.""" + assert self._num_local_replicas != 0 # ensure initialized + return self._num_local_replicas + + def _in_graph_apply(self, graph_item: GraphItem): + """ + Perform in-graph synchronization of the graph. + + Args: + graph_item (GraphItem): The graph to replication. + + Returns: + GraphItem + """ + new_graph_item = graph_item + for var_name, syncer in self._synchronizers.items(): + new_graph_item = syncer.in_graph_apply(new_graph_item, var_name) + return new_graph_item + + def _between_graph_apply(self, multi_gpu_graph_item: GraphItem): + """ + Perform between-graph replication of the graph. + + Args: + multi_gpu_graph_item (GraphItem): The graph to replication. 
+ + Returns: + GraphItem + """ + new_graph_item = multi_gpu_graph_item + for var_name, syncer in self._synchronizers.items(): + new_graph_item = syncer.between_graph_apply(new_graph_item, var_name) + self._prune_colocation_groups(new_graph_item) + # TODO: make this work + # update_shard_values_for_worker(num_workers, worker_id) + return new_graph_item + + # TODO(Hao): this seems still problematic + @staticmethod + def _prune_colocation_groups(graph_item): + for op in graph_item.graph.get_operations(): + # Now prune the graph to have the right colocation constraints + colocation_groups = [(c, graph_item.get_colocation_op(c)) for c in op.colocation_groups()] + # We don't want any colocation groups that are just this `op` + colocation_groups = [(c, bind_op) for (c, bind_op) in colocation_groups if bind_op != op] + if colocation_groups: + device_to_bind_to = colocation_groups[-1][1].device + new_colocation_groups = [c for (c, op) in colocation_groups if op.device == device_to_bind_to] + op._set_device(device_to_bind_to) + op._set_attr("_class", pb2_AttrValue(list=pb2_AttrValue.ListValue(s=new_colocation_groups))) + else: + try: + if op.get_attr("_class"): + op._clear_attr("_class") + except ValueError: + pass diff --git a/autodist/kernel/synchronization/new_all_reduce_synchronizer.py b/autodist/kernel/synchronization/new_all_reduce_synchronizer.py new file mode 100644 index 0000000..6a32f31 --- /dev/null +++ b/autodist/kernel/synchronization/new_all_reduce_synchronizer.py @@ -0,0 +1,197 @@ +# Copyright 2020 Petuum, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""AllReduce Synchronizer.""" +from collections import defaultdict + +from tensorflow.python import ops +from tensorflow.python.framework import device_spec +from tensorflow.python.ops import collective_ops + +import autodist +from autodist.const import ENV +from autodist.kernel.common.utils import get_consumers, update_consumers, \ + replica_prefix, get_control_consumers, update_control_consumers +from autodist.kernel.common.utils import get_op_name +from autodist.kernel.synchronization.collective_key import get_collective_keys +from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig +from autodist.kernel.synchronization.synchronizer import Synchronizer +from autodist.proto import synchronizers_pb2, compressor_pb2 +from autodist.utils import logging + + +class AllReduceSynchronizer(Synchronizer): + """ + AllReduce Synchronizer. + + This AllReduce Synchronizer currently uses TensorFlow's `collective_device_ops` + to insert their AllReduce ops into our graph. + + The class AllReduceSynchronizer class contains the following possible instantiations: + + 1. spec=`auto`: single-node multiple devices, or cross-node AllReduce based on collective ops + 2. spec=`nccl`: single-node multiple devices, or cross-node AllReduce based on NCCL + 3. spec=`ring`/'tree', AllReduce with different reduction structures: ring, tree, etc. 
+ + However note that it does not contain the following instantiations: + + 1. shuffle reduce (reduce to CPU or GPU as in PS) + AllReduce across nodes + 2. any other types of hybrid reduction of PS and AllReduce. + """ + + def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer, compressor_value): + self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(config.spec) + if autodist.float_major_minor_tf_version < 1.15 or autodist.float_major_minor_tf_version < 2.1: + logging.warning('Collective synchronizer spec "{}" a.k.a communication_hint has no effect ' + 'until tensorflow-gpu 1.x>= 1.15 or 2.x>=2.1. It may cause error currently.' + .format(self._spec)) + self._spec = None + + self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) + + # Collective ops within the same group will be merged by the scoped optimizer. + # Normally the group index shall be smaller than the number of variables in the graph; this kernel assumes + # the strategy will validate the group assignments are legitimate. + self._group = config.group + super().__init__() + + def in_graph_apply(self, graph_item, var_name): + """ + Perform in-graph synchronization based on AllReduce and TensorFlow Collective Ops. + + Note that collective ops now only supports dense tensors. + + Args: + graph_item (graph_item.GraphItem): the graph_item to be distributed + var_name (str): the corresponded variable name + + Returns: + graph_item.GraphItem: The new graph + """ + # Skip allreduce synchronizer when rank <= 1 + if self.num_replicas * self.num_workers <= 1: + return graph_item + + item = graph_item + var_op_name = get_op_name(var_name) + + # Throw an error if the variable is sparse + master_op_name = ops.prepend_name_scope(var_op_name, replica_prefix(0)) + grad, _, _ = graph_item.var_op_name_to_grad_info[master_op_name] + with item.graph.as_default(): + self._share_initializer(item, var_op_name, master_replica=0) + if isinstance(grad, ops.IndexedSlices): + self._collect_sparse_gradients(item, var_op_name) + else: + self._collect_dense_gradients(item, var_op_name) + return item + + def _collect_dense_gradients(self, graph_item, var_op_name): + """Append collective ops after the gradient is calculated.""" + if self.num_replicas * self.num_workers <= 1: + raise ValueError('CollectiveOps requires collective group size > 1') + + compressors = defaultdict(lambda: Compressor.create(self._compressor_type, var_op_name)) + + conf = CollectiveOpsConfig() + conf.group_size = len(self.all_canonical_replica_devices) + conf.group_key = get_collective_keys().get_group_key(self.all_canonical_replica_devices) + conf.instance_key = get_collective_keys().get_instance_key(var_op_name) + conf.merge_op = 'Add' + conf.final_op = 'Div' + if self._spec: + setattr(conf, 'communication_hint', self._spec) + + for i in range(0, self.num_replicas): + op_name = ops.prepend_name_scope(var_op_name, replica_prefix(i)) + grad, _, _ = graph_item.var_op_name_to_grad_info[op_name] + # TODO (Tairui): (3) Merge of reduction for performance + grad_consumers = get_consumers(grad.op) # this line must happen before the reduction + + # "\/" is added for name scope reuse + with ops.name_scope(replica_prefix(i) + "/collective-group-{}/".format(self._group)): + with ops.colocate_with(grad.op): + reduced_grad = compressors[i].reduce(grad, conf) + update_consumers(grad_consumers, grad, reduced_grad) + # TODO(Hao): update grad, target pair here or not? 
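+        # For reference, a minimal sketch of what `compressors[i].reduce(grad, conf)` does for the
+        # compressors defined in new_compressor.py below (NoneCompressor skips the casts entirely,
+        # HorovodCompressor casts to float32 and back, and the EF variant also folds in the stored
+        # error term before compressing):
+        #
+        #     def reduce(self, tensor, conf):
+        #         compressed = self._compress(tensor)                      # e.g. cast to float32
+        #         reduced = collective_ops.all_reduce(compressed, **conf.__dict__)
+        #         return self._decompress(reduced)                         # cast back to tensor.dtype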
+ + def _collect_sparse_gradients(self, graph_item, var_op_name): + """Append collective ops after the gradient is calculated.""" + if self.num_workers > 1 and not ENV.AUTODIST_INTERNAL_TF.value: + raise NotImplementedError('Currently the collective NCCL AllGather is not supported in TensorFlow release.' + 'Please choose another strategy.') + conf = {} + if self._spec: + conf = {'communication_hint': self._spec} + if self._compressor_type: + logging.warning('AllGather currently does not support AutoDist compressor so it skips.') + if self.num_replicas * self.num_workers <= 1: + raise ValueError('CollectiveOps requires collective group size > 1') + for i in range(0, self.num_replicas): + op_name = ops.prepend_name_scope(var_op_name, replica_prefix(i)) + grad, _, _ = graph_item.var_op_name_to_grad_info[op_name] + # TODO (Tairui): (3) Merge of reduction for performance + indices_c_ops = grad.indices.consumers() + indices_cc_ops = get_control_consumers(grad.indices.op) + values_c_ops = grad.values.consumers() + values_cc_ops = get_control_consumers(grad.values.op) + with ops.name_scope(replica_prefix(i)): + with ops.colocate_with(grad.indices.op): + new_indices = collective_ops.all_gather( + grad.indices, + self.num_replicas * self.num_workers, + get_collective_keys().get_group_key(self.all_canonical_replica_devices), + get_collective_keys().get_instance_key(var_op_name + '-indices'), + **conf + ) + with ops.colocate_with(grad.values.op): + new_values = collective_ops.all_gather( + grad.values, + self.num_replicas * self.num_workers, + get_collective_keys().get_group_key(self.all_canonical_replica_devices), + get_collective_keys().get_instance_key(var_op_name + '-values'), + **conf + ) + update_consumers(indices_c_ops, grad.indices, new_indices) + update_control_consumers(indices_cc_ops, grad.indices.op, new_indices.op) + update_consumers(values_c_ops, grad.values, new_values) + update_control_consumers(values_cc_ops, grad.values.op, new_values) + + def _share_initializer(self, graph_item, var_op_name, master_replica=0): + """Share the initializers of all replica variables to use initializer on replica=master_replica.""" + # find the initial value of the var on master_replica + master_var_op = graph_item.graph.get_operation_by_name( + ops.prepend_name_scope(var_op_name, replica_prefix(master_replica))) + master_var = graph_item.trainable_var_op_to_var[master_var_op] + master_init_tensor = graph_item.graph.get_tensor_by_name(master_var.initial_value.name) + master_init_op = master_init_tensor.op + # set the device of the init ops to reside on the chief device + master_init_device = device_spec.DeviceSpecV2.from_string(master_init_op.device) \ + .replace(task=0) + master_init_op._set_device_from_string(master_init_device.to_string()) + + for i in range(0, self.num_replicas): + if i == master_replica: + continue + var_op = graph_item.graph.get_operation_by_name( + ops.prepend_name_scope(var_op_name, replica_prefix(i))) + var = graph_item.trainable_var_op_to_var[var_op] + init_op = graph_item.graph.get_tensor_by_name(var.initial_value.name).op + init_assign_op = get_consumers(init_op)[0] + init_assign_op._update_input(1, master_init_tensor) + + # pylint: disable=no-self-use + def between_graph_apply(self, graph_item, var_name): + """Allreduce synchronizer will do nothing in between-graph synchronization.""" + return graph_item diff --git a/autodist/kernel/synchronization/new_compressor.py b/autodist/kernel/synchronization/new_compressor.py new file mode 100644 index 0000000..ce4368b --- /dev/null 
+++ b/autodist/kernel/synchronization/new_compressor.py @@ -0,0 +1,356 @@ +# Copyright 2020 Petuum, Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Gradient Compressors for All-Reduce.""" +from abc import ABC, abstractmethod +from tensorflow.python.framework import dtypes +from tensorflow.python.framework.ops import Tensor +from tensorflow.python.ops import collective_ops, math_ops + +from autodist.kernel.common.utils import get_op_name +from autodist.proto import compressor_pb2 + +#from tensorflow.python.ops import array_ops, collective_ops, linalg_ops, math_ops, random_ops +#from autodist.kernel.synchronization.collective_key import get_collective_keys +#from autodist.utils import logging + + +class CollectiveOpsConfig: + """Config for using Collective Ops.""" + + group_size: int + group_key: str + instance_key: str + merge_op: str + final_op: str + + +class Compressor(ABC): + """ + Wraps CollectiveOps.All_Reduce with compression and decompression for network efficiency. + + This means that it only wraps gradient transmission for AllReduce + synchronized variables, not PS ops or other ops like network reads. + """ + + def __init__(self, var_name): + var_op_name = get_op_name(var_name) + self.var_op_name = var_op_name + self.num_workers = None + self.num_replicas = None + self.worker_device = None + self.worker_id = None + self.is_chief = None + self.all_canonical_replica_devices = None + + @abstractmethod + def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + """ + Compress, reduce, and decompress a given tensor. + + Args: + tensor (Tensor): the Tensor to reduce. + conf (CollectiveOpsConfig): the config for Collective Ops. + + Returns: + Reduced Tensor + """ + + @abstractmethod + def _compress(self, tensor: Tensor): + """ + Compress a given tensor. + + Args: + tensor (Tensor): the Tensor to compress. + + Returns: + Tensor + """ + + @abstractmethod + def _decompress(self, compressed_tensor: Tensor): + """ + Decompress a given tensor. + + Args: + compressed_tensor (Tensor): the Tensor to decompress. + + Returns: + Tensor, Context + """ + + @staticmethod + def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): + """ + Using CollectiveOps, AllReduce the given tensor. 
+ + Args: + tensor (Tensor): the tensor to all-reduce + conf (CollectiveOpsConfig): the config for CollectiveOps + + Returns: + The All-Reduced Tensor + """ + return collective_ops.all_reduce(tensor, **conf.__dict__) + + + def in_graph_compress(self, graph_item, var_name): + item = graph_item + var_op_name = get_op_name(var_name) + master_op_name = ops.prepend_name_scope(var_op_name, replica_prefix(0)) + grad, _, _ = graph_item.var_op_name_to_grad_info[master_op_name] + with item.graph.as_default(): + self._share_initializer(item, var_op_name, master_replica=0) + if isinstance(grad, ops.IndexedSlices): + self._compress_sparse_gradients(item, var_op_name) + return item + + + def in_graph_decompress(self, graph_item, var_name): + item = graph_item + var_op_name = get_op_name(var_name) + master_op_name = ops.prepend_name_scope(var_op_name, replica_prefix(0)) + + def _compress_sparse_gradients(item, var_op_name): + + + + def _share_initializer(self, graph_item, var_op_name, master_replica=0): + """Share the initializers of all replica variables to use initializer on replica=master_replica.""" + # find the initial value of the var on master_replica + master_var_op = graph_item.graph.get_operation_by_name( + ops.prepend_name_scope(var_op_name, replica_prefix(master_replica))) + master_var = graph_item.trainable_var_op_to_var[master_var_op] + master_init_tensor = graph_item.graph.get_tensor_by_name(master_var.initial_value.name) + master_init_op = master_init_tensor.op + # set the device of the init ops to reside on the chief device + master_init_device = device_spec.DeviceSpecV2.from_string(master_init_op.device) \ + .replace(task=0) + master_init_op._set_device_from_string(master_init_device.to_string()) + + for i in range(0, self.num_replicas): + if i == master_replica: + continue + var_op = graph_item.graph.get_operation_by_name( + ops.prepend_name_scope(var_op_name, replica_prefix(i))) + var = graph_item.trainable_var_op_to_var[var_op] + init_op = graph_item.graph.get_tensor_by_name(var.initial_value.name).op + init_assign_op = get_consumers(init_op)[0] + init_assign_op._update_input(1, master_init_tensor) + + def assign_cluster_information(self, + num_workers, + num_replicas, + worker_device, + worker_id, + canonical_replica_devices, + is_chief=False): + """Store cluster information in the synchronizer.""" + self.num_workers = num_workers + self.num_replicas = num_replicas + self.worker_device = worker_device # local worker device + self.worker_id = worker_id # local worker id + self.all_canonical_replica_devices = canonical_replica_devices + self.is_chief = is_chief + return self + + @classmethod + def create(cls, name, *args, **kwargs): + """ + Create new Compressor instance given subclass name. + + Args: + name: Name of the Compressor subclass (e.g. NoneCompressor). + *args: Any args for the subclass constructor. + **kwargs: Any kwargs for the subclass constructor. 
+ + Returns: + Compressor + """ + name = compressor_pb2.Compressor.Type.Name(compressor_value) + subclass = next(subclass for subclass in cls._get_subclasses() if subclass.__name__ == name) + return subclass(*args, **kwargs) + + @classmethod + def _get_subclasses(cls): + return set(cls.__subclasses__()).union([s for c in cls.__subclasses__() for s in c._get_subclasses()]) + + +# pylint: disable=abstract-method +class CompressorEF(Compressor, ABC): + """A Compressor with Error Feedback.""" + + def __init__(self, var_op_name): + self.error = None + super().__init__(var_op_name) + + def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + """ + Compress, reduce, and decompress a given tensor. + + Args: + tensor (Tensor): the Tensor to reduce. + conf (CollectiveOpsConfig): the config for Collective Ops. + + Returns: + Reduced Tensor + """ + if self.error is not None: + tensor += self.error + compressed_tensor = self._compress(tensor) + self.error = tensor - self._decompress(compressed_tensor) + reduced = self._all_reduce(compressed_tensor, conf) + return self._decompress(reduced) + + +class NoneCompressor(Compressor): + """An identity Compressor.""" + + def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + """ + Compress, reduce, and decompress a given tensor. + + Args: + tensor (Tensor): the Tensor to reduce. + conf (CollectiveOpsConfig): the config for Collective Ops. + + Returns: + Reduced Tensor + """ + return self._all_reduce(tensor, conf) + + def _compress(self, tensor: Tensor): + return tensor + + def _decompress(self, compressed_tensor: Tensor): + return compressed_tensor + + +class HorovodCompressor(Compressor): + """Implements Horovod's Compression.""" + + def __init__(self, var_op_name): + self.dtype = None + super().__init__(var_op_name) + + def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + """ + Compress, reduce, and decompress a given tensor. + + Args: + tensor (Tensor): the Tensor to reduce. + conf (CollectiveOpsConfig): the config for Collective Ops. + + Returns: + Reduced Tensor + """ + compressed_tensor = self._compress(tensor) + reduced = self._all_reduce(compressed_tensor, conf) + return self._decompress(reduced) + + def _compress(self, tensor: Tensor): + self.dtype = tensor.dtype + tensor_compressed = tensor + if tensor.dtype.is_floating: + # Only allow compression from other floating point types + # TODO: dtypes.float16 if using TF2.1.x (errors in 2.0) + tensor_compressed = math_ops.cast(tensor, dtypes.float32) + return tensor_compressed + + def _decompress(self, compressed_tensor: Tensor): + return math_ops.cast(compressed_tensor, self.dtype) + + +class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order + """Horovod's Compression but with Error Feedback.""" + + +# class PowerSGDCompressor(CompressorEF): +# """An implementation of the PowerSGD compression algorithm (arxiv.org/abs/1905.13727).""" + +# def __init__(self, var_op_name, rank=1): +# self.rank = rank +# self.og_shape, self.ndims, self.new_shape, self.compressor = None, None, None, None +# super().__init__(var_op_name) + +# def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): +# """ +# Compress, reduce, and decompress a given tensor. + +# Args: +# tensor (Tensor): the Tensor to reduce. +# conf (CollectiveOpsConfig): the config for Collective Ops. 
+ +# Returns: +# Reduced Tensor +# """ +# if self.og_shape is None: +# self.og_shape = tensor.shape +# self.ndims = len(self.og_shape) + +# # Check if rank 1 tensor (this shouldn't be called with sparse tensors) +# # Just reduce it if it is, no need to compress +# if self._is_1d: +# return self._all_reduce(tensor, conf) + +# logging.info(f"Compressing tensor {tensor.name} (var {self.var_op_name}) with shape {tensor.shape}") +# if self.ndims > 2: +# tensor = array_ops.reshape(tensor, [self.og_shape[0], -1]) + +# if self.compressor is None: +# self.new_shape = array_ops.shape_v2(tensor) +# self.compressor = random_ops.random_normal([self.new_shape[1], self.rank]) + +# if self.error is not None: +# tensor += self.error + +# compressed_tensor = self._compress(tensor) +# self.error = tensor - self._decompress(compressed_tensor) + +# # all reduce mean p +# reduced = self._all_reduce(compressed_tensor, conf) +# reduced = self._orthogonalize(reduced) + +# # update compressor +# self.compressor = math_ops.matmul(tensor, reduced, transpose_a=True) +# conf.instance_key = get_collective_keys().get_instance_key(self.var_op_name + "/compressor") +# self.compressor = self._all_reduce(self.compressor, conf) +# return array_ops.reshape(self._decompress(reduced), self.og_shape) \ +# if self.ndims > 2 else self._decompress(reduced) + +# def _compress(self, tensor: Tensor): +# return math_ops.matmul(tensor, self.compressor) + +# def _decompress(self, compressed_tensor: Tensor): +# return math_ops.matmul(compressed_tensor, self.compressor, transpose_b=True) + +# @property +# def _is_1d(self): +# return self.ndims <= 1 or ( +# self.ndims == 2 and any(d == 1 for d in self.og_shape) +# ) + +# @staticmethod +# def _orthogonalize(matrix): +# _, m = matrix.shape +# for i in range(m): +# v = matrix[:, i] +# v /= linalg_ops.norm_v2(v) +# v = array_ops.expand_dims_v2(v, 1) + +# begin, rest = matrix[:, :i], matrix[:, (i + 1):] +# rest -= math_ops.matmul(v, rest, transpose_a=True) * v +# matrix = array_ops.concat([begin, v, rest], 1) +# return matrix From 651d0be76b3233149e041e198fb1f5e173be917d Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Sun, 1 Nov 2020 18:07:57 -0500 Subject: [PATCH 3/7] not tested --- autodist/kernel/graph_transformer.py | 6 +- autodist/kernel/new_graph_transformer.py | 202 ---------- .../all_reduce_synchronizer.py | 36 +- autodist/kernel/synchronization/compressor.py | 149 ++++---- .../new_all_reduce_synchronizer.py | 197 ---------- .../kernel/synchronization/new_compressor.py | 356 ------------------ .../kernel/synchronization/ps_synchronizer.py | 16 +- .../kernel/synchronization/synchronizer.py | 1 + tests/integration/test_all.py | 2 +- 9 files changed, 119 insertions(+), 846 deletions(-) delete mode 100644 autodist/kernel/new_graph_transformer.py delete mode 100644 autodist/kernel/synchronization/new_all_reduce_synchronizer.py delete mode 100644 autodist/kernel/synchronization/new_compressor.py diff --git a/autodist/kernel/graph_transformer.py b/autodist/kernel/graph_transformer.py index e2ce5b4..6f28938 100644 --- a/autodist/kernel/graph_transformer.py +++ b/autodist/kernel/graph_transformer.py @@ -99,13 +99,11 @@ def _initialize_synchronizers(self): for part in node.part_config: self._synchronizers[part.var_name] = \ Synchronizer.create(part.WhichOneof('synchronizer'), - getattr(part, part.WhichOneof('synchronizer')), - part.compressor) + part) else: self._synchronizers[node.var_name] = \ Synchronizer.create(node.WhichOneof('synchronizer'), - getattr(node, node.WhichOneof('synchronizer')). 
- node.compressor) + node) config = self._strategy.graph_config.replicas replica_devices = {device_spec.DeviceSpecV2.from_string(s) for s in config} diff --git a/autodist/kernel/new_graph_transformer.py b/autodist/kernel/new_graph_transformer.py deleted file mode 100644 index 15430c4..0000000 --- a/autodist/kernel/new_graph_transformer.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright 2020 Petuum, Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Graph Transformer.""" - -from tensorflow.core.framework.attr_value_pb2 import AttrValue as pb2_AttrValue -from tensorflow.python.eager import context -from tensorflow.python.framework import device_spec - -from autodist.graph_item import GraphItem -from autodist.kernel.partitioner import VariablePartitioner -from autodist.kernel.replicator import Replicator -from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.kernel.synchronization.compressor import Comopressor -from autodist.utils import logging, visualization_util - - -class GraphTransformer: - """ - Graph Transformer. - - This is the bulk of the AutoDist backend logic, taking a single-node, - single-GPU graph and transforming it into a distributed - graph. This all happens based on the `Strategy` provided. - - The transformation occurs over several steps: - - 1. Partitions the necessary variables - 2. Replicates the graph the desired number of times - 3. Within the graph, synchronizes gradients with in-graph logic - 4. Adds between-graph gradient synchronization logic - - """ - - def __init__(self, compiled_strategy, cluster, graph_item): - self._strategy = compiled_strategy - self._cluster = cluster - self.graph_item = graph_item - - # Set in _initialize_synchronizers - self._num_local_replicas = 0 - self._num_workers = 0 - self._synchronizers = {} - self._compressors = {} - - def transform(self): - """Call graph transformer to transform a graph item based on strategy and cluster.""" - logging.info('Transforming the original graph to a distributed graph...') - with context.graph_mode(): - graph_item = self.graph_item - # Ensure the transformation happens under graph mode, no matter the outer mode is under eager or graph. 
- - visualization_util.log_graph(graph=graph_item.graph, name='0-original') - - graph_item, self._strategy.node_config = VariablePartitioner.apply(self._strategy.node_config, graph_item) - - visualization_util.log_graph(graph=graph_item.graph, name='1-after-partition') - - # Create Synchronizers for each node in the strategy - self._initialize_synchronizers_and_copmpressors() - - # Replicate the graph (both in-graph and between-graph) - new_graph_item = Replicator.apply( - config=self._strategy.graph_config.replicas, - cluster=self._cluster, - graph_item=graph_item - ) - - # Apply synchronizers - if self._num_local_replicas >= 1: - new_graph_item = self._in_graph_apply(new_graph_item) - logging.debug('Successfully applied local in-graph replication') - visualization_util.log_graph(new_graph_item.graph, '2-after-in-graph') - - if self._num_workers >= 1: - new_graph_item = self._between_graph_apply(new_graph_item) - logging.debug('Successfully applied between-graph replication') - - final_item = new_graph_item - logging.info('Successfully built the distributed graph.') - visualization_util.log_graph(graph=final_item.graph, name='3-transformed') - - return final_item - - def _initialize_synchronizers_and_copmpressors(self): - self._synchronizers = {} - for node in self._strategy.node_config: - partitioner = getattr(node, 'partitioner') - if partitioner: - for part in node.part_config: - self._synchronizers[part.var_name] = \ - Synchronizer.create(part.WhichOneof('synchronizer'), - getattr(part, part.WhichOneof('synchronizer'))) - self._compressors[part.var_name] = Compressor.create(part.compressor, part.var_name) - else: - self._synchronizers[node.var_name] = \ - Synchronizer.create(node.WhichOneof('synchronizer'), - getattr(node, node.WhichOneof('synchronizer'))) - self._compressors[node.var_name] = Compressor.create(node.compressor, node.var_name) - - config = self._strategy.graph_config.replicas - replica_devices = {device_spec.DeviceSpecV2.from_string(s) for s in config} - replica_hosts = {self._cluster.get_address_from_task(d.job, d.task) for d in replica_devices} - self._num_workers = len(replica_hosts) - - local_canonical_replica_devices = sorted({ - d.to_string() for d in replica_devices - if self._cluster.get_local_address() == self._cluster.get_address_from_task(d.job, d.task) - }) - logging.debug('Local replica devices: {}'.format(local_canonical_replica_devices)) - self._num_local_replicas = len(local_canonical_replica_devices) - - local_worker_id = self._cluster.get_local_worker_task_index() - local_worker_device = '/job:worker/task:{}'.format(local_worker_id) - - for synchronizer in self._synchronizers.values(): - synchronizer.assign_cluster_information( - num_workers=self._num_workers, - num_replicas=self._num_local_replicas, - worker_device=local_worker_device, - worker_id=local_worker_id, - canonical_replica_devices=sorted({d.to_string() for d in replica_devices}), - is_chief=self._cluster.is_chief()) - - for compressor in self._compressors.values(): - compressor.assign_cluster_information( - num_workers=self._num_workers, - num_replicas=self._num_local_replicas, - worker_device=local_worker_device, - worker_id=local_worker_id, - canonical_replica_devices=sorted({d.to_string() for d in replica_devices}), - is_chief=self._cluster.is_chief()) - - @property - def num_local_replicas(self): - """Return the number of local replicas.""" - assert self._num_local_replicas != 0 # ensure initialized - return self._num_local_replicas - - def _in_graph_apply(self, graph_item: GraphItem): 
- """ - Perform in-graph synchronization of the graph. - - Args: - graph_item (GraphItem): The graph to replication. - - Returns: - GraphItem - """ - new_graph_item = graph_item - for var_name, syncer in self._synchronizers.items(): - new_graph_item = syncer.in_graph_apply(new_graph_item, var_name) - return new_graph_item - - def _between_graph_apply(self, multi_gpu_graph_item: GraphItem): - """ - Perform between-graph replication of the graph. - - Args: - multi_gpu_graph_item (GraphItem): The graph to replication. - - Returns: - GraphItem - """ - new_graph_item = multi_gpu_graph_item - for var_name, syncer in self._synchronizers.items(): - new_graph_item = syncer.between_graph_apply(new_graph_item, var_name) - self._prune_colocation_groups(new_graph_item) - # TODO: make this work - # update_shard_values_for_worker(num_workers, worker_id) - return new_graph_item - - # TODO(Hao): this seems still problematic - @staticmethod - def _prune_colocation_groups(graph_item): - for op in graph_item.graph.get_operations(): - # Now prune the graph to have the right colocation constraints - colocation_groups = [(c, graph_item.get_colocation_op(c)) for c in op.colocation_groups()] - # We don't want any colocation groups that are just this `op` - colocation_groups = [(c, bind_op) for (c, bind_op) in colocation_groups if bind_op != op] - if colocation_groups: - device_to_bind_to = colocation_groups[-1][1].device - new_colocation_groups = [c for (c, op) in colocation_groups if op.device == device_to_bind_to] - op._set_device(device_to_bind_to) - op._set_attr("_class", pb2_AttrValue(list=pb2_AttrValue.ListValue(s=new_colocation_groups))) - else: - try: - if op.get_attr("_class"): - op._clear_attr("_class") - except ValueError: - pass diff --git a/autodist/kernel/synchronization/all_reduce_synchronizer.py b/autodist/kernel/synchronization/all_reduce_synchronizer.py index 98a3ac3..177d368 100644 --- a/autodist/kernel/synchronization/all_reduce_synchronizer.py +++ b/autodist/kernel/synchronization/all_reduce_synchronizer.py @@ -25,11 +25,24 @@ replica_prefix, get_control_consumers, update_control_consumers from autodist.kernel.common.utils import get_op_name from autodist.kernel.synchronization.collective_key import get_collective_keys -from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig +# from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig +from autodist.kernel.synchronization.compressor import Compressor from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.proto import synchronizers_pb2, compressor_pb2 +from autodist.proto import synchronizers_pb2, compressor_pb2, strategy_pb2 from autodist.utils import logging +from tensorflow.python.ops import collective_ops + + +class CollectiveOpsConfig: + """Config for using Collective Ops.""" + + group_size: int + group_key: str + instance_key: str + merge_op: str + final_op: str + class AllReduceSynchronizer(Synchronizer): """ @@ -50,21 +63,28 @@ class AllReduceSynchronizer(Synchronizer): 2. any other types of hybrid reduction of PS and AllReduce. 
""" - def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer, compressor_value): - self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(config.spec) + def __init__(self, config: strategy_pb2.Strategy.Node): + compressor_value = getattr(config, 'compressor') + syncer_config = getattr(config, config.WhichOneof('synchronizer')) + self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(syncer_config.spec) if autodist.float_major_minor_tf_version < 1.15 or autodist.float_major_minor_tf_version < 2.1: logging.warning('Collective synchronizer spec "{}" a.k.a communication_hint has no effect ' 'until tensorflow-gpu 1.x>= 1.15 or 2.x>=2.1. It may cause error currently.' .format(self._spec)) self._spec = None - self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) - # Collective ops within the same group will be merged by the scoped optimizer. # Normally the group index shall be smaller than the number of variables in the graph; this kernel assumes # the strategy will validate the group assignments are legitimate. - self._group = config.group + self._group = syncer_config.group super().__init__() + if compressor_value: + self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) + + def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): + + return collective_ops.all_reduce(tensor, **conf.__dict__) + def in_graph_apply(self, graph_item, var_name): """ @@ -122,7 +142,7 @@ def _collect_dense_gradients(self, graph_item, var_op_name): # "\/" is added for name scope reuse with ops.name_scope(replica_prefix(i) + "/collective-group-{}/".format(self._group)): with ops.colocate_with(grad.op): - reduced_grad = compressors[i].reduce(grad, conf) + reduced_grad = compressors[i].compress(grad, conf) update_consumers(grad_consumers, grad, reduced_grad) # TODO(Hao): update grad, target pair here or not? diff --git a/autodist/kernel/synchronization/compressor.py b/autodist/kernel/synchronization/compressor.py index 4878d48..e952a64 100644 --- a/autodist/kernel/synchronization/compressor.py +++ b/autodist/kernel/synchronization/compressor.py @@ -23,14 +23,14 @@ #from autodist.utils import logging -class CollectiveOpsConfig: - """Config for using Collective Ops.""" +# class CollectiveOpsConfig: +# """Config for using Collective Ops.""" - group_size: int - group_key: str - instance_key: str - merge_op: str - final_op: str +# group_size: int +# group_key: str +# instance_key: str +# merge_op: str +# final_op: str class Compressor(ABC): @@ -44,21 +44,21 @@ class Compressor(ABC): def __init__(self, var_op_name): self.var_op_name = var_op_name - @abstractmethod - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. + # @abstractmethod + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. - Returns: - Reduced Tensor - """ + # Returns: + # Reduced Tensor + # """ @abstractmethod - def _compress(self, tensor: Tensor): + def compress(self, tensor: Tensor): """ Compress a given tensor. @@ -70,7 +70,7 @@ def _compress(self, tensor: Tensor): """ @abstractmethod - def _decompress(self, compressed_tensor: Tensor): + def decompress(self, compressed_tensor: Tensor): """ Decompress a given tensor. 
@@ -81,19 +81,19 @@ def _decompress(self, compressed_tensor: Tensor): Tensor, Context """ - @staticmethod - def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): - """ - Using CollectiveOps, AllReduce the given tensor. + # @staticmethod + # def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Using CollectiveOps, AllReduce the given tensor. - Args: - tensor (Tensor): the tensor to all-reduce - conf (CollectiveOpsConfig): the config for CollectiveOps + # Args: + # tensor (Tensor): the tensor to all-reduce + # conf (CollectiveOpsConfig): the config for CollectiveOps - Returns: - The All-Reduced Tensor - """ - return collective_ops.all_reduce(tensor, **conf.__dict__) + # Returns: + # The All-Reduced Tensor + # """ + # return collective_ops.all_reduce(tensor, **conf.__dict__) @classmethod def create(cls, name, *args, **kwargs): @@ -124,45 +124,50 @@ def __init__(self, var_op_name): self.error = None super().__init__(var_op_name) - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. - - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. - - Returns: - Reduced Tensor - """ + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. + + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. + + # Returns: + # Reduced Tensor + # """ + # if self.error is not None: + # tensor += self.error + # compressed_tensor = self._compress(tensor) + # self.error = tensor - self._decompress(compressed_tensor) + # reduced = self._all_reduce(compressed_tensor, conf) + # return self._decompress(reduced) + + def compute_error(self, tensor: Tensor): if self.error is not None: tensor += self.error compressed_tensor = self._compress(tensor) self.error = tensor - self._decompress(compressed_tensor) - reduced = self._all_reduce(compressed_tensor, conf) - return self._decompress(reduced) - class NoneCompressor(Compressor): """An identity Compressor.""" - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. - Returns: - Reduced Tensor - """ - return self._all_reduce(tensor, conf) + # Returns: + # Reduced Tensor + # """ + # return self._all_reduce(tensor, conf) - def _compress(self, tensor: Tensor): + def compress(self, tensor: Tensor): return tensor - def _decompress(self, compressed_tensor: Tensor): + def decompress(self, compressed_tensor: Tensor): return compressed_tensor @@ -173,22 +178,22 @@ def __init__(self, var_op_name): self.dtype = None super().__init__(var_op_name) - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. + # def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + # """ + # Compress, reduce, and decompress a given tensor. - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. + # Args: + # tensor (Tensor): the Tensor to reduce. + # conf (CollectiveOpsConfig): the config for Collective Ops. 
- Returns: - Reduced Tensor - """ - compressed_tensor = self._compress(tensor) - reduced = self._all_reduce(compressed_tensor, conf) - return self._decompress(reduced) + # Returns: + # Reduced Tensor + # """ + # compressed_tensor = self._compress(tensor) + # reduced = self._all_reduce(compressed_tensor, conf) + # return self._decompress(reduced) - def _compress(self, tensor: Tensor): + def compress(self, tensor: Tensor): self.dtype = tensor.dtype tensor_compressed = tensor if tensor.dtype.is_floating: @@ -197,12 +202,12 @@ def _compress(self, tensor: Tensor): tensor_compressed = math_ops.cast(tensor, dtypes.float32) return tensor_compressed - def _decompress(self, compressed_tensor: Tensor): + def decompress(self, compressed_tensor: Tensor): return math_ops.cast(compressed_tensor, self.dtype) -class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order - """Horovod's Compression but with Error Feedback.""" +# class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order +# """Horovod's Compression but with Error Feedback.""" # class PowerSGDCompressor(CompressorEF): diff --git a/autodist/kernel/synchronization/new_all_reduce_synchronizer.py b/autodist/kernel/synchronization/new_all_reduce_synchronizer.py deleted file mode 100644 index 6a32f31..0000000 --- a/autodist/kernel/synchronization/new_all_reduce_synchronizer.py +++ /dev/null @@ -1,197 +0,0 @@ -# Copyright 2020 Petuum, Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""AllReduce Synchronizer.""" -from collections import defaultdict - -from tensorflow.python import ops -from tensorflow.python.framework import device_spec -from tensorflow.python.ops import collective_ops - -import autodist -from autodist.const import ENV -from autodist.kernel.common.utils import get_consumers, update_consumers, \ - replica_prefix, get_control_consumers, update_control_consumers -from autodist.kernel.common.utils import get_op_name -from autodist.kernel.synchronization.collective_key import get_collective_keys -from autodist.kernel.synchronization.compressor import Compressor, CollectiveOpsConfig -from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.proto import synchronizers_pb2, compressor_pb2 -from autodist.utils import logging - - -class AllReduceSynchronizer(Synchronizer): - """ - AllReduce Synchronizer. - - This AllReduce Synchronizer currently uses TensorFlow's `collective_device_ops` - to insert their AllReduce ops into our graph. - - The class AllReduceSynchronizer class contains the following possible instantiations: - - 1. spec=`auto`: single-node multiple devices, or cross-node AllReduce based on collective ops - 2. spec=`nccl`: single-node multiple devices, or cross-node AllReduce based on NCCL - 3. spec=`ring`/'tree', AllReduce with different reduction structures: ring, tree, etc. - - However note that it does not contain the following instantiations: - - 1. 
shuffle reduce (reduce to CPU or GPU as in PS) + AllReduce across nodes - 2. any other types of hybrid reduction of PS and AllReduce. - """ - - def __init__(self, config: synchronizers_pb2.AllReduceSynchronizer, compressor_value): - self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(config.spec) - if autodist.float_major_minor_tf_version < 1.15 or autodist.float_major_minor_tf_version < 2.1: - logging.warning('Collective synchronizer spec "{}" a.k.a communication_hint has no effect ' - 'until tensorflow-gpu 1.x>= 1.15 or 2.x>=2.1. It may cause error currently.' - .format(self._spec)) - self._spec = None - - self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) - - # Collective ops within the same group will be merged by the scoped optimizer. - # Normally the group index shall be smaller than the number of variables in the graph; this kernel assumes - # the strategy will validate the group assignments are legitimate. - self._group = config.group - super().__init__() - - def in_graph_apply(self, graph_item, var_name): - """ - Perform in-graph synchronization based on AllReduce and TensorFlow Collective Ops. - - Note that collective ops now only supports dense tensors. - - Args: - graph_item (graph_item.GraphItem): the graph_item to be distributed - var_name (str): the corresponded variable name - - Returns: - graph_item.GraphItem: The new graph - """ - # Skip allreduce synchronizer when rank <= 1 - if self.num_replicas * self.num_workers <= 1: - return graph_item - - item = graph_item - var_op_name = get_op_name(var_name) - - # Throw an error if the variable is sparse - master_op_name = ops.prepend_name_scope(var_op_name, replica_prefix(0)) - grad, _, _ = graph_item.var_op_name_to_grad_info[master_op_name] - with item.graph.as_default(): - self._share_initializer(item, var_op_name, master_replica=0) - if isinstance(grad, ops.IndexedSlices): - self._collect_sparse_gradients(item, var_op_name) - else: - self._collect_dense_gradients(item, var_op_name) - return item - - def _collect_dense_gradients(self, graph_item, var_op_name): - """Append collective ops after the gradient is calculated.""" - if self.num_replicas * self.num_workers <= 1: - raise ValueError('CollectiveOps requires collective group size > 1') - - compressors = defaultdict(lambda: Compressor.create(self._compressor_type, var_op_name)) - - conf = CollectiveOpsConfig() - conf.group_size = len(self.all_canonical_replica_devices) - conf.group_key = get_collective_keys().get_group_key(self.all_canonical_replica_devices) - conf.instance_key = get_collective_keys().get_instance_key(var_op_name) - conf.merge_op = 'Add' - conf.final_op = 'Div' - if self._spec: - setattr(conf, 'communication_hint', self._spec) - - for i in range(0, self.num_replicas): - op_name = ops.prepend_name_scope(var_op_name, replica_prefix(i)) - grad, _, _ = graph_item.var_op_name_to_grad_info[op_name] - # TODO (Tairui): (3) Merge of reduction for performance - grad_consumers = get_consumers(grad.op) # this line must happen before the reduction - - # "\/" is added for name scope reuse - with ops.name_scope(replica_prefix(i) + "/collective-group-{}/".format(self._group)): - with ops.colocate_with(grad.op): - reduced_grad = compressors[i].reduce(grad, conf) - update_consumers(grad_consumers, grad, reduced_grad) - # TODO(Hao): update grad, target pair here or not? 
- - def _collect_sparse_gradients(self, graph_item, var_op_name): - """Append collective ops after the gradient is calculated.""" - if self.num_workers > 1 and not ENV.AUTODIST_INTERNAL_TF.value: - raise NotImplementedError('Currently the collective NCCL AllGather is not supported in TensorFlow release.' - 'Please choose another strategy.') - conf = {} - if self._spec: - conf = {'communication_hint': self._spec} - if self._compressor_type: - logging.warning('AllGather currently does not support AutoDist compressor so it skips.') - if self.num_replicas * self.num_workers <= 1: - raise ValueError('CollectiveOps requires collective group size > 1') - for i in range(0, self.num_replicas): - op_name = ops.prepend_name_scope(var_op_name, replica_prefix(i)) - grad, _, _ = graph_item.var_op_name_to_grad_info[op_name] - # TODO (Tairui): (3) Merge of reduction for performance - indices_c_ops = grad.indices.consumers() - indices_cc_ops = get_control_consumers(grad.indices.op) - values_c_ops = grad.values.consumers() - values_cc_ops = get_control_consumers(grad.values.op) - with ops.name_scope(replica_prefix(i)): - with ops.colocate_with(grad.indices.op): - new_indices = collective_ops.all_gather( - grad.indices, - self.num_replicas * self.num_workers, - get_collective_keys().get_group_key(self.all_canonical_replica_devices), - get_collective_keys().get_instance_key(var_op_name + '-indices'), - **conf - ) - with ops.colocate_with(grad.values.op): - new_values = collective_ops.all_gather( - grad.values, - self.num_replicas * self.num_workers, - get_collective_keys().get_group_key(self.all_canonical_replica_devices), - get_collective_keys().get_instance_key(var_op_name + '-values'), - **conf - ) - update_consumers(indices_c_ops, grad.indices, new_indices) - update_control_consumers(indices_cc_ops, grad.indices.op, new_indices.op) - update_consumers(values_c_ops, grad.values, new_values) - update_control_consumers(values_cc_ops, grad.values.op, new_values) - - def _share_initializer(self, graph_item, var_op_name, master_replica=0): - """Share the initializers of all replica variables to use initializer on replica=master_replica.""" - # find the initial value of the var on master_replica - master_var_op = graph_item.graph.get_operation_by_name( - ops.prepend_name_scope(var_op_name, replica_prefix(master_replica))) - master_var = graph_item.trainable_var_op_to_var[master_var_op] - master_init_tensor = graph_item.graph.get_tensor_by_name(master_var.initial_value.name) - master_init_op = master_init_tensor.op - # set the device of the init ops to reside on the chief device - master_init_device = device_spec.DeviceSpecV2.from_string(master_init_op.device) \ - .replace(task=0) - master_init_op._set_device_from_string(master_init_device.to_string()) - - for i in range(0, self.num_replicas): - if i == master_replica: - continue - var_op = graph_item.graph.get_operation_by_name( - ops.prepend_name_scope(var_op_name, replica_prefix(i))) - var = graph_item.trainable_var_op_to_var[var_op] - init_op = graph_item.graph.get_tensor_by_name(var.initial_value.name).op - init_assign_op = get_consumers(init_op)[0] - init_assign_op._update_input(1, master_init_tensor) - - # pylint: disable=no-self-use - def between_graph_apply(self, graph_item, var_name): - """Allreduce synchronizer will do nothing in between-graph synchronization.""" - return graph_item diff --git a/autodist/kernel/synchronization/new_compressor.py b/autodist/kernel/synchronization/new_compressor.py deleted file mode 100644 index ce4368b..0000000 --- 
a/autodist/kernel/synchronization/new_compressor.py +++ /dev/null @@ -1,356 +0,0 @@ -# Copyright 2020 Petuum, Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Gradient Compressors for All-Reduce.""" -from abc import ABC, abstractmethod -from tensorflow.python.framework import dtypes -from tensorflow.python.framework.ops import Tensor -from tensorflow.python.ops import collective_ops, math_ops - -from autodist.kernel.common.utils import get_op_name -from autodist.proto import compressor_pb2 - -#from tensorflow.python.ops import array_ops, collective_ops, linalg_ops, math_ops, random_ops -#from autodist.kernel.synchronization.collective_key import get_collective_keys -#from autodist.utils import logging - - -class CollectiveOpsConfig: - """Config for using Collective Ops.""" - - group_size: int - group_key: str - instance_key: str - merge_op: str - final_op: str - - -class Compressor(ABC): - """ - Wraps CollectiveOps.All_Reduce with compression and decompression for network efficiency. - - This means that it only wraps gradient transmission for AllReduce - synchronized variables, not PS ops or other ops like network reads. - """ - - def __init__(self, var_name): - var_op_name = get_op_name(var_name) - self.var_op_name = var_op_name - self.num_workers = None - self.num_replicas = None - self.worker_device = None - self.worker_id = None - self.is_chief = None - self.all_canonical_replica_devices = None - - @abstractmethod - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. - - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. - - Returns: - Reduced Tensor - """ - - @abstractmethod - def _compress(self, tensor: Tensor): - """ - Compress a given tensor. - - Args: - tensor (Tensor): the Tensor to compress. - - Returns: - Tensor - """ - - @abstractmethod - def _decompress(self, compressed_tensor: Tensor): - """ - Decompress a given tensor. - - Args: - compressed_tensor (Tensor): the Tensor to decompress. - - Returns: - Tensor, Context - """ - - @staticmethod - def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): - """ - Using CollectiveOps, AllReduce the given tensor. 
- - Args: - tensor (Tensor): the tensor to all-reduce - conf (CollectiveOpsConfig): the config for CollectiveOps - - Returns: - The All-Reduced Tensor - """ - return collective_ops.all_reduce(tensor, **conf.__dict__) - - - def in_graph_compress(self, graph_item, var_name): - item = graph_item - var_op_name = get_op_name(var_name) - master_op_name = ops.prepend_name_scope(var_op_name, replica_prefix(0)) - grad, _, _ = graph_item.var_op_name_to_grad_info[master_op_name] - with item.graph.as_default(): - self._share_initializer(item, var_op_name, master_replica=0) - if isinstance(grad, ops.IndexedSlices): - self._compress_sparse_gradients(item, var_op_name) - return item - - - def in_graph_decompress(self, graph_item, var_name): - item = graph_item - var_op_name = get_op_name(var_name) - master_op_name = ops.prepend_name_scope(var_op_name, replica_prefix(0)) - - def _compress_sparse_gradients(item, var_op_name): - - - - def _share_initializer(self, graph_item, var_op_name, master_replica=0): - """Share the initializers of all replica variables to use initializer on replica=master_replica.""" - # find the initial value of the var on master_replica - master_var_op = graph_item.graph.get_operation_by_name( - ops.prepend_name_scope(var_op_name, replica_prefix(master_replica))) - master_var = graph_item.trainable_var_op_to_var[master_var_op] - master_init_tensor = graph_item.graph.get_tensor_by_name(master_var.initial_value.name) - master_init_op = master_init_tensor.op - # set the device of the init ops to reside on the chief device - master_init_device = device_spec.DeviceSpecV2.from_string(master_init_op.device) \ - .replace(task=0) - master_init_op._set_device_from_string(master_init_device.to_string()) - - for i in range(0, self.num_replicas): - if i == master_replica: - continue - var_op = graph_item.graph.get_operation_by_name( - ops.prepend_name_scope(var_op_name, replica_prefix(i))) - var = graph_item.trainable_var_op_to_var[var_op] - init_op = graph_item.graph.get_tensor_by_name(var.initial_value.name).op - init_assign_op = get_consumers(init_op)[0] - init_assign_op._update_input(1, master_init_tensor) - - def assign_cluster_information(self, - num_workers, - num_replicas, - worker_device, - worker_id, - canonical_replica_devices, - is_chief=False): - """Store cluster information in the synchronizer.""" - self.num_workers = num_workers - self.num_replicas = num_replicas - self.worker_device = worker_device # local worker device - self.worker_id = worker_id # local worker id - self.all_canonical_replica_devices = canonical_replica_devices - self.is_chief = is_chief - return self - - @classmethod - def create(cls, name, *args, **kwargs): - """ - Create new Compressor instance given subclass name. - - Args: - name: Name of the Compressor subclass (e.g. NoneCompressor). - *args: Any args for the subclass constructor. - **kwargs: Any kwargs for the subclass constructor. 
- - Returns: - Compressor - """ - name = compressor_pb2.Compressor.Type.Name(compressor_value) - subclass = next(subclass for subclass in cls._get_subclasses() if subclass.__name__ == name) - return subclass(*args, **kwargs) - - @classmethod - def _get_subclasses(cls): - return set(cls.__subclasses__()).union([s for c in cls.__subclasses__() for s in c._get_subclasses()]) - - -# pylint: disable=abstract-method -class CompressorEF(Compressor, ABC): - """A Compressor with Error Feedback.""" - - def __init__(self, var_op_name): - self.error = None - super().__init__(var_op_name) - - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. - - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. - - Returns: - Reduced Tensor - """ - if self.error is not None: - tensor += self.error - compressed_tensor = self._compress(tensor) - self.error = tensor - self._decompress(compressed_tensor) - reduced = self._all_reduce(compressed_tensor, conf) - return self._decompress(reduced) - - -class NoneCompressor(Compressor): - """An identity Compressor.""" - - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. - - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. - - Returns: - Reduced Tensor - """ - return self._all_reduce(tensor, conf) - - def _compress(self, tensor: Tensor): - return tensor - - def _decompress(self, compressed_tensor: Tensor): - return compressed_tensor - - -class HorovodCompressor(Compressor): - """Implements Horovod's Compression.""" - - def __init__(self, var_op_name): - self.dtype = None - super().__init__(var_op_name) - - def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): - """ - Compress, reduce, and decompress a given tensor. - - Args: - tensor (Tensor): the Tensor to reduce. - conf (CollectiveOpsConfig): the config for Collective Ops. - - Returns: - Reduced Tensor - """ - compressed_tensor = self._compress(tensor) - reduced = self._all_reduce(compressed_tensor, conf) - return self._decompress(reduced) - - def _compress(self, tensor: Tensor): - self.dtype = tensor.dtype - tensor_compressed = tensor - if tensor.dtype.is_floating: - # Only allow compression from other floating point types - # TODO: dtypes.float16 if using TF2.1.x (errors in 2.0) - tensor_compressed = math_ops.cast(tensor, dtypes.float32) - return tensor_compressed - - def _decompress(self, compressed_tensor: Tensor): - return math_ops.cast(compressed_tensor, self.dtype) - - -class HorovodCompressorEF(CompressorEF, HorovodCompressor): # This works because of Method Resolution Order - """Horovod's Compression but with Error Feedback.""" - - -# class PowerSGDCompressor(CompressorEF): -# """An implementation of the PowerSGD compression algorithm (arxiv.org/abs/1905.13727).""" - -# def __init__(self, var_op_name, rank=1): -# self.rank = rank -# self.og_shape, self.ndims, self.new_shape, self.compressor = None, None, None, None -# super().__init__(var_op_name) - -# def reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): -# """ -# Compress, reduce, and decompress a given tensor. - -# Args: -# tensor (Tensor): the Tensor to reduce. -# conf (CollectiveOpsConfig): the config for Collective Ops. 
- -# Returns: -# Reduced Tensor -# """ -# if self.og_shape is None: -# self.og_shape = tensor.shape -# self.ndims = len(self.og_shape) - -# # Check if rank 1 tensor (this shouldn't be called with sparse tensors) -# # Just reduce it if it is, no need to compress -# if self._is_1d: -# return self._all_reduce(tensor, conf) - -# logging.info(f"Compressing tensor {tensor.name} (var {self.var_op_name}) with shape {tensor.shape}") -# if self.ndims > 2: -# tensor = array_ops.reshape(tensor, [self.og_shape[0], -1]) - -# if self.compressor is None: -# self.new_shape = array_ops.shape_v2(tensor) -# self.compressor = random_ops.random_normal([self.new_shape[1], self.rank]) - -# if self.error is not None: -# tensor += self.error - -# compressed_tensor = self._compress(tensor) -# self.error = tensor - self._decompress(compressed_tensor) - -# # all reduce mean p -# reduced = self._all_reduce(compressed_tensor, conf) -# reduced = self._orthogonalize(reduced) - -# # update compressor -# self.compressor = math_ops.matmul(tensor, reduced, transpose_a=True) -# conf.instance_key = get_collective_keys().get_instance_key(self.var_op_name + "/compressor") -# self.compressor = self._all_reduce(self.compressor, conf) -# return array_ops.reshape(self._decompress(reduced), self.og_shape) \ -# if self.ndims > 2 else self._decompress(reduced) - -# def _compress(self, tensor: Tensor): -# return math_ops.matmul(tensor, self.compressor) - -# def _decompress(self, compressed_tensor: Tensor): -# return math_ops.matmul(compressed_tensor, self.compressor, transpose_b=True) - -# @property -# def _is_1d(self): -# return self.ndims <= 1 or ( -# self.ndims == 2 and any(d == 1 for d in self.og_shape) -# ) - -# @staticmethod -# def _orthogonalize(matrix): -# _, m = matrix.shape -# for i in range(m): -# v = matrix[:, i] -# v /= linalg_ops.norm_v2(v) -# v = array_ops.expand_dims_v2(v, 1) - -# begin, rest = matrix[:, :i], matrix[:, (i + 1):] -# rest -= math_ops.matmul(v, rest, transpose_a=True) * v -# matrix = array_ops.concat([begin, v, rest], 1) -# return matrix diff --git a/autodist/kernel/synchronization/ps_synchronizer.py b/autodist/kernel/synchronization/ps_synchronizer.py index 560e45f..65e65f8 100644 --- a/autodist/kernel/synchronization/ps_synchronizer.py +++ b/autodist/kernel/synchronization/ps_synchronizer.py @@ -35,7 +35,7 @@ remove_from_control_consumers, get_index_from_tensor_name, update_colocation_group from autodist.kernel.common.variable_utils import get_read_var_ops from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.proto import synchronizers_pb2 +from autodist.proto import synchronizers_pb2, strategy_pb2 class PSSynchronizer(Synchronizer): @@ -53,15 +53,19 @@ class PSSynchronizer(Synchronizer): for each variable for the workers to mark when their variable update is complete. 
""" - def __init__(self, config: synchronizers_pb2.PSSynchronizer): - self.target_device = config.reduction_destination if config.reduction_destination else "" - self._local_replication = config.local_replication - self._sync = config.sync - self._staleness = config.staleness + def __init__(self, config: strategy_pb2.Strategy.Node): + syncer_config = getattr(config, config.WhichOneof('synchronizer')) + compressor_value = getattr(config, 'compressor') + self.target_device = syncer_config.reduction_destination if syncer_config.reduction_destination else "" + self._local_replication = syncer_config.local_replication + self._sync = syncer_config.sync + self._staleness = syncer_config.staleness self._var_op_to_agg_grad = {} self._var_op_to_accum_apply_op = {} super().__init__() + if compressor_value: + self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) def in_graph_apply(self, graph_item, var_name): """ diff --git a/autodist/kernel/synchronization/synchronizer.py b/autodist/kernel/synchronization/synchronizer.py index bfaa5a5..e9a38d7 100644 --- a/autodist/kernel/synchronization/synchronizer.py +++ b/autodist/kernel/synchronization/synchronizer.py @@ -41,6 +41,7 @@ def __init__(self): self.var_op_to_accum_apply_op = None self.is_chief = None self.all_canonical_replica_devices = None + self._compressor_type = None # pylint: disable=too-many-arguments def assign_cluster_information(self, diff --git a/tests/integration/test_all.py b/tests/integration/test_all.py index 24bd863..2470ada 100644 --- a/tests/integration/test_all.py +++ b/tests/integration/test_all.py @@ -37,7 +37,7 @@ PartitionedPS(local_proxy_variable=True), AllReduce(chunk_size=1, all_reduce_spec='NCCL', compressor='NoneCompressor'), AllReduce(chunk_size=1, all_reduce_spec='NCCL', compressor='HorovodCompressor'), - AllReduce(chunk_size=1, all_reduce_spec='RING', compressor='HorovodCompressorEF'), + # AllReduce(chunk_size=1, all_reduce_spec='RING', compressor='HorovodCompressorEF'), PSLoadBalancing(local_proxy_variable=True), Parallax(local_proxy_variable=True), PartitionedAR(), From bef8a953afdc7f32e53c28d3d47dd0b17f9c969e Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Tue, 3 Nov 2020 08:50:57 -0500 Subject: [PATCH 4/7] ci test 0 --- .../all_reduce_synchronizer.py | 5 +- autodist/kernel/synchronization/compressor.py | 46 +++++++++++++++++-- .../kernel/synchronization/ps_synchronizer.py | 3 +- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/autodist/kernel/synchronization/all_reduce_synchronizer.py b/autodist/kernel/synchronization/all_reduce_synchronizer.py index 177d368..bdf531f 100644 --- a/autodist/kernel/synchronization/all_reduce_synchronizer.py +++ b/autodist/kernel/synchronization/all_reduce_synchronizer.py @@ -31,8 +31,6 @@ from autodist.proto import synchronizers_pb2, compressor_pb2, strategy_pb2 from autodist.utils import logging -from tensorflow.python.ops import collective_ops - class CollectiveOpsConfig: """Config for using Collective Ops.""" @@ -81,11 +79,10 @@ def __init__(self, config: strategy_pb2.Strategy.Node): if compressor_value: self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) - def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): + def _all_reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): return collective_ops.all_reduce(tensor, **conf.__dict__) - def in_graph_apply(self, graph_item, var_name): """ Perform in-graph synchronization based on AllReduce and TensorFlow Collective Ops. 
diff --git a/autodist/kernel/synchronization/compressor.py b/autodist/kernel/synchronization/compressor.py index e952a64..aab6d4b 100644 --- a/autodist/kernel/synchronization/compressor.py +++ b/autodist/kernel/synchronization/compressor.py @@ -16,8 +16,8 @@ from abc import ABC, abstractmethod from tensorflow.python.framework import dtypes from tensorflow.python.framework.ops import Tensor -from tensorflow.python.ops import collective_ops, math_ops - +# from tensorflow.python.ops import collective_ops, math_ops +from tensorflow.python.ops import math_ops #from tensorflow.python.ops import array_ops, collective_ops, linalg_ops, math_ops, random_ops #from autodist.kernel.synchronization.collective_key import get_collective_keys #from autodist.utils import logging @@ -142,11 +142,11 @@ def __init__(self, var_op_name): # reduced = self._all_reduce(compressed_tensor, conf) # return self._decompress(reduced) - def compute_error(self, tensor: Tensor): + def _compute_error(self, tensor: Tensor): if self.error is not None: tensor += self.error - compressed_tensor = self._compress(tensor) - self.error = tensor - self._decompress(compressed_tensor) + compressed_tensor = self.compress(tensor) + self.error = tensor - self.decompress(compressed_tensor) class NoneCompressor(Compressor): """An identity Compressor.""" @@ -165,9 +165,27 @@ class NoneCompressor(Compressor): # return self._all_reduce(tensor, conf) def compress(self, tensor: Tensor): + """ + Compress a given tensor. + + Args: + tensor (Tensor): the Tensor to compress. + + Returns: + Tensor + """ return tensor def decompress(self, compressed_tensor: Tensor): + """ + Decompress a given tensor. + + Args: + compressed_tensor (Tensor): the Tensor to decompress. + + Returns: + Tensor, Context + """ return compressed_tensor @@ -194,6 +212,15 @@ def __init__(self, var_op_name): # return self._decompress(reduced) def compress(self, tensor: Tensor): + """ + Compress a given tensor. + + Args: + tensor (Tensor): the Tensor to compress. + + Returns: + Tensor + """ self.dtype = tensor.dtype tensor_compressed = tensor if tensor.dtype.is_floating: @@ -203,6 +230,15 @@ def compress(self, tensor: Tensor): return tensor_compressed def decompress(self, compressed_tensor: Tensor): + """ + Decompress a given tensor. + + Args: + compressed_tensor (Tensor): the Tensor to decompress. 
+ + Returns: + Tensor, Context + """ return math_ops.cast(compressed_tensor, self.dtype) diff --git a/autodist/kernel/synchronization/ps_synchronizer.py b/autodist/kernel/synchronization/ps_synchronizer.py index 65e65f8..56dcc56 100644 --- a/autodist/kernel/synchronization/ps_synchronizer.py +++ b/autodist/kernel/synchronization/ps_synchronizer.py @@ -35,7 +35,8 @@ remove_from_control_consumers, get_index_from_tensor_name, update_colocation_group from autodist.kernel.common.variable_utils import get_read_var_ops from autodist.kernel.synchronization.synchronizer import Synchronizer -from autodist.proto import synchronizers_pb2, strategy_pb2 +# from autodist.proto import synchronizers_pb2, strategy_pb2 +from autodist.proto import strategy_pb2, compressor_pb2 class PSSynchronizer(Synchronizer): From a94d82cb1261b029d60fa61f41d0c077ec5c7f15 Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Tue, 3 Nov 2020 14:59:53 -0500 Subject: [PATCH 5/7] ci test 1 --- autodist/graph_item.py | 1 - .../all_reduce_synchronizer.py | 20 ++++++++++++++++--- autodist/kernel/synchronization/compressor.py | 1 + autodist/patch.py | 1 - 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/autodist/graph_item.py b/autodist/graph_item.py index e581447..0406ce8 100644 --- a/autodist/graph_item.py +++ b/autodist/graph_item.py @@ -71,7 +71,6 @@ def get_default_graph_item(): def wrap_optimizer_init(fn: Callable): """Wraps the __init__ function of OptimizerV2 objects and stores the info in the default GraphItem.""" - def wrapper(*args, **kwargs): # args[0] should be `self`, which is an object of type == optimizer class containing_class = type(args[0]) diff --git a/autodist/kernel/synchronization/all_reduce_synchronizer.py b/autodist/kernel/synchronization/all_reduce_synchronizer.py index bdf531f..05825f5 100644 --- a/autodist/kernel/synchronization/all_reduce_synchronizer.py +++ b/autodist/kernel/synchronization/all_reduce_synchronizer.py @@ -18,6 +18,7 @@ from tensorflow.python import ops from tensorflow.python.framework import device_spec from tensorflow.python.ops import collective_ops +from tensorflow.python.framework.ops import Tensor import autodist from autodist.const import ENV @@ -62,7 +63,8 @@ class AllReduceSynchronizer(Synchronizer): """ def __init__(self, config: strategy_pb2.Strategy.Node): - compressor_value = getattr(config, 'compressor') + # compressor_value = getattr(config, 'compressor') + compressor_value = getattr(config.compressor, 'type') syncer_config = getattr(config, config.WhichOneof('synchronizer')) self._spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Name(syncer_config.spec) if autodist.float_major_minor_tf_version < 1.15 or autodist.float_major_minor_tf_version < 2.1: @@ -79,8 +81,18 @@ def __init__(self, config: strategy_pb2.Strategy.Node): if compressor_value: self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) - def _all_reduce(self, tensor: Tensor, conf: CollectiveOpsConfig): + @staticmethod + def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): + """ + Using CollectiveOps, AllReduce the given tensor. 
+ + Args: + tensor (Tensor): the tensor to all-reduce + conf (CollectiveOpsConfig): the config for CollectiveOps + Returns: + The All-Reduced Tensor + """ return collective_ops.all_reduce(tensor, **conf.__dict__) def in_graph_apply(self, graph_item, var_name): @@ -139,7 +151,9 @@ def _collect_dense_gradients(self, graph_item, var_op_name): # "\/" is added for name scope reuse with ops.name_scope(replica_prefix(i) + "/collective-group-{}/".format(self._group)): with ops.colocate_with(grad.op): - reduced_grad = compressors[i].compress(grad, conf) + compressed_grad = compressors[i].compress(grad) + reduced = self._all_reduce(compressed_grad, conf) + reduced_grad = compressors[i].compress(reduced) update_consumers(grad_consumers, grad, reduced_grad) # TODO(Hao): update grad, target pair here or not? diff --git a/autodist/kernel/synchronization/compressor.py b/autodist/kernel/synchronization/compressor.py index aab6d4b..44a980d 100644 --- a/autodist/kernel/synchronization/compressor.py +++ b/autodist/kernel/synchronization/compressor.py @@ -148,6 +148,7 @@ def _compute_error(self, tensor: Tensor): compressed_tensor = self.compress(tensor) self.error = tensor - self.decompress(compressed_tensor) + class NoneCompressor(Compressor): """An identity Compressor.""" diff --git a/autodist/patch.py b/autodist/patch.py index d53a08f..7335c24 100644 --- a/autodist/patch.py +++ b/autodist/patch.py @@ -55,7 +55,6 @@ class PatchTensorFlow: @staticmethod def patch_var_reading(): """It only works with tf.gradients but not tape.gradients.""" - def value(self): """A cached operation which reads the value of this variable.""" if self._cached_value is not None: From b87f876e00c92c8d5434c99bf9d0977b295ee210 Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Wed, 4 Nov 2020 00:57:16 -0500 Subject: [PATCH 6/7] fix test-local tf1 --- .../kernel/synchronization/all_reduce_synchronizer.py | 9 +++++++-- autodist/kernel/synchronization/compressor.py | 2 ++ autodist/kernel/synchronization/ps_synchronizer.py | 3 ++- autodist/strategy/all_reduce_strategy.py | 2 +- autodist/strategy/partitioned_all_reduce_strategy.py | 2 +- .../random_axis_partition_all_reduce_strategy.py | 2 +- 6 files changed, 14 insertions(+), 6 deletions(-) diff --git a/autodist/kernel/synchronization/all_reduce_synchronizer.py b/autodist/kernel/synchronization/all_reduce_synchronizer.py index 05825f5..1dc4915 100644 --- a/autodist/kernel/synchronization/all_reduce_synchronizer.py +++ b/autodist/kernel/synchronization/all_reduce_synchronizer.py @@ -78,8 +78,11 @@ def __init__(self, config: strategy_pb2.Strategy.Node): # the strategy will validate the group assignments are legitimate. 
self._group = syncer_config.group super().__init__() - if compressor_value: + print('='*89) + print(compressor_value) + if compressor_value is not None: self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) + print(self._compressor_type) @staticmethod def _all_reduce(tensor: Tensor, conf: CollectiveOpsConfig): @@ -150,10 +153,12 @@ def _collect_dense_gradients(self, graph_item, var_op_name): # "\/" is added for name scope reuse with ops.name_scope(replica_prefix(i) + "/collective-group-{}/".format(self._group)): + # compressed_grad = compressors[i].compress(grad) with ops.colocate_with(grad.op): compressed_grad = compressors[i].compress(grad) reduced = self._all_reduce(compressed_grad, conf) - reduced_grad = compressors[i].compress(reduced) + reduced_grad = compressors[i].decompress(reduced) + # reduced_grad = compressors[i].decompress(reduced) update_consumers(grad_consumers, grad, reduced_grad) # TODO(Hao): update grad, target pair here or not? diff --git a/autodist/kernel/synchronization/compressor.py b/autodist/kernel/synchronization/compressor.py index 44a980d..4dea062 100644 --- a/autodist/kernel/synchronization/compressor.py +++ b/autodist/kernel/synchronization/compressor.py @@ -108,6 +108,8 @@ def create(cls, name, *args, **kwargs): Returns: Compressor """ + print('='*89) + print(name) subclass = next(subclass for subclass in cls._get_subclasses() if subclass.__name__ == name) return subclass(*args, **kwargs) diff --git a/autodist/kernel/synchronization/ps_synchronizer.py b/autodist/kernel/synchronization/ps_synchronizer.py index 56dcc56..3c1d587 100644 --- a/autodist/kernel/synchronization/ps_synchronizer.py +++ b/autodist/kernel/synchronization/ps_synchronizer.py @@ -56,7 +56,8 @@ class PSSynchronizer(Synchronizer): def __init__(self, config: strategy_pb2.Strategy.Node): syncer_config = getattr(config, config.WhichOneof('synchronizer')) - compressor_value = getattr(config, 'compressor') + # compressor_value = getattr(config, 'compressor') + compressor_value = getattr(config.compressor, 'type') self.target_device = syncer_config.reduction_destination if syncer_config.reduction_destination else "" self._local_replication = syncer_config.local_replication self._sync = syncer_config.sync diff --git a/autodist/strategy/all_reduce_strategy.py b/autodist/strategy/all_reduce_strategy.py index dc0abe8..ce39507 100644 --- a/autodist/strategy/all_reduce_strategy.py +++ b/autodist/strategy/all_reduce_strategy.py @@ -85,6 +85,6 @@ def _gen_all_reduce_node_config(var_name, group=0, all_reduce_spec="NCCL", compr node = strategy_pb2.Strategy.Node() node.var_name = var_name node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value(all_reduce_spec) - node.compressor = compressor_pb2.Compressor.Type.Value(compressor) + node.compressor.type = compressor_pb2.Compressor.Type.Value(compressor) node.AllReduceSynchronizer.group = group return node diff --git a/autodist/strategy/partitioned_all_reduce_strategy.py b/autodist/strategy/partitioned_all_reduce_strategy.py index e19cdae..0ebeda2 100644 --- a/autodist/strategy/partitioned_all_reduce_strategy.py +++ b/autodist/strategy/partitioned_all_reduce_strategy.py @@ -107,7 +107,7 @@ def _gen_node_config(self, var, var_counter): # Here let's just make it consistent part.var_name = '{}/part_{}:0'.format(get_op_name(var.name), i) part.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - part.compressor = compressor_pb2.Compressor.Type.Value("NoneCompressor") + 
part.compressor.type = compressor_pb2.Compressor.Type.Value("NoneCompressor") # part.compressor = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") part.AllReduceSynchronizer.group = (var_counter + i) // self.chunk_size node.part_config.extend([part]) diff --git a/autodist/strategy/random_axis_partition_all_reduce_strategy.py b/autodist/strategy/random_axis_partition_all_reduce_strategy.py index 53b6cdc..9dd37ef 100644 --- a/autodist/strategy/random_axis_partition_all_reduce_strategy.py +++ b/autodist/strategy/random_axis_partition_all_reduce_strategy.py @@ -105,7 +105,7 @@ def _gen_node_config(self, var, var_counter, grad): # Here let's just make it consistent part.var_name = '{}/part_{}:0'.format(get_op_name(var.name), i) part.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value("AUTO") - part.compressor = compressor_pb2.Compressor.Type.Value("NoneCompressor") + part.compressor.type = compressor_pb2.Compressor.Type.Value("NoneCompressor") # part.compressor = compressor_pb2.Compressor.Type.Value("PowerSGDCompressor") part.AllReduceSynchronizer.group = (var_counter + i) // self.chunk_size node.part_config.extend([part]) From 96542e846d400fccbfab257493e5645e42d13619 Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Wed, 4 Nov 2020 01:00:55 -0500 Subject: [PATCH 7/7] fix test-local tf1 --- autodist/kernel/synchronization/all_reduce_synchronizer.py | 2 -- autodist/kernel/synchronization/compressor.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/autodist/kernel/synchronization/all_reduce_synchronizer.py b/autodist/kernel/synchronization/all_reduce_synchronizer.py index 1dc4915..f79679b 100644 --- a/autodist/kernel/synchronization/all_reduce_synchronizer.py +++ b/autodist/kernel/synchronization/all_reduce_synchronizer.py @@ -78,8 +78,6 @@ def __init__(self, config: strategy_pb2.Strategy.Node): # the strategy will validate the group assignments are legitimate. self._group = syncer_config.group super().__init__() - print('='*89) - print(compressor_value) if compressor_value is not None: self._compressor_type = compressor_pb2.Compressor.Type.Name(compressor_value) print(self._compressor_type) diff --git a/autodist/kernel/synchronization/compressor.py b/autodist/kernel/synchronization/compressor.py index 4dea062..44a980d 100644 --- a/autodist/kernel/synchronization/compressor.py +++ b/autodist/kernel/synchronization/compressor.py @@ -108,8 +108,6 @@ def create(cls, name, *args, **kwargs): Returns: Compressor """ - print('='*89) - print(name) subclass = next(subclass for subclass in cls._get_subclasses() if subclass.__name__ == name) return subclass(*args, **kwargs)
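
With the full series applied, a strategy builder selects the compressor on the Strategy.Node itself through the compressor.type field, and the synchronizers (PS and AllReduce alike) resolve it from the node rather than from the synchronizer message. Below is a minimal end-to-end sketch of that contract, using a hypothetical variable name w:0 purely for illustration.

    from autodist.kernel.common.utils import get_op_name
    from autodist.kernel.synchronization.compressor import Compressor
    from autodist.proto import compressor_pb2, strategy_pb2, synchronizers_pb2

    # Strategy side: one node per variable; the compressor is chosen
    # independently of the synchronizer oneof.
    node = strategy_pb2.Strategy.Node()
    node.var_name = 'w:0'  # hypothetical variable
    node.AllReduceSynchronizer.spec = synchronizers_pb2.AllReduceSynchronizer.Spec.Value('NCCL')
    node.AllReduceSynchronizer.group = 0
    node.compressor.type = compressor_pb2.Compressor.Type.Value('HorovodCompressor')

    # Kernel side: the synchronizer reads both pieces back off the node.
    syncer_config = getattr(node, node.WhichOneof('synchronizer'))   # AllReduceSynchronizer message
    compressor_name = compressor_pb2.Compressor.Type.Name(node.compressor.type)
    compressor = Compressor.create(compressor_name, get_op_name(node.var_name))

HorovodCompressor is used in the sketch because HorovodCompressorEF is commented out (and its integration test disabled) at this point in the series.
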