diff --git a/lib/rucio/core/topology.py b/lib/rucio/core/topology.py index 5995da94424..9cc9100ad8b 100644 --- a/lib/rucio/core/topology.py +++ b/lib/rucio/core/topology.py @@ -18,6 +18,7 @@ import threading import weakref from collections.abc import Callable, Iterable, Iterator +from decimal import Decimal from typing import TYPE_CHECKING, cast, Any, Generic, Optional, TypeVar, Union from sqlalchemy import and_, select @@ -32,7 +33,7 @@ from rucio.rse import rsemanager as rsemgr LoggerFunction = Callable[..., Any] -_Number = Union[float, int] +_Number = Union[float, int, Decimal] TN = TypeVar("TN", bound="Node") TE = TypeVar("TE", bound="Edge") @@ -41,6 +42,7 @@ from typing import Protocol + class _StateProvider(Protocol): @property def cost(self) -> _Number: @@ -50,6 +52,9 @@ def cost(self) -> _Number: def enabled(self) -> bool: ... + TNState = TypeVar("TNState", bound=_StateProvider) + TEState = TypeVar("TEState", bound=_StateProvider) + DEFAULT_HOP_PENALTY = 10 INF = float('inf') @@ -75,6 +80,8 @@ def __init__(self, src_node: TN, dst_node: TN): self.cost: _Number = 1 self.enabled: bool = True + self.__hash = None + self.add_to_nodes() def add_to_nodes(self): @@ -107,13 +114,19 @@ def __eq__(self, other): return self._src_node == other._src_node and self._dst_node == other._dst_node def __str__(self): - return f'{self._src_node}-->{self._dst_node}' + return f'{self.src_node}-->{self.dst_node}' + + def __hash__(self): + if self.__hash is None: + self.__hash = hash((self.src_node, self.dst_node)) + return self.__hash class Topology(RseCollection, Generic[TN, TE]): """ Helper private class used to easily fetch topological information for a subset of RSEs. 
""" + def __init__( self, rse_ids: Optional[Iterable[str]] = None, @@ -142,6 +155,10 @@ def get_or_create(self, rse_id: str) -> "TN": self._edges_loaded = False return rse_data + @property + def edges(self): + return self._edges + def edge(self, src_node: TN, dst_node: TN) -> "Optional[TE]": return self._edges.get((src_node, dst_node)) @@ -344,9 +361,9 @@ def dijkstra_spf( self, dst_node: TN, nodes_to_find: Optional[set[TN]] = None, - node_state_provider: "Callable[[TN], _StateProvider]" = lambda x: x, - edge_state_provider: "Callable[[TE], _StateProvider]" = lambda x: x, - ) -> "Iterator[tuple[TN, _Number, _StateProvider, TE, _StateProvider]]": + node_state_provider: "Callable[[TN], TNState]" = lambda x: x, + edge_state_provider: "Callable[[TE], TEState]" = lambda x: x, + ) -> "Iterator[tuple[TN, _Number, TNState, TE, TEState]]": """ Does a Backwards Dijkstra's algorithm: start from destination and follow inbound links to other nodes. If multihop is disabled, stop after analysing direct connections to dest_rse. 
class FlowManager:
    """
    Routes transfer demands (bytes queued between RSE nodes) over a Topology
    graph and computes how much flow each node and edge would carry.

    Two routing strategies are implemented:
      * a greedy pass which pushes demand along shortest-path trees
        (``optimize_flow`` / ``_route_in_tree``);
      * an approximate maximum multicommodity flow computation for the demand
        which could not be routed greedily (``_karakostas_multicommodity_flow``).
    """

    class EdgeFlow:
        """Mutable per-edge routing state: capacity, cost and current flow."""

        def __init__(self, edge):
            self.edge = edge
            # NOTE(review): placeholder capacity (1 GB) — presumably to be
            # derived from historical transfer rates later; confirm.
            self.capacity = 10 ** 9
            self.cost = 1
            self.flow = 0

    class NodeFlow:
        """Mutable per-node routing state: capacity, cost, inbound/outbound flow."""

        def __init__(self, node):
            self.node = node
            # Prefer an explicitly configured capacity on the node itself;
            # fall back to the same placeholder as EdgeFlow.
            self.capacity = getattr(node, 'capacity', 10 ** 9)
            self.cost = 1
            self.in_flow = 0
            self.out_flow = 0

    def __init__(self, topology: "Topology"):
        self.topology = topology

    @read_session
    def _demand_from_requests(self, *, session: "Session"):
        """
        Scan the Requests table and return the current demand.

        :returns: tuple ``(pledged_demand, pending_demand)`` where
            ``pledged_demand`` is ``{dst_node: {src_node: Decimal(bytes)}}``
            for requests with a known source, and ``pending_demand`` is
            ``{dst_node: Decimal(bytes)}`` for requests without a source yet.
        """
        topology = self.topology
        db_stats = request_core.get_request_stats(
            state=[
                RequestState.QUEUED,
                RequestState.SUBMITTING,
                RequestState.SUBMITTED,
                RequestState.WAITING,
            ],
            session=session,
        )

        pledged_demand = defaultdict(lambda: defaultdict(Decimal))
        pending_demand = defaultdict(Decimal)
        for db_stat in db_stats:
            if (db_stat.dest_rse_id not in topology) or (db_stat.source_rse_id and db_stat.source_rse_id not in topology):
                # The RSE was deleted. Ignore
                continue

            src_node = topology[db_stat.source_rse_id] if db_stat.source_rse_id else None
            dst_node = topology[db_stat.dest_rse_id]
            if src_node:
                pledged_demand[dst_node][src_node] += Decimal(db_stat.bytes)
            else:
                pending_demand[dst_node] += Decimal(db_stat.bytes)

        return pledged_demand, pending_demand

    def _capacity_from_requests_history(self, *, session: "Session"):
        # TODO: derive node/edge capacities from historical transfer throughput.
        pass

    def _route_in_tree(
        self,
        root_node,
        demand_towards_root_node: "Mapping[Node, Decimal]",
        node_flow_provider: "Callable[[Node], NodeFlow]",
        edge_flow_provider: "Callable[[Edge], EdgeFlow]",
        ignore_capacity: bool = False,
    ):
        """
        Compute the shortest path tree towards the root node from all other nodes.
        Route the demands in this tree and increase the associated flow.

        :returns: tuple ``(accepted, remaining)``: the demand per node which was
            routed, and the demand which could not be routed without exceeding
            a node or edge capacity.
        """
        accepted_demand_towards_root_node = {}
        remaining_demand_towards_root_node = {}
        root_state = node_flow_provider(root_node)
        sorted_nodes = list(
            self.topology.dijkstra_spf(
                dst_node=root_node,
                nodes_to_find=set(demand_towards_root_node),
                node_state_provider=node_flow_provider,
                edge_state_provider=edge_flow_provider,
            )
        )
        # Adding the demand may result in an edge or node becoming overloaded;
        # record by how much it was overloaded.
        overflow_ratio = Decimal(1)
        inbound_flow_by_node = defaultdict(Decimal)
        for node, _, node_state, edge_to_next_hop, edge_state in reversed(sorted_nodes):
            next_hop = edge_to_next_hop.dst_node

            inbound_flow = inbound_flow_by_node[node]
            # Whatever flow goes out of the current node will have to go into the next hop
            outbound_flow = inbound_flow + demand_towards_root_node.get(node, Decimal(0))
            inbound_flow_by_node[next_hop] = inbound_flow_by_node[next_hop] + outbound_flow
            # Detect if we overload a node or an edge. And by how much
            if not ignore_capacity:
                overflow_ratio = max(
                    overflow_ratio,
                    inbound_flow / node_state.capacity,
                    outbound_flow / edge_state.capacity,
                )
        if not ignore_capacity:
            # FIX(review): use the root's flow-state capacity; the raw Node
            # object is not guaranteed to have a `capacity` attribute.
            overflow_ratio = max(overflow_ratio, inbound_flow_by_node[root_node] / root_state.capacity)

        # Push each demand. If overflow_ratio is bigger than one, only push the fraction of demand which
        # doesn't overload anything.
        sorted_nodes.append((root_node, 0, root_state, None, None))
        for node, _, node_state, edge_to_next_hop, edge_state in sorted_nodes:
            desired_demand = demand_towards_root_node.get(node, Decimal(0))

            # Only accept the fraction of the demand which doesn't overload a node or an edge
            accepted_demand = desired_demand / overflow_ratio
            accepted_inbound = inbound_flow_by_node[node] / overflow_ratio
            accepted_outbound = accepted_inbound + accepted_demand

            if desired_demand > 0:
                remaining_demand = desired_demand - accepted_demand
                accepted_demand_towards_root_node[node] = accepted_demand
                if remaining_demand > 0:
                    remaining_demand_towards_root_node[node] = remaining_demand

            if accepted_inbound > 0:
                node_state.in_flow += accepted_inbound

            if edge_state is not None and accepted_outbound > 0:
                node_state.out_flow += accepted_outbound
                edge_state.flow += accepted_outbound

        return accepted_demand_towards_root_node, remaining_demand_towards_root_node

    @METRICS.time_it()
    def _karakostas_multicommodity_flow(
        self,
        demands: "Mapping[Node, Mapping[Node, float]]",
        epsilon: Decimal = Decimal('0.1')
    ):
        """
        Compute the maximum multicommodity flow [1]. This corresponds to the most load-balanced
        way of transferring the given demand.

        The input demand must be in the format
        ``{destination_node: {source_node: bytes_to_transfer_from_source_to_destination}}``
        (the outer key is the destination; each demand is routed towards it).
        The resulting solution is fractional: a flow from source_node to destination_node
        can be split at will over all possible paths.

        Uses the following algorithm. The result will be within (1 - epsilon)**(-3) from
        the optimal solution.
        [1] Karakostas, G. (2008). Faster approximation schemes for fractional multicommodity
        flow problems. ACM Transactions on Algorithms (TALG)
        """
        flow = defaultdict(lambda: defaultdict(Decimal))
        if not demands:
            return flow

        with localcontext() as ctx:
            # We will manipulate very small numbers
            ctx.prec = 50

            num_n_e = len(self.topology.rse_id_to_data_map) + len(self.topology.edges)
            # constants are described (with the accompanying proof) in the linked paper
            delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon)
            flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln()
            max_iterations = 2 * (Decimal(1) / epsilon) * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln()

            # D(l): the total "volume" of the graph; the algorithm stops once it reaches 1
            dl = Decimal(delta * num_n_e)

            class _FlowProxy:
                """
                Stores the value of a flow while, also, automatically adjusting the cost
                and the total volume (as needed for Karakostas' algorithm) each time
                an augmenting flow is pushed through the graph.
                """
                def __init__(self, element_flow: "_NodeFlow | _EdgeFlow"):
                    self.value = Decimal(0)
                    self.element_flow = element_flow

                def __add__(self, other):
                    # Deliberately in-place: `state.flow += x` keeps the same proxy
                    # object while updating cost and total volume as side effects.
                    self.value += other
                    volume_increase = epsilon * other * self.element_flow.cost
                    self.element_flow.cost += volume_increase / self.element_flow.capacity
                    nonlocal dl
                    dl += volume_increase
                    return self

            class _EdgeFlow(self.EdgeFlow):

                def __init__(self, edge):
                    super().__init__(edge)
                    self.enabled = True
                    self.cost = delta / self.capacity
                    self.flow = _FlowProxy(self)

            class _NodeFlow(self.NodeFlow):

                def __init__(self, node):
                    super().__init__(node)
                    self.enabled = True
                    self.cost = delta / self.capacity
                    self.in_flow = _FlowProxy(self)
                    self.out_flow = Decimal(0)

            node_flows = {node: _NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()}
            edge_flows = {edge: _EdgeFlow(edge) for edge in self.topology.edges.values()}

            demand_multiplier = 1
            iteration = 0
            while dl < 1:
                # If the demands are very small compared to edge capacities, this algorithm will
                # require an unreasonable number of iterations to complete. The following mechanism
                # is described in the linked paper and is meant to speed up convergence time by
                # doubling the demands multiple times.
                iteration += 1
                if iteration > max_iterations:
                    iteration = 0
                    demand_multiplier = demand_multiplier * 2

                for dst_node, demand_towards_dst_node in demands.items():
                    demand_towards_dst_node = {n: d * demand_multiplier for n, d in demand_towards_dst_node.items()}

                    while dl < 1 and demand_towards_dst_node:
                        accepted_flows, demand_towards_dst_node = self._route_in_tree(
                            root_node=dst_node,
                            demand_towards_root_node=demand_towards_dst_node,
                            node_flow_provider=node_flows.get,
                            edge_flow_provider=edge_flows.get,
                        )
                        for node, accepted_flow in accepted_flows.items():
                            flow[dst_node][node] += accepted_flow

            # The computed flow violated edge and node capacity constraints. We need to scale it down.
            # `lam` is the achieved fraction (lambda) of each demand; kept for inspection/debugging.
            lam = Decimal('inf')
            for dst_node, demand_per_src in demands.items():
                for src_node, demand_from_src in demand_per_src.items():
                    scaled_flow = flow[dst_node][src_node] / flow_scale_factor
                    flow[dst_node][src_node] = scaled_flow
                    lam = min(lam, scaled_flow / demand_from_src)
            for node_state in node_flows.values():
                node_state.in_flow.value /= flow_scale_factor
                node_state.out_flow /= flow_scale_factor
            for edge_state in edge_flows.values():
                edge_state.flow.value /= flow_scale_factor
            return flow

    @read_session
    def optimize_flow(self, *, session: "Session"):
        """
        Route the current pledged demand greedily over shortest-path trees, then
        hand whatever could not be routed within capacity limits to the
        multicommodity-flow approximation.

        :returns: the fractional flow computed for the demand which could not be
            routed greedily, as ``{dst_node: {src_node: Decimal}}``.
        """
        pledged_demand, pending_demand = self._demand_from_requests(session=session)

        node_flows = {node: self.NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()}
        edge_flows = {edge: self.EdgeFlow(edge) for edge in self.topology.edges.values()}
        remaining_demands = {}
        for dst_node, demand_towards_dst_node in pledged_demand.items():
            # FIX(review): the first returned value is the accepted demand (it was
            # previously mis-named `inbound_flow_by_node`); the remaining demand
            # was discarded, and _karakostas_multicommodity_flow was called without
            # its required `demands` argument (a TypeError at runtime).
            _, remaining = self._route_in_tree(
                root_node=dst_node,
                demand_towards_root_node=demand_towards_dst_node,
                node_flow_provider=node_flows.get,
                edge_flow_provider=edge_flows.get,
            )
            if remaining:
                remaining_demands[dst_node] = remaining

        # NOTE(review): pending_demand (requests without a source) cannot be routed
        # source->destination and is currently unused — confirm intended handling.
        return self._karakostas_multicommodity_flow(remaining_demands)
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json

import flask
from flask import Flask, Response

from rucio.api import request
from rucio.common.exception import RequestNotFound
from rucio.common.utils import APIEncoder, render_json
from rucio.core.rse import get_rses_with_attribute_value
from rucio.db.sqla.constants import RequestState
from rucio.web.rest.flaskapi.v1.common import check_accept_header_wrapper_flask, parse_scope_name, try_stream, \
    response_headers, generate_http_error_flask, ErrorHandlingMethodView
from rucio.web.rest.flaskapi.authenticated_bp import AuthenticatedBlueprint


class UsageGet(ErrorHandlingMethodView):
    """Return topology usage information for an RSE (placeholder, not implemented yet)."""

    @check_accept_header_wrapper_flask(['application/json'])
    def get(self, scope_name, rse):
        # TODO: implement endpoint; currently a stub.
        pass


class PathsGet(ErrorHandlingMethodView):
    """Return possible transfer paths towards an RSE (placeholder, not implemented yet)."""

    @check_accept_header_wrapper_flask(['application/json'])
    def get(self, scope_name, rse):
        # TODO: implement endpoint; currently a stub.
        pass


def blueprint():
    """Build the flask blueprint with the topology endpoints registered."""
    bp = AuthenticatedBlueprint('topology', __name__, url_prefix='/topology')

    usage_get_view = UsageGet.as_view('usage_get')
    # NOTE(review): the original URL placeholders were mangled ('<...>' tokens
    # stripped, leaving '/usage/' and '/paths///'); reconstructed from the view
    # signatures (scope_name, rse) — confirm against the other rucio blueprints.
    bp.add_url_rule('/usage/<path:scope_name>/<rse>', view_func=usage_get_view, methods=['get', ])
    paths_get_view = PathsGet.as_view('paths_get')
    bp.add_url_rule('/paths/<path:scope_name>/<rse>', view_func=paths_get_view, methods=['get', ])

    bp.after_request(response_headers)
    return bp


def make_doc():
    """ Only used for sphinx documentation """
    doc_app = Flask(__name__)
    doc_app.register_blueprint(blueprint())
    return doc_app
rse3: 10**8, + # rse4: 10**8 + #} + } + rse1.capacity = 10 ** 10 + rse2.capacity = 10 ** 9 + rse3.capacity = 10 ** 8 + rse4.capacity = 10 ** 9 + + flow_manager = FlowManager(topology) + flow_manager.optimize_flow()