WIP: implement a flow overlay for topology
Implement a wrapper class around the topology which adds flow-related
information on top of the topological data, i.e. how much data is
entering/leaving each node and how much is crossing each link.

Also add an initial implementation of the maximum concurrent flow
algorithm, which allows finding a near-optimal (fractional) placement
of the demand onto the network.
Radu Carpa committed Nov 14, 2023
1 parent 6c31d5a commit bf23967
Showing 2 changed files with 263 additions and 2 deletions.
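
As a rough sketch of how the new class might be used (the Topology construction below is an assumption; the commit is WIP and optimize_flow() does not yet return a combined result):

from rucio.core.topology import Topology
from rucio.core.transfer import FlowManager

topology = Topology()                 # assumed: a topology already populated with RSEs and links
flow_manager = FlowManager(topology)
flow_manager.optimize_flow()          # the database session is injected by the @read_session decorator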
13 changes: 12 additions & 1 deletion lib/rucio/core/topology.py
@@ -79,6 +79,8 @@ def __init__(self, src_node: TN, dst_node: TN):
self.cost: _Number = 1
self.enabled: bool = True

self.__hash = None

self.add_to_nodes()

def add_to_nodes(self):
@@ -111,7 +113,12 @@ def __eq__(self, other):
return self._src_node == other._src_node and self._dst_node == other._dst_node

def __str__(self):
-return f'{self._src_node}-->{self._dst_node}'
+return f'{self.src_node}-->{self.dst_node}'

def __hash__(self):
if self.__hash is None:
self.__hash = hash((self.src_node, self.dst_node))
return self.__hash


class Topology(RseCollection, Generic[TN, TE]):
@@ -146,6 +153,10 @@ def get_or_create(self, rse_id: str) -> "TN":
self._edges_loaded = False
return rse_data

@property
def edges(self):
return self._edges

def edge(self, src_node: TN, dst_node: TN) -> "Optional[TE]":
return self._edges.get((src_node, dst_node))

252 changes: 251 additions & 1 deletion lib/rucio/core/transfer.py
@@ -23,6 +23,7 @@
import traceback
from collections import defaultdict
from dataclasses import dataclass
from decimal import Decimal, localcontext
from typing import TYPE_CHECKING, cast

from dogpile.cache import make_region
@@ -57,7 +58,7 @@
from typing import Any, Optional
from sqlalchemy.orm import Session
from rucio.common.types import InternalAccount
-from rucio.core.topology import Topology
+from rucio.core.topology import Topology, Node, Edge

LoggerFunction = Callable[..., Any]

@@ -593,6 +594,255 @@ def slice_time(
older_t = older_t - resolution


class FlowManager:
class EdgeFlow:
def __init__(self, edge):
self.edge = edge
self.capacity = 10 ** 9
self.cost = 1
self.flow = 0

class NodeFlow:
def __init__(self, node):
self.node = node
self.capacity = 10 ** 9
self.cost = 1
self.in_flow = 0
self.out_flow = 0

def __init__(self, topology: "Topology"):
self.topology = topology

@read_session
def _demand_from_requests(self, *, session: "Session"):
"""
Scans the Requests table and returns the current demand
"""
topology = self.topology
db_stats = request_core.get_request_stats(
state=[RequestState.QUEUED,
RequestState.SUBMITTING,
RequestState.SUBMITTED,
RequestState.WAITING],
session=session,
)

pledged_demand = defaultdict(lambda: defaultdict(Decimal))
pending_demand = defaultdict(Decimal)
for db_stat in db_stats:
if (db_stat.dest_rse_id not in topology) or (db_stat.source_rse_id and db_stat.source_rse_id not in topology):
# The RSE was deleted. Ignore
continue

src_node = topology[db_stat.source_rse_id] if db_stat.source_rse_id else None
dst_node = topology[db_stat.dest_rse_id]
if src_node:
pledged_demand[dst_node][src_node] += Decimal(db_stat.bytes)
else:
pending_demand[dst_node] += Decimal(db_stat.bytes)

return pledged_demand, pending_demand

def _route_in_tree(
self,
root_node,
demand_towards_root_node: "Mapping[Node, Decimal]",
node_flow_provider: "Callable[[Node], NodeFlow]",
edge_flow_provider: "Callable[[Edge], EdgeFlow]",
ignore_capacity: bool = False,
):
"""
Compute the shortest path tree towards the root node from all other nodes.
Route the demands in this tree and increase the associated flow.
"""
accepted_demand_towards_root_node = {}
remaining_demand_towards_root_node = {}
sorted_nodes = list(
self.topology.dijkstra_spf(
dst_node=root_node,
nodes_to_find=set(demand_towards_root_node),
node_state_provider=node_flow_provider,
edge_state_provider=edge_flow_provider,
)
)
# Adding the demand may cause an edge or a node to become overloaded. Record by how much it was overloaded.
overflow_ratio = Decimal(1)
inbound_flow_by_node = defaultdict(Decimal)
for node, _, node_state, edge_to_next_hop, edge_state in reversed(sorted_nodes):
next_hop = edge_to_next_hop.dst_node

inbound_flow = inbound_flow_by_node[node]
# Whatever flow goes out of the current node will have to go into the next hop
outbound_flow = inbound_flow + demand_towards_root_node.get(node, Decimal(0))
inbound_flow_by_node[next_hop] = inbound_flow_by_node[next_hop] + outbound_flow
# Detect whether we overload a node or an edge, and by how much
if not ignore_capacity:
overflow_ratio = max(
overflow_ratio,
inbound_flow / node_state.capacity,
outbound_flow / edge_state.capacity
)
if not ignore_capacity:
overflow_ratio = max(overflow_ratio, inbound_flow_by_node[root_node] / root_node.capacity)

# Push each demand. If overflow_ratio is bigger than one, only push the fraction of demand which
# doesn't overload anything.
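# Worked example with illustrative numbers: if the most loaded node or edge would end up
# at 2.5x its capacity, overflow_ratio == 2.5 and every demand is multiplied by 1/2.5,
# so nothing exceeds its capacity.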
sorted_nodes.append((root_node, 0, node_flow_provider(root_node), None, None))
for node, _, node_state, edge_to_next_hop, edge_state in sorted_nodes:
desired_demand = demand_towards_root_node.get(node, Decimal(0))

# Only accept the fraction of the demand which doesn't overload a node or an edge
accepted_demand = desired_demand / overflow_ratio
accepted_inbound = inbound_flow_by_node[node] / overflow_ratio
accepted_outbound = accepted_inbound + accepted_demand

if desired_demand > 0:
remaining_demand = desired_demand - accepted_demand
accepted_demand_towards_root_node[node] = accepted_demand
if remaining_demand > 0:
remaining_demand_towards_root_node[node] = remaining_demand

if accepted_inbound > 0:
node_state.in_flow += accepted_inbound

if edge_state is not None and accepted_outbound > 0:
node_state.out_flow += accepted_outbound
edge_state.flow += accepted_outbound

return accepted_demand_towards_root_node, remaining_demand_towards_root_node

@METRICS.time_it()
def _karakostas_multicommodity_flow(
self,
demands: "Mapping[Node, Mapping[Node, float | Decimal]]",
epsilon: Decimal = Decimal('0.1')
):
"""
Compute the maximum concurrent (multicommodity) flow [1]. This corresponds to the most load-balanced way of transferring the given demand.
The input demand must be in the format {destination_node: {source_node: amount_of_bytes_to_transfer_from_source_to_destination}}
The resulting solution is fractional: the flow from a source node to a destination node can be split at will over all possible paths.
Uses the algorithm from [1]. The result is within a factor of (1 - epsilon)**(-3) of the optimal solution.
[1] Karakostas, G. (2008). Faster approximation schemes for fractional multicommodity flow problems. ACM Transactions on Algorithms (TALG)
"""
flow = defaultdict(lambda: defaultdict(Decimal))
if not demands:
return None, None

with localcontext() as ctx:
# We will manipulate very small numbers
ctx.prec = 50

num_n_e = len(self.topology.rse_id_to_data_map) + len(self.topology.edges)
# The constants below are described (with the accompanying proof) in the linked paper
delta = (Decimal(1) / pow(Decimal(1) + epsilon, (Decimal(1) - epsilon) / epsilon)) * pow((Decimal(1) - epsilon) / num_n_e, Decimal(1) / epsilon)
flow_scale_factor = ((Decimal(1) + epsilon) / delta).ln() / (Decimal(1) + epsilon).ln()
max_iterations = 2 * (Decimal(1) / epsilon) * (num_n_e * (Decimal(1) + epsilon) / (Decimal(1) - epsilon)).ln() / (Decimal(1) + epsilon).ln()

dl = Decimal(delta * num_n_e)
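# dl tracks sum(capacity * cost) over all nodes and edges (the quantity called D(l) in the
# referenced paper); it starts at delta * num_n_e and the main loop below runs until it reaches 1.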

class _FlowProxy:
"""
Stores the value of a flow while also automatically adjusting the cost
and the total volume (as needed by Karakostas' algorithm) each time
an augmenting flow is pushed through the graph.
"""
def __init__(self, element_flow: "_NodeFlow | _EdgeFlow"):
self.value = Decimal(0)
self.element_flow = element_flow

def __add__(self, other):
self.value += other
volume_increase = epsilon * other * self.element_flow.cost
self.element_flow.cost += volume_increase / self.element_flow.capacity
nonlocal dl
dl += volume_increase
return self

class _EdgeFlow(self.EdgeFlow):

def __init__(self, edge):
super().__init__(edge)
self.enabled = True
self.capacity = 10 ** 9
self.cost = delta / self.capacity
self.flow = _FlowProxy(self)

class _NodeFlow(self.NodeFlow):

def __init__(self, node):
super().__init__(node)
self.enabled = True
self.capacity = 10 ** 9
self.cost = delta / self.capacity
self.in_flow = _FlowProxy(self)
self.out_flow = Decimal(0)

node_flows = {node: _NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()}
edge_flows = {edge: _EdgeFlow(edge) for edge in self.topology.edges.values()}

demand_multiplier = 1
iteration = 0
while dl < 1:
# If the demands are very small compared to edge capacities, this algorithm will require an unreasonable
# number of iterations to complete. The following mechanism is described in the linked paper and is meant to
# speed up convergence time by doubling the demands multiple times.
iteration += 1
if iteration > max_iterations:
iteration = 0
demand_multiplier = demand_multiplier * 2

for dst_node, demand_towards_dst_node in demands.items():
demand_towards_dst_node = {n: d * demand_multiplier for n, d in demand_towards_dst_node.items()}

while dl < 1 and demand_towards_dst_node:
accepted_flows, demand_towards_dst_node = self._route_in_tree(
root_node=dst_node,
demand_towards_root_node=demand_towards_dst_node,
node_flow_provider=node_flows.get,
edge_flow_provider=edge_flows.get,
)
for node, accepted_flow in accepted_flows.items():
flow[dst_node][node] += accepted_flow

# The computed flow violates the edge and node capacity constraints, so it needs to be scaled down.
lam = Decimal('inf')
for dst_node, demand_per_src in demands.items():
for src_node, demand_from_src in demand_per_src.items():
scaled_flow = flow[dst_node][src_node] / flow_scale_factor
flow[dst_node][src_node] = scaled_flow
lam = min(lam, scaled_flow / demand_from_src)
for node_state in node_flows.values():
node_state.in_flow.value /= flow_scale_factor
node_state.out_flow /= flow_scale_factor
for edge_state in edge_flows.values():
edge_state.flow.value /= flow_scale_factor
return node_flows, edge_flows

@read_session
def optimize_flow(self, *, session: "Session"):
pledged_demand, pending_demand = self._demand_from_requests(session=session)

# Compute the current status of the network
current_node_flow = {node: self.NodeFlow(node) for node in self.topology.rse_id_to_data_map.values()}
current_edge_flow = {edge: self.EdgeFlow(edge) for edge in self.topology.edges.values()}
for dst_node, demand_towards_dst_node in pledged_demand.items():
inbound_flow_by_node, _ = self._route_in_tree(
root_node=dst_node,
demand_towards_root_node=demand_towards_dst_node,
node_flow_provider=current_node_flow.get,
edge_flow_provider=current_edge_flow.get,
)

# Compute what would be the optimal flow in the network
desired_node_flow, desired_edge_flow = self._karakostas_multicommodity_flow(pledged_demand)

# TODO: combine the information from the current flow and the desired flow
# to pick the destination (during rule evaluation) or source (for transfers)
# - select in priority if desired_flow > current_flow
# - avoid if desired_flow < current_flow
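# For example (illustrative only, not implemented in this commit), a candidate node could be
# ranked by desired_node_flow[node].in_flow.value - current_node_flow[node].in_flow and the
# candidates with the largest positive difference preferred.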


def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str:
"""
an implementation of __str__ for a transfer path, which is a list of direct transfers, so not really an object
