Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Radu Carpa committed Sep 26, 2023
1 parent 6e6b466 commit ee28ce4
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 171 deletions.
156 changes: 16 additions & 140 deletions lib/rucio/core/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,22 @@
import threading
import weakref
from collections.abc import Callable, Iterable, Iterator
from collections import defaultdict
from decimal import Decimal, localcontext
from decimal import Decimal
from typing import TYPE_CHECKING, cast, Any, Generic, Optional, TypeVar, Union

from sqlalchemy import and_, select

from rucio.common.config import config_get_int, config_get
from rucio.common.exception import NoDistance, RSEProtocolNotSupported, InvalidRSEExpression
from rucio.common.utils import PriorityQueue
from rucio.core.monitor import MetricManager
from rucio.core.rse import RseCollection, RseData
from rucio.core.rse_expression_parser import parse_expression
from rucio.db.sqla import models
from rucio.db.sqla.session import read_session, transactional_session
from rucio.rse import rsemanager as rsemgr

LoggerFunction = Callable[..., Any]
_Number = Union[float, int]
_Number = Union[float, int, Decimal]
TN = TypeVar("TN", bound="Node")
TE = TypeVar("TE", bound="Edge")

Expand All @@ -44,6 +42,7 @@

from typing import Protocol


class _StateProvider(Protocol):
@property
def cost(self) -> _Number:
Expand All @@ -53,8 +52,10 @@ def cost(self) -> _Number:
def enabled(self) -> bool:
...

TNState = TypeVar("TNState", bound=_StateProvider)
TEState = TypeVar("TEState", bound=_StateProvider)


METRICS = MetricManager(module=__name__)
DEFAULT_HOP_PENALTY = 10
INF = float('inf')

Expand All @@ -67,7 +68,6 @@ def __init__(self, rse_id: str):
self.out_edges = weakref.WeakKeyDictionary()

self.cost: _Number = 0
self.capacity: _Number = 10**9
self.enabled: bool = True
self.used_for_multihop = False

Expand All @@ -78,7 +78,6 @@ def __init__(self, src_node: TN, dst_node: TN):
self._dst_node = weakref.ref(dst_node)

self.cost: _Number = 1
self.capacity: _Number = 10**9
self.enabled: bool = True

self.__hash = None
Expand Down Expand Up @@ -127,6 +126,7 @@ class Topology(RseCollection, Generic[TN, TE]):
"""
Helper private class used to easily fetch topological information for a subset of RSEs.
"""

def __init__(
self,
rse_ids: Optional[Iterable[str]] = None,
Expand Down Expand Up @@ -155,6 +155,10 @@ def get_or_create(self, rse_id: str) -> "TN":
self._edges_loaded = False
return rse_data

@property
def edges(self) -> "dict[tuple[TN, TN], TE]":
    """Read-only view of the loaded edges, keyed by (src_node, dst_node) pairs."""
    return self._edges

def edge(self, src_node: TN, dst_node: TN) -> "Optional[TE]":
    """Return the directed edge from src_node to dst_node, or None if no such edge is loaded."""
    lookup_key = (src_node, dst_node)
    return self._edges.get(lookup_key)

Expand Down Expand Up @@ -357,9 +361,9 @@ def dijkstra_spf(
self,
dst_node: TN,
nodes_to_find: Optional[set[TN]] = None,
node_state_provider: "Callable[[TN], _StateProvider]" = lambda x: x,
edge_state_provider: "Callable[[TE], _StateProvider]" = lambda x: x,
) -> "Iterator[tuple[TN, _Number, _StateProvider, TE, _StateProvider]]":
node_state_provider: "Callable[[TN], TNState]" = lambda x: x,
edge_state_provider: "Callable[[TE], TEState]" = lambda x: x,
) -> "Iterator[tuple[TN, _Number, TNState, TE, TEState]]":
"""
Does a Backwards Dijkstra's algorithm: start from destination and follow inbound links to other nodes.
If multihop is disabled, stop after analysing direct connections to dest_rse.
Expand All @@ -371,7 +375,7 @@ def dijkstra_spf(

priority_q = PriorityQueue()
priority_q[dst_node] = 0
next_hops: dict[TN, tuple[_Number, _StateProvider, Optional[TE], Optional[_StateProvider]]] =\
next_hops: dict[TN, tuple[_Number, _StateProvider, Optional[TE], Optional[_StateProvider]]] = \
{dst_node: (0, node_state_provider(dst_node), None, None)}
while priority_q:
node = priority_q.pop()
Expand All @@ -389,139 +393,11 @@ def dijkstra_spf(

edge_state = edge_state_provider(edge)
new_adjacent_dist = node_dist + node_state.cost + edge_state.cost
if new_adjacent_dist < next_hops.get(adjacent_node, (INF, ))[0] and edge_state.enabled:
if new_adjacent_dist < next_hops.get(adjacent_node, (INF,))[0] and edge_state.enabled:
adj_node_state = node_state_provider(adjacent_node)
next_hops[adjacent_node] = new_adjacent_dist, adj_node_state, edge, edge_state
priority_q[adjacent_node] = new_adjacent_dist

@METRICS.time_it()
def karakostas_multicommodity_flow(self, demands, epsilon: Decimal = Decimal('0.1')):
    """
    Compute the maximum multicommodity flow [1]. This corresponds to the most load-balanced way
    of transferring the given demand.

    :param demands: nested mapping in the format
        {destination_node: {source_node: amount_of_bytes_to_transfer_from_source_to_destination}}
        (each top-level key is the destination of one commodity group).
    :param epsilon: approximation parameter. The result is within (1 - epsilon)**(-3)
        of the optimal solution.
    :returns: tuple (flow, lam, total_flow_from_node, total_flow_to_node, total_flow_via_edge)
        where `flow[dst][src]` is the (fractional) amount routed from src to dst, `lam` is the
        achieved throughput ratio (min over commodities of routed/demanded), and the three
        `total_flow_*` dicts give aggregate per-node/per-edge volumes.
        NOTE(review): the previous revision only `print`ed lam and returned None; returning the
        computed results is backward compatible since callers ignored the None.

    The resulting solution is fractional: a flow from source_node to destination_node can be
    split at will over all possible paths.

    [1] Karakostas, G. (2008). Faster approximation schemes for fractional multicommodity flow
    problems. ACM Transactions on Algorithms (TALG)
    """
    flow = defaultdict(lambda: defaultdict(Decimal))

    total_flow_from_node = defaultdict(Decimal)
    total_flow_to_node = defaultdict(Decimal)
    total_flow_via_edge = defaultdict(Decimal)

    lam = Decimal('inf')
    with localcontext() as ctx:
        # We will manipulate very small numbers
        ctx.prec = 50

        num_n_e = len(self.rse_id_to_data_map) + len(self._edges)
        # constants are described (with the accompanying proof) in the linked paper
        delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon)
        flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln()
        max_iterations = 2 * (Decimal(1) / epsilon) * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln()

        # Length function of the algorithm: cost per node/edge, initialized lazily to
        # delta / capacity and increased each time flow is pushed through the element.
        costs = {}

        class _DictStateProvider:
            # Adapter handed to dijkstra_spf: exposes the algorithm's evolving length
            # function (`cost`) instead of the element's static cost attribute.
            enabled = True

            def __init__(self, element: "TN | TE"):
                self.cost = costs.get(element, None)
                if self.cost is None:
                    costs[element] = self.cost = delta / element.capacity

        demand_multiplier = 1
        iteration = 0
        dl = Decimal(delta * num_n_e)
        while dl < 1:

            # If the demands are very small compared to edge capacities, this algorithm will require an unreasonable
            # number of iterations to complete. The following mechanism is described in the linked paper and is meant to
            # speed up convergence time by doubling the demands multiple times.
            iteration += 1
            if iteration > max_iterations:
                iteration = 0
                demand_multiplier = demand_multiplier * 2

            for dst_node, demand_towards_dst_node in demands.items():
                demand_towards_dst_node = {n: d * demand_multiplier for n, d in demand_towards_dst_node.items()}

                any_remaining_demand = True
                while dl < 1 and any_remaining_demand:
                    sorted_nodes = list(self.dijkstra_spf(dst_node=dst_node,
                                                          nodes_to_find=set(demand_towards_dst_node),
                                                          node_state_provider=_DictStateProvider,
                                                          edge_state_provider=_DictStateProvider))

                    # Find how much flow will pass via every node and edge if we push all demands via the shortest paths.
                    # This may result in an edge or node to get overloaded. overflow_ratio will record by how much it was overloaded
                    overflow_ratio = Decimal(1)
                    inbound_flow_by_node = defaultdict(Decimal)
                    for node, _, _, edge_to_next_hop, _ in reversed(sorted_nodes):
                        next_hop = edge_to_next_hop.dst_node

                        inbound_flow = inbound_flow_by_node[node]
                        # Whatever flow goes out of the current node will have to go into the next hop
                        outbound_flow = inbound_flow + demand_towards_dst_node.get(node, Decimal(0))
                        inbound_flow_by_node[next_hop] = inbound_flow_by_node[next_hop] + outbound_flow
                        # Only accept a fraction of the demand if it risks to overload any edge or node
                        overflow_ratio = max(
                            overflow_ratio,
                            inbound_flow / node.capacity,
                            outbound_flow / edge_to_next_hop.capacity,
                        )
                    overflow_ratio = max(overflow_ratio, inbound_flow_by_node[dst_node] / dst_node.capacity)

                    # Push each demand. If overflow_ratio is bigger than one, only push the fraction of demand which
                    # doesn't overload anything.
                    any_remaining_demand = False
                    sorted_nodes.append((dst_node, None, None, None, None))
                    for node, _, _, edge_to_next_hop, _ in sorted_nodes:
                        desired_demand = demand_towards_dst_node.get(node, Decimal(0))

                        accepted_demand = desired_demand / overflow_ratio
                        accepted_inbound = inbound_flow_by_node[node] / overflow_ratio
                        accepted_outbound = accepted_inbound + accepted_demand

                        if desired_demand > 0:
                            remaining_demand = desired_demand - accepted_demand
                            demand_towards_dst_node[node] = remaining_demand
                            flow[dst_node][node] += accepted_demand
                            if remaining_demand:
                                any_remaining_demand = True

                        node_volume_increase = 0
                        if accepted_inbound > 0:
                            total_flow_to_node[node] += accepted_inbound

                            node_volume_increase = epsilon * accepted_inbound * costs[node]
                            costs[node] += node_volume_increase / node.capacity

                        edge_volume_increase = 0
                        if edge_to_next_hop is not None and accepted_outbound > 0:
                            total_flow_from_node[node] += accepted_outbound
                            total_flow_via_edge[edge_to_next_hop] += accepted_outbound

                            edge_volume_increase = epsilon * accepted_outbound * costs[edge_to_next_hop]
                            costs[edge_to_next_hop] += edge_volume_increase / edge_to_next_hop.capacity

                        # D(l) from the paper: the algorithm terminates once the total
                        # "volume" of the length function reaches 1.
                        dl += edge_volume_increase + node_volume_increase

        # The computed flow violated edge and node capacity constraints. We need to scale it down.
        for dst_node, demand_per_src in demands.items():
            for src_node, demand_from_src in demand_per_src.items():
                scaled_flow = flow[dst_node][src_node] / flow_scale_factor
                flow[dst_node][src_node] = scaled_flow
                # Guard against zero-demand entries: dividing by zero would raise in Decimal.
                if demand_from_src > 0:
                    lam = min(lam, scaled_flow / demand_from_src)
        for node in total_flow_to_node:
            total_flow_to_node[node] /= flow_scale_factor
        for node in total_flow_from_node:
            total_flow_from_node[node] /= flow_scale_factor
        for edge in total_flow_via_edge:
            total_flow_via_edge[edge] /= flow_scale_factor

    # Previously this was a debug `print(lam)`; expose the results to callers instead.
    return flow, lam, total_flow_from_node, total_flow_to_node, total_flow_via_edge


class ExpiringObjectCache:
"""
Expand Down
Loading

0 comments on commit ee28ce4

Please sign in to comment.