diff --git a/lib/rucio/core/topology.py b/lib/rucio/core/topology.py index b2b0239f49..181b52e26a 100644 --- a/lib/rucio/core/topology.py +++ b/lib/rucio/core/topology.py @@ -79,6 +79,8 @@ def __init__(self, src_node: TN, dst_node: TN): self.cost: _Number = 1 self.enabled: bool = True + self.__hash = None + self.add_to_nodes() def add_to_nodes(self): @@ -111,7 +113,12 @@ def __eq__(self, other): return self._src_node == other._src_node and self._dst_node == other._dst_node def __str__(self): - return f'{self._src_node}-->{self._dst_node}' + return f'{self.src_node}-->{self.dst_node}' + + def __hash__(self): + if self.__hash is None: + self.__hash = hash((self.src_node, self.dst_node)) + return self.__hash class Topology(RseCollection, Generic[TN, TE]): @@ -146,6 +153,10 @@ def get_or_create(self, rse_id: str) -> "TN": self._edges_loaded = False return rse_data + @property + def edges(self): + return self._edges + def edge(self, src_node: TN, dst_node: TN) -> "Optional[TE]": return self._edges.get((src_node, dst_node)) diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 7332f599cf..9950cd0396 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -21,6 +21,7 @@ import traceback from collections import defaultdict from dataclasses import dataclass +from decimal import Decimal, localcontext from typing import TYPE_CHECKING from dogpile.cache import make_region @@ -52,10 +53,10 @@ if TYPE_CHECKING: from collections.abc import Callable, Generator, Iterable - from typing import Any, Optional + from typing import Any, Optional, Mapping from sqlalchemy.orm import Session from rucio.common.types import InternalAccount - from rucio.core.topology import Topology + from rucio.core.topology import Topology, Node, Edge LoggerFunction = Callable[..., Any] @@ -499,6 +500,255 @@ def slice_time( older_t = older_t - resolution +class FlowManager: + class EdgeFlow: + def __init__(self, edge): + self.edge = edge + self.capacity = 10 ** 9 + self.cost = 1 + self.flow = 0 + + class NodeFlow: + def __init__(self, node): + self.node = node + self.capacity = 10 ** 9 + self.cost = 1 + self.in_flow = 0 + self.out_flow = 0 + + def __init__(self, topology: "Topology"): + self.topology = topology + + @read_session + def _demand_from_requests(self, *, session: "Session"): + """ + Scans the Requests table and returns the current demand + """ + topology = self.topology + db_stats = request_core.get_request_stats( + state=[RequestState.QUEUED, + RequestState.SUBMITTING, + RequestState.SUBMITTED, + RequestState.WAITING], + session=session, + ) + + pledged_demand = defaultdict(lambda: defaultdict(Decimal)) + pending_demand = defaultdict(Decimal) + for db_stat in db_stats: + if (db_stat.dest_rse_id not in topology) or (db_stat.source_rse_id and db_stat.source_rse_id not in topology): + # The RSE was deleted. Ignore + continue + + src_node = topology[db_stat.source_rse_id] if db_stat.source_rse_id else None + dst_node = topology[db_stat.dest_rse_id] + if src_node: + pledged_demand[dst_node][src_node] += Decimal(db_stat.bytes) + else: + pending_demand[dst_node] += Decimal(db_stat.bytes) + + return pledged_demand, pending_demand + + def _route_in_tree( + self, + root_node, + demand_towards_root_node: "Mapping[Node, Decimal]", + node_flow_provider: "Callable[[Node], NodeFlow]", + edge_flow_provider: "Callable[[Edge], EdgeFlow]", + ignore_capacity: bool = False, + ): + """ + Compute the shortest path tree towards the root node from all other nodes. + Route the demands in this tree and increase the associated flow. + """ + accepted_demand_towards_root_node = {} + remaining_demand_towards_root_node = {} + sorted_nodes = list( + self.topology.dijkstra_spf( + dst_node=root_node, + nodes_to_find=set(demand_towards_root_node), + node_state_provider=node_flow_provider, + edge_state_provider=edge_flow_provider, + ) + ) + # Adding the demand may result in an edge or node to become overloaded. record by how much it was overloaded + overflow_ratio = Decimal(1) + inbound_flow_by_node = defaultdict(Decimal) + for node, _, node_state, edge_to_next_hop, edge_state in reversed(sorted_nodes): + next_hop = edge_to_next_hop.dst_node + + inbound_flow = inbound_flow_by_node[node] + # Whatever flow goes out of the current node will have to go into the next hop + outbound_flow = inbound_flow + demand_towards_root_node.get(node, Decimal(0)) + inbound_flow_by_node[next_hop] = inbound_flow_by_node[next_hop] + outbound_flow + # Detect if we overload a node or an edge. And by how much + if not ignore_capacity: + overflow_ratio = max( + overflow_ratio, + inbound_flow / node_state.capacity, + outbound_flow / edge_state.capacity + ) + if not ignore_capacity: + overflow_ratio = max(overflow_ratio, inbound_flow_by_node[root_node] / root_node.capacity) + + # Push each demand. If overflow_ratio is bigger than one, only push the fraction of demand which + # doesn't overload anything. + sorted_nodes.append((root_node, 0, node_flow_provider(root_node), None, None)) + for node, _, node_state, edge_to_next_hop, edge_state in sorted_nodes: + desired_demand = demand_towards_root_node.get(node, Decimal(0)) + + # Only accept the fraction of the demand which doesn't overload a node or an edge + accepted_demand = desired_demand / overflow_ratio + accepted_inbound = inbound_flow_by_node[node] / overflow_ratio + accepted_outbound = accepted_inbound + accepted_demand + + if desired_demand > 0: + remaining_demand = desired_demand - accepted_demand + accepted_demand_towards_root_node[node] = accepted_demand + if remaining_demand > 0: + remaining_demand_towards_root_node[node] = remaining_demand + + if accepted_inbound > 0: + node_state.in_flow += accepted_inbound + + if edge_state is not None and accepted_outbound > 0: + node_state.out_flow += accepted_outbound + edge_state.flow += accepted_outbound + + return accepted_demand_towards_root_node, remaining_demand_towards_root_node + + @METRICS.time_it() + def _karakostas_multicommodity_flow( + self, + demands: "Mapping[Node, Mapping[Node, float | Decimal]]", + epsilon: Decimal = Decimal('0.1') + ): + """ + Compute the maximum multicommodity flow [1]. This corresponds to the most load-balanced way of transferring the given demand. + The input demand must be in the format {source_node: {destination_node: amount_of_bytes_to_transfer_from_source_to_destination}} + The resulted solution is fractional: a flow from source_node to destination_node can be split at will over all possible paths. + + Uses the following algorithm. The result will be within (1 - epsilon)**(-3) from the optimal solution. + [1] Karakostas, G. (2008). Faster approximation schemes for fractional multicommodity flow problems. ACM Transactions on Algorithms (TALG) + """ + flow = defaultdict(lambda: defaultdict(Decimal)) + if not demands: + return None, None + + with localcontext() as ctx: + # We will manipulate very small numbers + ctx.prec = 50 + + num_n_e = len(self.topology.rse_id_to_data_map) + len(self.topology.edges) + # constants are described (with the accompanying proof) in the linked paper + delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon) + flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln() + max_iterations = 2 * (Decimal(1) / epsilon) * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln() + + dl = Decimal(delta * num_n_e) + + class _FlowProxy: + """ + Stores the value of a flow while, also, automatically adjusting the cost + and the total volume (as neded for the Karakosta's algorithm) each time + an augmenting flow is pushed through the graph. + """ + def __init__(self, element_flow: "_NodeFlow | _EdgeFlow"): + self.value = Decimal(0) + self.element_flow = element_flow + + def __add__(self, other): + self.value += other + volume_increase = epsilon * other * self.element_flow.cost + self.element_flow.cost += volume_increase / self.element_flow.capacity + nonlocal dl + dl += volume_increase + return self + + class _EdgeFlow(self.EdgeFlow): + + def __init__(self, edge): + super().__init__(edge) + self.enabled = True + self.capacity = 10 ** 9 + self.cost = delta / self.capacity + self.flow = _FlowProxy(self) + + class _NodeFlow(self.NodeFlow): + + def __init__(self, node): + super().__init__(node) + self.enabled = True + self.capacity = 10 ** 9 + self.cost = delta / self.capacity + self.in_flow = _FlowProxy(self) + self.out_flow = Decimal(0) + + node_flows = {node: _NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()} + edge_flows = {edge: _EdgeFlow(edge) for edge in self.topology.edges.values()} + + demand_multiplier = 1 + iteration = 0 + while dl < 1: + # If the demands are very small compared to edge capacities, this algorithm will require an unreasonable + # number of iterations to complete. The following mechanism is described in the linked paper and is meant to + # speed up convergence time by doubling the demands multiple times. + iteration += 1 + if iteration > max_iterations: + iteration = 0 + demand_multiplier = demand_multiplier * 2 + + for dst_node, demand_towards_dst_node in demands.items(): + demand_towards_dst_node = {n: d * demand_multiplier for n, d in demand_towards_dst_node.items()} + + while dl < 1 and demand_towards_dst_node: + accepted_flows, demand_towards_dst_node = self._route_in_tree( + root_node=dst_node, + demand_towards_root_node=demand_towards_dst_node, + node_flow_provider=node_flows.get, + edge_flow_provider=edge_flows.get, + ) + for node, accepted_flow in accepted_flows.items(): + flow[dst_node][node] += accepted_flow + + # The computed flow violated edge and node capacity constraints. We need to scale it down. + lam = Decimal('inf') + for dst_node, demand_per_src in demands.items(): + for src_node, demand_from_src in demand_per_src.items(): + scaled_flow = flow[dst_node][src_node] / flow_scale_factor + flow[dst_node][src_node] = scaled_flow + lam = min(lam, scaled_flow / demand_from_src) + for node_state in node_flows.values(): + node_state.in_flow.value /= flow_scale_factor + node_state.out_flow /= flow_scale_factor + for edge_state in edge_flows.values(): + edge_state.flow.value /= flow_scale_factor + return node_flows, edge_flows + + @read_session + def optimize_flow(self, *, session: "Session"): + pledged_demand, pending_demand = self._demand_from_requests(session=session) + + # Compute the current status of the network + current_node_flow = {node: self.NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()} + current_edge_flow = {edge: self.EdgeFlow(edge) for edge in self.topology.edges.values()} + for dst_node, demand_towards_dst_node in pledged_demand.items(): + inbound_flow_by_node, _ = self._route_in_tree( + root_node=dst_node, + demand_towards_root_node=demand_towards_dst_node, + node_flow_provider=current_node_flow.get, + edge_flow_provider=current_edge_flow.get, + ) + + # Compute what would be the optimal flow in the network + desired_node_flow, desired_edge_flow = self._karakostas_multicommodity_flow(pledged_demand) + + # TODO: combine the information from the current flow and the desired flow + # to pick the destination (during rule evaluation) or source (for transfers) + # - select in priority if desired_flow > current_flow + # - avoid if desired_flow < current_flow + + def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str: """ an implementation of __str__ for a transfer path, which is a list of direct transfers, so not really an object