WIP: implement a flow overlay for topology
Implement a wrapper class around the topology which adds flow-related
information on top of the topological data, i.e. how much data is
entering/leaving each node and how much is crossing each link.

Initial implementation of the maximum concurrent flow algorithm,
which allows finding a near-optimal (fractional) placement
of the demand onto the network.
Radu Carpa committed Oct 13, 2023
1 parent 76107a0 commit c89dfa6
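A minimal usage sketch of the new overlay (illustration only, not part of this commit): the FlowManager reads the current demand from the Requests table and routes it over the topology; the database session is injected by the @read_session decorator when none is passed explicitly.

    from rucio.core.topology import Topology
    from rucio.core.transfer import FlowManager

    # Wrap the topology with the flow overlay and compute how the pending
    # demand would be spread over the nodes and links of the network
    topology = Topology()
    flow_manager = FlowManager(topology)
    flow_manager.optimize_flow()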
Showing 4 changed files with 381 additions and 9 deletions.
31 changes: 24 additions & 7 deletions lib/rucio/core/topology.py
@@ -18,6 +18,7 @@
import threading
import weakref
from collections.abc import Callable, Iterable, Iterator
from decimal import Decimal
from typing import TYPE_CHECKING, cast, Any, Generic, Optional, TypeVar, Union

from sqlalchemy import and_, select
@@ -32,7 +33,7 @@
from rucio.rse import rsemanager as rsemgr

LoggerFunction = Callable[..., Any]
_Number = Union[float, int]
_Number = Union[float, int, Decimal]
TN = TypeVar("TN", bound="Node")
TE = TypeVar("TE", bound="Edge")

@@ -41,6 +42,7 @@

from typing import Protocol


class _StateProvider(Protocol):
@property
def cost(self) -> _Number:
@@ -50,6 +52,9 @@ def cost(self) -> _Number:
def enabled(self) -> bool:
...

TNState = TypeVar("TNState", bound=_StateProvider)
TEState = TypeVar("TEState", bound=_StateProvider)
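# Illustration (not part of this commit): any object exposing `cost` and `enabled`
# satisfies this protocol, which is why dijkstra_spf() can default its state
# providers to the identity function and use the nodes/edges themselves.
#
#     class _StaticState:
#         cost: _Number = 1
#         enabled: bool = True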


DEFAULT_HOP_PENALTY = 10
INF = float('inf')
@@ -75,6 +80,8 @@ def __init__(self, src_node: TN, dst_node: TN):
self.cost: _Number = 1
self.enabled: bool = True

self.__hash = None

self.add_to_nodes()

def add_to_nodes(self):
@@ -107,13 +114,19 @@ def __eq__(self, other):
return self._src_node == other._src_node and self._dst_node == other._dst_node

def __str__(self):
return f'{self._src_node}-->{self._dst_node}'
return f'{self.src_node}-->{self.dst_node}'

def __hash__(self):
if self.__hash is None:
self.__hash = hash((self.src_node, self.dst_node))
return self.__hash


class Topology(RseCollection, Generic[TN, TE]):
"""
Helper private class used to easily fetch topological information for a subset of RSEs.
"""

def __init__(
self,
rse_ids: Optional[Iterable[str]] = None,
@@ -142,6 +155,10 @@ def get_or_create(self, rse_id: str) -> "TN":
self._edges_loaded = False
return rse_data

@property
def edges(self):
return self._edges

def edge(self, src_node: TN, dst_node: TN) -> "Optional[TE]":
return self._edges.get((src_node, dst_node))

@@ -344,9 +361,9 @@ def dijkstra_spf(
self,
dst_node: TN,
nodes_to_find: Optional[set[TN]] = None,
node_state_provider: "Callable[[TN], _StateProvider]" = lambda x: x,
edge_state_provider: "Callable[[TE], _StateProvider]" = lambda x: x,
) -> "Iterator[tuple[TN, _Number, _StateProvider, TE, _StateProvider]]":
node_state_provider: "Callable[[TN], TNState]" = lambda x: x,
edge_state_provider: "Callable[[TE], TEState]" = lambda x: x,
) -> "Iterator[tuple[TN, _Number, TNState, TE, TEState]]":
"""
Does a Backwards Dijkstra's algorithm: start from destination and follow inbound links to other nodes.
If multihop is disabled, stop after analysing direct connections to dest_rse.
@@ -358,7 +375,7 @@ def dijkstra_spf(

priority_q = PriorityQueue()
priority_q[dst_node] = 0
next_hops: dict[TN, tuple[_Number, _StateProvider, Optional[TE], Optional[_StateProvider]]] =\
next_hops: dict[TN, tuple[_Number, _StateProvider, Optional[TE], Optional[_StateProvider]]] = \
{dst_node: (0, node_state_provider(dst_node), None, None)}
while priority_q:
node = priority_q.pop()
@@ -376,7 +393,7 @@

edge_state = edge_state_provider(edge)
new_adjacent_dist = node_dist + node_state.cost + edge_state.cost
if new_adjacent_dist < next_hops.get(adjacent_node, (INF, ))[0] and edge_state.enabled:
if new_adjacent_dist < next_hops.get(adjacent_node, (INF,))[0] and edge_state.enabled:
adj_node_state = node_state_provider(adjacent_node)
next_hops[adjacent_node] = new_adjacent_dist, adj_node_state, edge, edge_state
priority_q[adjacent_node] = new_adjacent_dist
251 changes: 249 additions & 2 deletions lib/rucio/core/transfer.py
@@ -18,6 +18,8 @@
import re
import time
import traceback
from collections import defaultdict
from decimal import Decimal, localcontext
from typing import TYPE_CHECKING

from dogpile.cache import make_region
@@ -49,10 +51,10 @@

if TYPE_CHECKING:
from collections.abc import Callable, Generator, Iterable
from typing import Any, Optional
from typing import Any, Optional, Mapping
from sqlalchemy.orm import Session
from rucio.common.types import InternalAccount
from rucio.core.topology import Topology
from rucio.core.topology import Topology, Node, Edge

LoggerFunction = Callable[..., Any]

@@ -285,6 +287,251 @@ def legacy_sources(self):
return self._legacy_sources


class FlowManager:
class EdgeFlow:
def __init__(self, edge):
self.edge = edge
self.capacity = 10 ** 9
self.cost = 1
self.flow = 0

class NodeFlow:
def __init__(self, node):
self.node = node
self.capacity = 10 ** 9
self.cost = 1
self.in_flow = 0
self.out_flow = 0

def __init__(self, topology: "Topology"):
self.topology = topology

@read_session
def _demand_from_requests(self, *, session: "Session"):
"""
Scans the Requests table and returns the current demand as two mappings:
{dst_node: {src_node: bytes}} for requests with a known source and {dst_node: bytes} for requests without one
"""
topology = self.topology
db_stats = request_core.get_request_stats(
state=[RequestState.QUEUED,
RequestState.SUBMITTING,
RequestState.SUBMITTED,
RequestState.WAITING],
session=session,
)

pledged_demand = defaultdict(lambda: defaultdict(Decimal))
pending_demand = defaultdict(Decimal)
for db_stat in db_stats:
if (db_stat.dest_rse_id not in topology) or (db_stat.source_rse_id and db_stat.source_rse_id not in topology):
# The RSE was deleted. Ignore
continue

src_node = topology[db_stat.source_rse_id] if db_stat.source_rse_id else None
dst_node = topology[db_stat.dest_rse_id]
if src_node:
pledged_demand[dst_node][src_node] += Decimal(db_stat.bytes)
else:
pending_demand[dst_node] += Decimal(db_stat.bytes)

return pledged_demand, pending_demand

def _capacity_from_requests_history(self, *, session: "Session"):
pass

def _route_in_tree(
self,
root_node,
demand_towards_root_node: "Mapping[Node, Decimal]",
node_flow_provider: "Callable[[Node], NodeFlow]",
edge_flow_provider: "Callable[[Edge], EdgeFlow]",
ignore_capacity: bool = False,
):
"""
Compute the shortest path tree towards the root node from all other nodes.
Route the demands in this tree and increase the associated flow.
Return both the demand which could be routed without exceeding any capacity and the demand which remains.
"""
accepted_demand_towards_root_node = {}
remaining_demand_towards_root_node = {}
sorted_nodes = list(
self.topology.dijkstra_spf(
dst_node=root_node,
nodes_to_find=set(demand_towards_root_node),
node_state_provider=node_flow_provider,
edge_state_provider=edge_flow_provider,
)
)
# Adding the demand may result in an edge or node becoming overloaded. Record by how much it was overloaded
overflow_ratio = Decimal(1)
inbound_flow_by_node = defaultdict(Decimal)
for node, _, node_state, edge_to_next_hop, edge_state in reversed(sorted_nodes):
next_hop = edge_to_next_hop.dst_node

inbound_flow = inbound_flow_by_node[node]
# Whatever flow goes out of the current node will have to go into the next hop
outbound_flow = inbound_flow + demand_towards_root_node.get(node, Decimal(0))
inbound_flow_by_node[next_hop] = inbound_flow_by_node[next_hop] + outbound_flow
# Detect if we overload a node or an edge. And by how much
if not ignore_capacity:
overflow_ratio = max(
overflow_ratio,
inbound_flow / node_state.capacity,
outbound_flow / edge_state.capacity
)
if not ignore_capacity:
overflow_ratio = max(overflow_ratio, inbound_flow_by_node[root_node] / root_node.capacity)

# Push each demand. If overflow_ratio is bigger than one, only push the fraction of demand which
# doesn't overload anything.
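# For example, with overflow_ratio == 2 only half of each demand is accepted here;
# the other half is reported back as remaining demand.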
sorted_nodes.append((root_node, 0, node_flow_provider(root_node), None, None))
for node, _, node_state, edge_to_next_hop, edge_state in sorted_nodes:
desired_demand = demand_towards_root_node.get(node, Decimal(0))

# Only accept the fraction of the demand which doesn't overload a node or an edge
accepted_demand = desired_demand / overflow_ratio
accepted_inbound = inbound_flow_by_node[node] / overflow_ratio
accepted_outbound = accepted_inbound + accepted_demand

if desired_demand > 0:
remaining_demand = desired_demand - accepted_demand
accepted_demand_towards_root_node[node] = accepted_demand
if remaining_demand > 0:
remaining_demand_towards_root_node[node] = remaining_demand

if accepted_inbound > 0:
node_state.in_flow += accepted_inbound

if edge_state is not None and accepted_outbound > 0:
node_state.out_flow += accepted_outbound
edge_state.flow += accepted_outbound

return accepted_demand_towards_root_node, remaining_demand_towards_root_node

@METRICS.time_it()
def _karakostas_multicommodity_flow(
self,
demands: "Mapping[Node, Mapping[Node, float]]",
epsilon: Decimal = Decimal('0.1')
):
"""
Compute the maximum multicommodity flow [1]. This corresponds to the most load-balanced way of transferring the given demand.
The input demand must be in the format {destination_node: {source_node: amount_of_bytes_to_transfer_from_source_to_destination}}.
The resulting solution is fractional: a flow from source_node to destination_node can be split at will over all possible paths.
Uses the algorithm from [1]; the result is within a factor of (1 - epsilon)**(-3) of the optimal solution.
[1] Karakostas, G. (2008). Faster approximation schemes for fractional multicommodity flow problems. ACM Transactions on Algorithms (TALG)
"""
flow = defaultdict(lambda: defaultdict(Decimal))
if not demands:
return flow

with localcontext() as ctx:
# We will manipulate very small numbers
ctx.prec = 50

num_n_e = len(self.topology.rse_id_to_data_map) + len(self.topology.edges)
# constants are described (with the accompanying proof) in the linked paper
delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon)
flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln()
max_iterations = 2 * (Decimal(1) / epsilon) * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln()
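# Restated in the paper's notation, with N + E capacity-constrained elements:
#   delta = (1 + eps)^(-(1 - eps)/eps) * ((1 - eps) / (N + E))^(1/eps)
#   flow_scale_factor = log_{1+eps}((1 + eps) / delta)  (final down-scaling of the computed flow)
#   max_iterations = (2/eps) * log_{1+eps}((N + E) * (1 + eps) / (1 - eps))  (iterations before the demands are doubled)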

dl = Decimal(delta * num_n_e)

class _FlowProxy:
"""
Stores the value of a flow while also automatically adjusting the cost
and the total volume (as needed for Karakostas' algorithm) each time
an augmenting flow is pushed through the graph.
"""
def __init__(self, element_flow: "_NodeFlow | _EdgeFlow"):
self.value = Decimal(0)
self.element_flow = element_flow

def __add__(self, other):
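# Pushing `other` units of flow multiplies this element's cost by
# (1 + epsilon * other / capacity) and increases dl = sum(cost * capacity)
# by epsilon * other * cost, as required by the length-function update of [1].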
self.value += other
volume_increase = epsilon * other * self.element_flow.cost
self.element_flow.cost += volume_increase / self.element_flow.capacity
nonlocal dl
dl += volume_increase
return self

class _EdgeFlow(self.EdgeFlow):

def __init__(self, edge):
super().__init__(edge)
self.enabled = True
self.capacity = 10 ** 9
self.cost = delta / self.capacity
self.flow = _FlowProxy(self)

class _NodeFlow(self.NodeFlow):

def __init__(self, node):
super().__init__(node)
self.enabled = True
self.capacity = 10 ** 9
self.cost = delta / self.capacity
self.in_flow = _FlowProxy(self)
self.out_flow = Decimal(0)

node_flows = {node: _NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()}
edge_flows = {edge: _EdgeFlow(edge) for edge in self.topology.edges.values()}

demand_multiplier = 1
iteration = 0
while dl < 1:
# If the demands are very small compared to edge capacities, this algorithm will require an unreasonable
# number of iterations to complete. The following mechanism is described in the linked paper and is meant to
# speed up convergence time by doubling the demands multiple times.
iteration += 1
if iteration > max_iterations:
iteration = 0
demand_multiplier = demand_multiplier * 2

for dst_node, demand_towards_dst_node in demands.items():
demand_towards_dst_node = {n: d * demand_multiplier for n, d in demand_towards_dst_node.items()}

while dl < 1 and demand_towards_dst_node:
accepted_flows, demand_towards_dst_node = self._route_in_tree(
root_node=dst_node,
demand_towards_root_node=demand_towards_dst_node,
node_flow_provider=node_flows.get,
edge_flow_provider=edge_flows.get,
)
for node, accepted_flow in accepted_flows.items():
flow[dst_node][node] += accepted_flow

# The computed flow violated edge and node capacity constraints. We need to scale it down.
lam = Decimal('inf')
for dst_node, demand_per_src in demands.items():
for src_node, demand_from_src in demand_per_src.items():
scaled_flow = flow[dst_node][src_node] / flow_scale_factor
flow[dst_node][src_node] = scaled_flow
lam = min(lam, scaled_flow / demand_from_src)
for node_state in node_flows.values():
node_state.in_flow.value /= flow_scale_factor
node_state.out_flow /= flow_scale_factor
for edge_state in edge_flows.values():
edge_state.flow.value /= flow_scale_factor
return flow

@read_session
def optimize_flow(self, *, session: "Session"):
pledged_demand, pending_demand = self._demand_from_requests(session=session)

node_flows = {node: self.NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()}
edge_flows = {edge: self.EdgeFlow(edge) for edge in self.topology.edges.values()}
for dst_node, demand_towards_dst_node in pledged_demand.items():
accepted_demand, _ = self._route_in_tree(
root_node=dst_node,
demand_towards_root_node=demand_towards_dst_node,
node_flow_provider=node_flows.get,
edge_flow_provider=edge_flows.get,
)

self._karakostas_multicommodity_flow(demands=pledged_demand)


def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str:
"""
an implementation of __str__ for a transfer path, which is a list of direct transfers, so not really an object