From b61db3504827f8031ba0370475bdfc14a2c64329 Mon Sep 17 00:00:00 2001 From: sangkeun00 Date: Mon, 31 Aug 2020 18:04:16 -0400 Subject: [PATCH 1/4] BytePS init --- autodist/resource_spec.py | 9 ++++++- autodist/strategy/__init__.py | 1 + autodist/strategy/byte_ps_strategy.py | 39 +++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 autodist/strategy/byte_ps_strategy.py diff --git a/autodist/resource_spec.py b/autodist/resource_spec.py index b534a86..9c7adf3 100644 --- a/autodist/resource_spec.py +++ b/autodist/resource_spec.py @@ -100,6 +100,13 @@ def cpu_devices(self): self.__cpu_devices = {k: v for k, v in self.__devices.items() if v.device_type is DeviceType.CPU} return self.__cpu_devices.items() + @property + def cpu_only_devices(self): + """String-to-device_spec mapping of all cpu ONLY devices.""" + gpu_addresses = set([k.split(':')[0] for k, _ in self.gpu_devices]) + cpu_only_devices = {k: v for k, v in self.cpu_devices if k.split(':')[0] not in gpu_addresses} + return cpu_only_devices.items() + @property def num_cpus(self): """Number of all cpu devices.""" @@ -124,7 +131,7 @@ def node_gpu_devices(self): @property def node_cpu_devices(self): - """Node_address-to-device_string mapping of all cpu devices.""" + """Node_address-to-device_string mapping of all cpu devices.""" _cpu_devices = dict() for device in self.cpu_devices: _cpu_devices.setdefault(device[0].split(':')[0], []).append(device[0]) diff --git a/autodist/strategy/__init__.py b/autodist/strategy/__init__.py index 0424dd5..601ccfc 100644 --- a/autodist/strategy/__init__.py +++ b/autodist/strategy/__init__.py @@ -25,3 +25,4 @@ from .partitioned_all_reduce_strategy import PartitionedAR from .random_axis_partition_all_reduce_strategy import RandomAxisPartitionAR from .uneven_partition_ps_strategy import UnevenPartitionedPS +from .byte_ps_strategy import BytePS diff --git a/autodist/strategy/byte_ps_strategy.py b/autodist/strategy/byte_ps_strategy.py new file mode 100644 index 0000000..1db543f --- /dev/null +++ b/autodist/strategy/byte_ps_strategy.py @@ -0,0 +1,39 @@ +"""BytePS StrategyBuilder.""" +from tensorflow.python.framework import ops + +from autodist.strategy.base import Strategy +from autodist.strategy.ps_lb_strategy import PSLoadBalancing +from autodist.strategy.all_reduce_strategy import AllReduce +from autodist.kernel.common.utils import get_op_name + + +class BytePS(PSLoadBalancing): + """ + Generates the BytePS Strategy from https://github.com/bytedance/byteps. + + The BytePS strategy exploits CPU-only nodes for communication while GPU nodes + for computatoin. + """ + + def __init__(self, local_proxy_variable=False, sync=True, staleness=0): + PSLoadBalancing.__init__(self, local_proxy_variable, sync, staleness) + + # pylint: disable=attribute-defined-outside-init + def build(self, graph_item, resource_spec): + """Generate the strategy.""" + expr = Strategy() + + # get each variable, generate variable synchronizer config + expr.graph_config.replicas.extend([k for k, v in resource_spec.gpu_devices]) + + # find all variables + variables = graph_item.get_trainable_variables() + reduction_device_names = [k for k, _ in resource_spec.cpu_only_devices] + self.loads = {ps: 0.0 for ps in reduction_device_names} + + # Mark each variable to be synchronized with a Parameter Server + node_config = [self._gen_ps_node_config(var, self._local_proxy_variable, self._sync, self._staleness) + for var in variables] + expr.node_config.extend(node_config) + + return expr From 055f3ac94d3f3dfe802cfaba92b695ecbf56ce54 Mon Sep 17 00:00:00 2001 From: sangkeun00 Date: Tue, 8 Sep 2020 21:18:48 -0400 Subject: [PATCH 2/4] fixed pylint errors --- autodist/resource_spec.py | 2 +- autodist/strategy/byte_ps_strategy.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/autodist/resource_spec.py b/autodist/resource_spec.py index 9c7adf3..84843e5 100644 --- a/autodist/resource_spec.py +++ b/autodist/resource_spec.py @@ -103,7 +103,7 @@ def cpu_devices(self): @property def cpu_only_devices(self): """String-to-device_spec mapping of all cpu ONLY devices.""" - gpu_addresses = set([k.split(':')[0] for k, _ in self.gpu_devices]) + gpu_addresses = {k.split(':')[0] for k, _ in self.gpu_devices} cpu_only_devices = {k: v for k, v in self.cpu_devices if k.split(':')[0] not in gpu_addresses} return cpu_only_devices.items() diff --git a/autodist/strategy/byte_ps_strategy.py b/autodist/strategy/byte_ps_strategy.py index 1db543f..740a086 100644 --- a/autodist/strategy/byte_ps_strategy.py +++ b/autodist/strategy/byte_ps_strategy.py @@ -1,10 +1,6 @@ """BytePS StrategyBuilder.""" -from tensorflow.python.framework import ops - from autodist.strategy.base import Strategy from autodist.strategy.ps_lb_strategy import PSLoadBalancing -from autodist.strategy.all_reduce_strategy import AllReduce -from autodist.kernel.common.utils import get_op_name class BytePS(PSLoadBalancing): From 1978865c1aca1fbd65864385662812ce1cc7e123 Mon Sep 17 00:00:00 2001 From: sangkeun00 Date: Wed, 9 Sep 2020 12:28:28 -0400 Subject: [PATCH 3/4] add multinomial byteps --- autodist/strategy/byte_ps_strategy.py | 71 +++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/autodist/strategy/byte_ps_strategy.py b/autodist/strategy/byte_ps_strategy.py index 740a086..dd43913 100644 --- a/autodist/strategy/byte_ps_strategy.py +++ b/autodist/strategy/byte_ps_strategy.py @@ -1,6 +1,7 @@ -"""BytePS StrategyBuilder.""" -from autodist.strategy.base import Strategy -from autodist.strategy.ps_lb_strategy import PSLoadBalancing +"""BytePS StrategyBuilder(s).""" +from autodist.strategy.base import Strategy, StrategyBuilder +from autodist.proto import strategy_pb2 +from autodist.strategy.ps_lb_strategy import PSLoadBalancing, byte_size_load_fn class BytePS(PSLoadBalancing): @@ -33,3 +34,67 @@ def build(self, graph_item, resource_spec): expr.node_config.extend(node_config) return expr + + +class MultinomialBytePS(StrategyBuilder): + """ + BytePS with Multinomial Load Balancing. + + Each PS gets assigned variables of which size is proportional to + its bandwidth. + """ + + def __init__(self, local_proxy_variable=False, sync=True, staleness=0): + self._local_proxy_variable = local_proxy_variable + self._sync = sync + self._staleness = staleness + if self._staleness > 0: + assert self._sync, 'If staleness is positive, sync has to be set true' + self.loads = {} + self.bandwidth = {} + super().__init__() + + def build(self, graph_item, resource_spec): + """Generate the Strategy.""" + expr = Strategy() + + # get each variable, generate variable synchronizer config + expr.graph_config.replicas.extend([k for k, v in resource_spec.gpu_devices]) + for k, v in resource_spec.node_cpu_devices.items(): + if k not in resource_spec.node_gpu_devices: + expr.graph_config.replicas.extend(v) + + # find all variables + variables = graph_item.get_trainable_variables() + reduction_device_names = [k for k, _ in resource_spec.cpu_devices] + bandwidth = resource_spec.network_bandwidth + self.bandwidth = {ps: bandwidth[ps.split(':')[0]] for ps in reduction_device_names} + self.loads = {ps: 1e-8 / self.bandwidth[ps] for ps in reduction_device_names} + + # Mark each variable to be synchronized with a Parameter Server + node_config = [self._gen_ps_node_config(var, self._local_proxy_variable, self._sync, self._staleness) + for var in variables] + expr.node_config.extend(node_config) + + return expr + + def _gen_ps_node_config(self, var, local_proxy_variable, sync, staleness): + """ + Creates a NodeConfig specifying synchronization with Parameter Servers. + + Args: + var (Variable): The variable to generate a config for. + + Returns: + strategy_pb2.Strategy.Node: the config for the node. + """ + min_ps = min(self.loads, key=self.loads.get) + self.loads[min_ps] += byte_size_load_fn(var) / self.bandwidth[min_ps] + + node = strategy_pb2.Strategy.Node() + node.var_name = var.name + node.PSSynchronizer.reduction_destination = min_ps + node.PSSynchronizer.local_replication = local_proxy_variable + node.PSSynchronizer.sync = sync + node.PSSynchronizer.staleness = staleness + return node From 3b3196a0182b76558ac745ef1c1c11da8573fea5 Mon Sep 17 00:00:00 2001 From: sangkeun00 Date: Wed, 9 Sep 2020 12:37:09 -0400 Subject: [PATCH 4/4] fixed strategy/__init__.py --- autodist/strategy/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autodist/strategy/__init__.py b/autodist/strategy/__init__.py index 601ccfc..9df6036 100644 --- a/autodist/strategy/__init__.py +++ b/autodist/strategy/__init__.py @@ -25,4 +25,4 @@ from .partitioned_all_reduce_strategy import PartitionedAR from .random_axis_partition_all_reduce_strategy import RandomAxisPartitionAR from .uneven_partition_ps_strategy import UnevenPartitionedPS -from .byte_ps_strategy import BytePS +from .byte_ps_strategy import BytePS, MultinomialBytePS