From ee28ce45e9618c66e35ae3c2d39ef43cdfcce1d6 Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Tue, 26 Sep 2023 17:29:47 +0200 Subject: [PATCH] wip --- lib/rucio/core/topology.py | 156 +++-------------------- lib/rucio/core/transfer.py | 247 ++++++++++++++++++++++++++++++++++++- tests/test_transfer.py | 34 +---- 3 files changed, 266 insertions(+), 171 deletions(-) diff --git a/lib/rucio/core/topology.py b/lib/rucio/core/topology.py index 4206590d590..9cc9100ad8b 100644 --- a/lib/rucio/core/topology.py +++ b/lib/rucio/core/topology.py @@ -18,8 +18,7 @@ import threading import weakref from collections.abc import Callable, Iterable, Iterator -from collections import defaultdict -from decimal import Decimal, localcontext +from decimal import Decimal from typing import TYPE_CHECKING, cast, Any, Generic, Optional, TypeVar, Union from sqlalchemy import and_, select @@ -27,7 +26,6 @@ from rucio.common.config import config_get_int, config_get from rucio.common.exception import NoDistance, RSEProtocolNotSupported, InvalidRSEExpression from rucio.common.utils import PriorityQueue -from rucio.core.monitor import MetricManager from rucio.core.rse import RseCollection, RseData from rucio.core.rse_expression_parser import parse_expression from rucio.db.sqla import models @@ -35,7 +33,7 @@ from rucio.rse import rsemanager as rsemgr LoggerFunction = Callable[..., Any] -_Number = Union[float, int] +_Number = Union[float, int, Decimal] TN = TypeVar("TN", bound="Node") TE = TypeVar("TE", bound="Edge") @@ -44,6 +42,7 @@ from typing import Protocol + class _StateProvider(Protocol): @property def cost(self) -> _Number: @@ -53,8 +52,10 @@ def cost(self) -> _Number: def enabled(self) -> bool: ... + TNState = TypeVar("TNState", bound=_StateProvider) + TEState = TypeVar("TEState", bound=_StateProvider) + -METRICS = MetricManager(module=__name__) DEFAULT_HOP_PENALTY = 10 INF = float('inf') @@ -67,7 +68,6 @@ def __init__(self, rse_id: str): self.out_edges = weakref.WeakKeyDictionary() self.cost: _Number = 0 - self.capacity: _Number = 10**9 self.enabled: bool = True self.used_for_multihop = False @@ -78,7 +78,6 @@ def __init__(self, src_node: TN, dst_node: TN): self._dst_node = weakref.ref(dst_node) self.cost: _Number = 1 - self.capacity: _Number = 10**9 self.enabled: bool = True self.__hash = None @@ -127,6 +126,7 @@ class Topology(RseCollection, Generic[TN, TE]): """ Helper private class used to easily fetch topological information for a subset of RSEs. """ + def __init__( self, rse_ids: Optional[Iterable[str]] = None, @@ -155,6 +155,10 @@ def get_or_create(self, rse_id: str) -> "TN": self._edges_loaded = False return rse_data + @property + def edges(self): + return self._edges + def edge(self, src_node: TN, dst_node: TN) -> "Optional[TE]": return self._edges.get((src_node, dst_node)) @@ -357,9 +361,9 @@ def dijkstra_spf( self, dst_node: TN, nodes_to_find: Optional[set[TN]] = None, - node_state_provider: "Callable[[TN], _StateProvider]" = lambda x: x, - edge_state_provider: "Callable[[TE], _StateProvider]" = lambda x: x, - ) -> "Iterator[tuple[TN, _Number, _StateProvider, TE, _StateProvider]]": + node_state_provider: "Callable[[TN], TNState]" = lambda x: x, + edge_state_provider: "Callable[[TE], TEState]" = lambda x: x, + ) -> "Iterator[tuple[TN, _Number, TNState, TE, TEState]]": """ Does a Backwards Dijkstra's algorithm: start from destination and follow inbound links to other nodes. If multihop is disabled, stop after analysing direct connections to dest_rse. 
@@ -371,7 +375,7 @@ def dijkstra_spf( priority_q = PriorityQueue() priority_q[dst_node] = 0 - next_hops: dict[TN, tuple[_Number, _StateProvider, Optional[TE], Optional[_StateProvider]]] =\ + next_hops: dict[TN, tuple[_Number, _StateProvider, Optional[TE], Optional[_StateProvider]]] = \ {dst_node: (0, node_state_provider(dst_node), None, None)} while priority_q: node = priority_q.pop() @@ -389,139 +393,11 @@ def dijkstra_spf( edge_state = edge_state_provider(edge) new_adjacent_dist = node_dist + node_state.cost + edge_state.cost - if new_adjacent_dist < next_hops.get(adjacent_node, (INF, ))[0] and edge_state.enabled: + if new_adjacent_dist < next_hops.get(adjacent_node, (INF,))[0] and edge_state.enabled: adj_node_state = node_state_provider(adjacent_node) next_hops[adjacent_node] = new_adjacent_dist, adj_node_state, edge, edge_state priority_q[adjacent_node] = new_adjacent_dist - @METRICS.time_it() - def karakostas_multicommodity_flow(self, demands, epsilon: Decimal = Decimal('0.1')): - """ - Compute the maximum multicommodity flow [1]. This corresponds to the most load-balanced way of transferring the given demand. - The input demand must be in the format {source_node: {destination_node: amount_of_bytes_to_transfer_from_source_to_destination}} - The resulted solution is fractional: a flow from source_node to destination_node can be split at will over all possible paths. - - Uses the following algorithm. The result will be within (1 - epsilon)**(-3) from the optimal solution. - [1] Karakostas, G. (2008). Faster approximation schemes for fractional multicommodity flow problems. ACM Transactions on Algorithms (TALG) - """ - flow = defaultdict(lambda: defaultdict(Decimal)) - - total_flow_from_node = defaultdict(Decimal) - total_flow_to_node = defaultdict(Decimal) - total_flow_via_edge = defaultdict(Decimal) - - with localcontext() as ctx: - # We will manipulate very small numbers - ctx.prec = 50 - - num_n_e = len(self.rse_id_to_data_map) + len(self._edges) - # constants are described (with the accompanying proof) in the linked paper - delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon) - flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln() - max_iterations = 2 * (Decimal(1) / epsilon) * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln() - - costs = {} - - class _DictStateProvider: - enabled = True - - def __init__(self, element: "TN | TE"): - self.cost = costs.get(element, None) - if self.cost is None: - costs[element] = self.cost = delta / element.capacity - - demand_multiplier = 1 - iteration = 0 - dl = Decimal(delta * num_n_e) - while dl < 1: - - # If the demands are very small compared to edge capacities, this algorithm will require an unreasonable - # number of iterations to complete. The following mechanism is described in the linked paper and is meant to - # speed up convergence time by doubling the demands multiple times. 
- iteration += 1 - if iteration > max_iterations: - iteration = 0 - demand_multiplier = demand_multiplier * 2 - - for dst_node, demand_towards_dst_node in demands.items(): - demand_towards_dst_node = {n: d * demand_multiplier for n, d in demand_towards_dst_node.items()} - - any_remaining_demand = True - while dl < 1 and any_remaining_demand: - sorted_nodes = list(self.dijkstra_spf(dst_node=dst_node, - nodes_to_find=set(demand_towards_dst_node), - node_state_provider=_DictStateProvider, - edge_state_provider=_DictStateProvider)) - - # Find how much flow will pass via every node and edge if we push all demands via the shortest paths. - # This may result in an edge or node to get overloaded. overflow_ratio will record by how much it was overloaded - overflow_ratio = Decimal(1) - inbound_flow_by_node = defaultdict(Decimal) - for node, _, _, edge_to_next_hop, _ in reversed(sorted_nodes): - next_hop = edge_to_next_hop.dst_node - - inbound_flow = inbound_flow_by_node[node] - # Whatever flow goes out of the current node will have to go into the next hop - outbound_flow = inbound_flow + demand_towards_dst_node.get(node, Decimal(0)) - inbound_flow_by_node[next_hop] = inbound_flow_by_node[next_hop] + outbound_flow - # Only accept a fraction of the demand if it risks to overload any edge or node - overflow_ratio = max( - overflow_ratio, - inbound_flow / node.capacity, - outbound_flow / edge_to_next_hop.capacity, - ) - overflow_ratio = max(overflow_ratio, inbound_flow_by_node[dst_node] / dst_node.capacity) - - # Push each demand. If overflow_ratio is bigger than one, only push the fraction of demand which - # doesn't overload anything. - any_remaining_demand = False - sorted_nodes.append((dst_node, None, None, None, None)) - for node, _, _, edge_to_next_hop, _ in sorted_nodes: - desired_demand = demand_towards_dst_node.get(node, Decimal(0)) - - accepted_demand = desired_demand / overflow_ratio - accepted_inbound = inbound_flow_by_node[node] / overflow_ratio - accepted_outbound = accepted_inbound + accepted_demand - - if desired_demand > 0: - remaining_demand = desired_demand - accepted_demand - demand_towards_dst_node[node] = remaining_demand - flow[dst_node][node] += accepted_demand - if remaining_demand: - any_remaining_demand = True - - node_volume_increase = 0 - if accepted_inbound > 0: - total_flow_to_node[node] += accepted_inbound - - node_volume_increase = epsilon * accepted_inbound * costs[node] - costs[node] += node_volume_increase / node.capacity - - edge_volume_increase = 0 - if edge_to_next_hop is not None and accepted_outbound > 0: - total_flow_from_node[node] += accepted_outbound - total_flow_via_edge[edge_to_next_hop] += accepted_outbound - - edge_volume_increase = epsilon * accepted_outbound * costs[edge_to_next_hop] - costs[edge_to_next_hop] += edge_volume_increase / edge_to_next_hop.capacity - - dl += edge_volume_increase + node_volume_increase - - # The computed flow violated edge and node capacity constraints. We need to scale it down. 
-            lam = Decimal('inf')
-            for dst_node, demand_per_src in demands.items():
-                for src_node, demand_from_src in demand_per_src.items():
-                    scaled_flow = flow[dst_node][src_node] / flow_scale_factor
-                    flow[dst_node][src_node] = scaled_flow
-                    lam = min(lam, scaled_flow / demand_from_src)
-            for node in total_flow_to_node:
-                total_flow_to_node[node] /= flow_scale_factor
-            for node in total_flow_from_node:
-                total_flow_from_node[node] /= flow_scale_factor
-            for edge in total_flow_via_edge:
-                total_flow_via_edge[edge] /= flow_scale_factor
-            print(lam)
-
 
 class ExpiringObjectCache:
     """
diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py
index 2ae95b89884..f4fc3c80510 100644
--- a/lib/rucio/core/transfer.py
+++ b/lib/rucio/core/transfer.py
@@ -18,6 +18,8 @@
 import re
 import time
 import traceback
+from collections import defaultdict
+from decimal import Decimal, localcontext
 from typing import TYPE_CHECKING
 
 from dogpile.cache import make_region
@@ -48,10 +50,10 @@
 if TYPE_CHECKING:
     from collections.abc import Callable, Generator, Iterable
-    from typing import Any, Optional
+    from typing import Any, Optional, Mapping
 
     from sqlalchemy.orm import Session
 
     from rucio.common.types import InternalAccount
-    from rucio.core.topology import Topology
+    from rucio.core.topology import Topology, Node, Edge
 
     LoggerFunction = Callable[..., Any]
@@ -284,6 +286,247 @@ def legacy_sources(self):
         return self._legacy_sources
 
 
+class FlowManager:
+    class EdgeFlow:
+        def __init__(self, edge):
+            self.edge = edge
+            self.capacity = 10 ** 9
+            self.cost = 1
+            self.flow = 0
+
+    class NodeFlow:
+        def __init__(self, node):
+            self.node = node
+            self.capacity = 10 ** 9
+            self.cost = 1
+            self.in_flow = 0
+            self.out_flow = 0
+
+    def __init__(self, topology: "Topology"):
+        self.topology = topology
+
+    @read_session
+    def _demand_from_requests(self, *, session: "Session"):
+        """
+        Scan the requests table and return the demand: pledged demand (source RSE known), keyed by
+        destination node then source node, and pending demand (no source yet), keyed by destination node.
+        """
+        topology = self.topology
+        db_stats = request_core.get_request_stats(
+            state=[RequestState.QUEUED,
+                   RequestState.SUBMITTING,
+                   RequestState.SUBMITTED,
+                   RequestState.WAITING],
+            session=session,
+        )
+
+        pledged_demand = defaultdict(lambda: defaultdict(Decimal))
+        pending_demand = defaultdict(Decimal)
+        for db_stat in db_stats:
+            if (db_stat.dest_rse_id not in topology) or (db_stat.source_rse_id and db_stat.source_rse_id not in topology):
+                # The RSE was deleted. Ignore
+                continue
+
+            src_node = topology[db_stat.source_rse_id] if db_stat.source_rse_id else None
+            dst_node = topology[db_stat.dest_rse_id]
+            if src_node:
+                pledged_demand[dst_node][src_node] += Decimal(db_stat.bytes)
+            else:
+                pending_demand[dst_node] += Decimal(db_stat.bytes)
+
+        return pledged_demand, pending_demand
+
+    def _route_in_tree(
+        self,
+        root_node,
+        demand_towards_root_node: "Mapping[Node, Decimal]",
+        node_flow_provider: "Callable[[Node], NodeFlow]",
+        edge_flow_provider: "Callable[[Edge], EdgeFlow]",
+        ignore_capacity: bool = False,
+    ):
+        """
+        Route the demands via shortest paths in the network and increase the associated flow.
+        """
+        accepted_demand_towards_root_node = {}
+        remaining_demand_towards_root_node = {}
+        sorted_nodes = list(
+            self.topology.dijkstra_spf(
+                dst_node=root_node,
+                nodes_to_find=set(demand_towards_root_node),
+                node_state_provider=node_flow_provider,
+                edge_state_provider=edge_flow_provider,
+            )
+        )
+        # Adding the demand may result in an edge or node becoming overloaded; overflow_ratio records by how much it was overloaded.
+        overflow_ratio = Decimal(1)
+        inbound_flow_by_node = defaultdict(Decimal)
+        for node, _, node_state, edge_to_next_hop, edge_state in reversed(sorted_nodes):
+            next_hop = edge_to_next_hop.dst_node
+
+            inbound_flow = inbound_flow_by_node[node]
+            # Whatever flow goes out of the current node will have to go into the next hop
+            outbound_flow = inbound_flow + demand_towards_root_node.get(node, Decimal(0))
+            inbound_flow_by_node[next_hop] = inbound_flow_by_node[next_hop] + outbound_flow
+            # Detect if we overload a node or an edge, and by how much
+            if not ignore_capacity:
+                overflow_ratio = max(
+                    overflow_ratio,
+                    inbound_flow / node_state.capacity,
+                    outbound_flow / edge_state.capacity
+                )
+        if not ignore_capacity:
+            overflow_ratio = max(overflow_ratio, inbound_flow_by_node[root_node] / root_node.capacity)
+
+        # Push each demand. If overflow_ratio is bigger than one, only push the fraction of demand which
+        # doesn't overload anything.
+        sorted_nodes.append((root_node, 0, node_flow_provider(root_node), None, None))
+        for node, _, node_state, edge_to_next_hop, edge_state in sorted_nodes:
+            desired_demand = demand_towards_root_node.get(node, Decimal(0))
+
+            # Only accept the fraction of the demand which doesn't overload a node or an edge
+            accepted_demand = desired_demand / overflow_ratio
+            accepted_inbound = inbound_flow_by_node[node] / overflow_ratio
+            accepted_outbound = accepted_inbound + accepted_demand
+
+            if desired_demand > 0:
+                remaining_demand = desired_demand - accepted_demand
+                accepted_demand_towards_root_node[node] = accepted_demand
+                if remaining_demand > 0:
+                    remaining_demand_towards_root_node[node] = remaining_demand
+
+            if accepted_inbound > 0:
+                node_state.in_flow += accepted_inbound
+
+            if edge_state is not None and accepted_outbound > 0:
+                node_state.out_flow += accepted_outbound
+                edge_state.flow += accepted_outbound
+
+        return accepted_demand_towards_root_node, remaining_demand_towards_root_node
+
+    @METRICS.time_it()
+    def _karakostas_multicommodity_flow(
+        self,
+        demands: "Mapping[Node, Mapping[Node, float]]",
+        epsilon: Decimal = Decimal('0.1')
+    ):
+        """
+        Compute the maximum multicommodity flow [1]. This corresponds to the most load-balanced way of transferring the given demand.
+        The input demand must be in the format {destination_node: {source_node: amount_of_bytes_to_transfer_from_source_to_destination}}
+        The resulting solution is fractional: a flow from source_node to destination_node can be split at will over all possible paths.
+
+        Uses the algorithm from [1]. The result will be within (1 - epsilon)**(-3) of the optimal solution.
+        [1] Karakostas, G. (2008). Faster approximation schemes for fractional multicommodity flow problems.
+            ACM Transactions on Algorithms (TALG)
+        """
+        flow = defaultdict(lambda: defaultdict(Decimal))
+        if not demands:
+            return flow
+
+        with localcontext() as ctx:
+            # We will manipulate very small numbers
+            ctx.prec = 50
+
+            num_n_e = len(self.topology.rse_id_to_data_map) + len(self.topology.edges)
+            # constants are described (with the accompanying proof) in the linked paper
+            delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon)
+            flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln()
+            max_iterations = 2 * (Decimal(1) / epsilon) * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln()
+
+            dl = Decimal(delta * num_n_e)
+
+            class _FlowProxy:
+                """
+                Stores the value of a flow while also automatically adjusting the cost
+                and the total volume (as needed by Karakostas' algorithm) each time
+                an augmenting flow is pushed through the graph.
+                """
+                def __init__(self, element_flow: "_NodeFlow | _EdgeFlow"):
+                    self.value = Decimal(0)
+                    self.element_flow = element_flow
+
+                def __add__(self, other):
+                    self.value += other
+                    volume_increase = epsilon * other * self.element_flow.cost
+                    self.element_flow.cost += volume_increase / self.element_flow.capacity
+                    nonlocal dl
+                    dl += volume_increase
+                    return self
+
+            class _EdgeFlow(self.EdgeFlow):
+
+                def __init__(self, edge):
+                    super().__init__(edge)
+                    self.enabled = True
+                    self.capacity = 10 ** 9
+                    self.cost = delta / self.capacity
+                    self.flow = _FlowProxy(self)
+
+            class _NodeFlow(self.NodeFlow):
+
+                def __init__(self, node):
+                    super().__init__(node)
+                    self.enabled = True
+                    self.capacity = 10 ** 9
+                    self.cost = delta / self.capacity
+                    self.in_flow = _FlowProxy(self)
+                    self.out_flow = Decimal(0)
+
+            node_flows = {node: _NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()}
+            edge_flows = {edge: _EdgeFlow(edge) for edge in self.topology.edges.values()}
+
+            demand_multiplier = 1
+            iteration = 0
+            while dl < 1:
+                # If the demands are very small compared to edge capacities, this algorithm will require an unreasonable
+                # number of iterations to complete. The following mechanism is described in the linked paper and is meant to
+                # speed up convergence time by doubling the demands multiple times.
+                iteration += 1
+                if iteration > max_iterations:
+                    iteration = 0
+                    demand_multiplier = demand_multiplier * 2
+
+                for dst_node, demand_towards_dst_node in demands.items():
+                    demand_towards_dst_node = {n: d * demand_multiplier for n, d in demand_towards_dst_node.items()}
+
+                    while dl < 1 and demand_towards_dst_node:
+                        accepted_flows, demand_towards_dst_node = self._route_in_tree(
+                            root_node=dst_node,
+                            demand_towards_root_node=demand_towards_dst_node,
+                            node_flow_provider=node_flows.get,
+                            edge_flow_provider=edge_flows.get,
+                        )
+                        for node, accepted_flow in accepted_flows.items():
+                            flow[dst_node][node] += accepted_flow
+
+            # The computed flow violated edge and node capacity constraints. We need to scale it down.
+ lam = Decimal('inf') + for dst_node, demand_per_src in demands.items(): + for src_node, demand_from_src in demand_per_src.items(): + scaled_flow = flow[dst_node][src_node] / flow_scale_factor + flow[dst_node][src_node] = scaled_flow + lam = min(lam, scaled_flow / demand_from_src) + for node_state in node_flows.values(): + node_state.in_flow.value /= flow_scale_factor + node_state.out_flow /= flow_scale_factor + for edge_state in edge_flows.values(): + edge_state.flow.value /= flow_scale_factor + return flow + + @read_session + def optimize_flow(self, *, session: "Session"): + pledged_demand, pending_demand = self._demand_from_requests(session=session) + + node_flows = {node: self.NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()} + edge_flows = {edge: self.EdgeFlow(edge) for edge in self.topology.edges.values()} + for dst_node, demand_towards_dst_node in pledged_demand.items(): + inbound_flow_by_node, _ = self._route_in_tree( + root_node=dst_node, + demand_towards_root_node=demand_towards_dst_node, + node_flow_provider=node_flows.get, + edge_flow_provider=edge_flows.get, + ) + + self._karakostas_multicommodity_flow() + + def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str: """ an implementation of __str__ for a transfer path, which is a list of direct transfers, so not really an object diff --git a/tests/test_transfer.py b/tests/test_transfer.py index 1b974003695..814da363267 100644 --- a/tests/test_transfer.py +++ b/tests/test_transfer.py @@ -383,10 +383,9 @@ def test_fk_error_on_source_creation(rse_factory, did_factory, root_account): 'rucio.core.rse_expression_parser.REGION', # The list of multihop RSEs is retrieved by an expression ]}], indirect=True) def test_topology(rse_factory, did_factory, root_account, vo, caches_mock): - from rucio.core.request import get_request_stats - from collections import defaultdict - from rucio.core.rse import list_rses + from rucio.core.topology import Topology + from rucio.core.transfer import FlowManager topology = Topology(rse_ids=[rse['id'] for rse in list_rses()]).configure_multihop() topology.ensure_loaded(load_name=True, load_columns=True, load_usage=True, load_info=True, load_attributes=True, load_limits=True) @@ -406,30 +405,6 @@ def test_topology(rse_factory, did_factory, root_account, vo, caches_mock): rse3 = topology[rse3_id] rse4 = topology[rse4_id] - db_stats = get_request_stats( - state=[RequestState.QUEUED, - RequestState.SUBMITTING, - RequestState.SUBMITTED, - RequestState.WAITING], - ) - - pending_transfers = defaultdict(float) - for db_stat in db_stats: - - if (db_stat.dest_rse_id not in topology) or (db_stat.source_rse_id and db_stat.source_rse_id not in topology): - # The RSE was deleted. Ignore - continue - - src_node = topology[db_stat.source_rse_id] if db_stat.source_rse_id else None - dst_node = topology[db_stat.dest_rse_id] - if src_node: - edge = topology.get_or_create_edge(src_node, dst_node) - if edge: - edge.usage += db_stat.bytes - # add usage to src and dst rse - else: - pending_transfers[dst_node] += db_stat.bytes - demand = { rse1: { rse2: 10**9, @@ -446,5 +421,6 @@ def test_topology(rse_factory, did_factory, root_account, vo, caches_mock): rse2.capacity = 10 ** 9 rse3.capacity = 10 ** 8 rse4.capacity = 10 ** 9 - topology.karakostas_multicommodity_flow(demand) - print(pending_transfers) + + flow_manager = FlowManager(topology) + flow_manager.optimize_flow()
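
The topology.py hunks above make dijkstra_spf() generic over node and edge state: anything matching the _StateProvider protocol (a `cost` and an `enabled` attribute) can be passed as node_state_provider / edge_state_provider, which is how FlowManager plugs its NodeFlow/EdgeFlow objects in. A minimal sketch of a custom provider; DoubledCost, topology, src_node and dst_node are hypothetical names used only for illustration and are not part of the patch:

class DoubledCost:
    """Toy state provider: same topology, but every cost counts twice."""

    enabled = True

    def __init__(self, element):
        # `element` is either a topology Node or an Edge; both expose a `cost` attribute
        self.cost = 2 * element.cost


# Hypothetical usage, with `topology` an already loaded Topology and
# `src_node` / `dst_node` two of its nodes:
#
# for node, distance, node_state, edge, edge_state in topology.dijkstra_spf(
#         dst_node=dst_node,
#         nodes_to_find={src_node},
#         node_state_provider=DoubledCost,
#         edge_state_provider=DoubledCost):
#     print(node.rse_id, distance)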
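
FlowManager._route_in_tree() pushes the whole demand along the shortest-path tree and then scales everything down by the worst overflow it would cause. A toy calculation with made-up numbers (two sources sharing a single edge of capacity 10**9) shows the accepted/remaining split it returns:

from decimal import Decimal

capacity = Decimal(10 ** 9)
demand_a = demand_b = Decimal(8 * 10 ** 8)

# both demands traverse the same edge towards the root
outbound_on_shared_edge = demand_a + demand_b                          # 1.6e9 > capacity
overflow_ratio = max(Decimal(1), outbound_on_shared_edge / capacity)   # 1.6

accepted_a = demand_a / overflow_ratio     # 5e8 bytes routed in this round
remaining_a = demand_a - accepted_a        # 3e8 bytes left for the next round
print(accepted_a, remaining_a)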
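
The constants at the top of _karakostas_multicommodity_flow() come from the referenced paper. The snippet below only evaluates them for an assumed graph size (num_n_e = 10, epsilon = 0.1) to make their magnitudes concrete and to show why the patch raises the Decimal precision to 50 digits:

from decimal import Decimal, localcontext

epsilon = Decimal('0.1')
num_n_e = 10  # |V| + |E| of a hypothetical small topology

with localcontext() as ctx:
    ctx.prec = 50
    delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) \
        * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon)
    flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln()
    max_iterations = 2 * (Decimal(1) / epsilon) \
        * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln()

print(delta)              # roughly 1.5e-11 here; shrinks rapidly as the topology grows
print(flow_scale_factor)  # how much the accumulated raw flow must be divided down at the end
print(max_iterations)     # iterations before the demand-doubling mechanism kicks in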
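
_FlowProxy keeps the length function of the approximation scheme up to date: every `+=` on a flow also inflates the cost of the underlying node or edge and the global volume `dl` that terminates the main loop. The same update rule, stripped of the class machinery and run on toy values:

from decimal import Decimal

epsilon = Decimal('0.1')
capacity = Decimal(10 ** 9)
cost = Decimal('1e-20')   # plays the role of delta / capacity
dl = Decimal(0)           # global volume D(l); the solver stops once it reaches 1
flow = Decimal(0)

def push(amount: Decimal):
    """Mirror of _FlowProxy.__add__: record the flow, inflate cost and dl."""
    global cost, dl, flow
    flow += amount
    volume_increase = epsilon * amount * cost
    cost += volume_increase / capacity
    dl += volume_increase

push(Decimal(10 ** 8))
print(flow, cost, dl)  # the element got more expensive, so later augmenting paths avoid it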
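
optimize_flow() is still unfinished in this wip commit: it routes the pledged demand once and then calls _karakostas_multicommodity_flow() without the demands argument the method requires. The sketch below is one plausible way to wire the pieces together, not the author's final design; EagerFlowManager is hypothetical and simply feeds the pledged demand to the solver:

from rucio.core.transfer import FlowManager          # added by this patch
from rucio.db.sqla.session import read_session


class EagerFlowManager(FlowManager):
    """Hypothetical wrapper: run the approximate solver directly on the pledged demand."""

    @read_session
    def optimize_flow(self, *, session=None):
        # pending_demand (requests with no source yet) cannot be routed per source,
        # so only the pledged part is handed to the multicommodity solver.
        pledged_demand, pending_demand = self._demand_from_requests(session=session)
        return self._karakostas_multicommodity_flow(pledged_demand)


# Usage, assuming `topology` is an already configured Topology:
# flow = EagerFlowManager(topology).optimize_flow()
# flow[dst_node][src_node] then holds the (scaled) amount routed for that pair.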