WIP: gather and store transfer statistics
Each time the poller or receiver transitions a transfer to a different
state, use the information to update the transfer statistics.
The goal is to be able to answer the following questions in Rucio:
 - how many successful transfers happened towards RSE1 yesterday?
 - what was the average transfer rate from RSE1 to RSE2 last hour?
In the short term, this will allow visualizing the transfer status.
In the longer term, we may use this data to make better scheduling
decisions (selection of the destination RSE during rule evaluation;
selection of the source RSE for transfers).
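
For illustration, the following sketch shows the kind of query the new
transfer_stats table is meant to answer. It is not part of this change:
the helper name, the default resolution and the fixed 24-hour window
are placeholders, and it assumes the usual Rucio read_session
decorator.

    import datetime

    from sqlalchemy import func, select

    from rucio.db.sqla import models
    from rucio.db.sqla.session import read_session


    @read_session
    def files_done_towards_rse(dst_rse_id, resolution=3600, *, session):
        # Sum the per-interval "files_done" counters recorded during the
        # last 24 hours at the given resolution (stored in seconds).
        since = datetime.datetime.utcnow() - datetime.timedelta(days=1)
        stmt = select(
            func.sum(models.TransferStats.files_done)
        ).where(
            models.TransferStats.dest_rse_id == dst_rse_id,
            models.TransferStats.resolution == resolution,
            models.TransferStats.timestamp >= since,
        )
        return session.execute(stmt).scalar() or 0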

Each poller/receiver will gather statistics over a certain time window
before committing the aggregated stats from the previous interval to
the database. This reduces the number of database writes, at the cost
of potentially losing more metrics in case of a crash. For each time
window, we keep track of the total number of failed and successful
transfers, plus the number of transferred bytes.
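
The gathering logic boils down to the simplified sketch below (locking
and the actual database flush omitted; the real implementation is the
TransferStatsManager class added in this commit).

    import datetime
    from collections import defaultdict
    from dataclasses import dataclass


    @dataclass
    class Record:
        files_failed: int = 0
        files_done: int = 0
        bytes_done: int = 0


    window = datetime.timedelta(seconds=5)    # "observe" resolution
    window_end = datetime.datetime.utcnow() + window
    samples = defaultdict(Record)             # keyed by (dst, src, activity)


    def observe(dst_rse_id, src_rse_id, activity, state, file_size):
        global window_end, samples
        now = datetime.datetime.utcnow()
        if now > window_end:
            # Here the aggregated samples of the closed window would be
            # written to the database in a single transaction.
            samples = defaultdict(Record)
            window_end = now + window
        record = samples[dst_rse_id, src_rse_id, activity]
        if state == 'DONE':
            record.files_done += 1
            record.bytes_done += file_size
        elif state == 'FAILED':
            record.files_failed += 1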

Metrics will be aggregated regularly into lower-resolution samples.
Aggregation is done by summing the higher-resolution metrics. Lower
resolution samples will be stored for a longer period in the database.

The behavior is heavily inspired by existing time-series databases such
as Prometheus and InfluxDB. The possibility of using an external tool
for this job was discussed, but rejected.
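
Down-sampling is plain summation: all higher-resolution samples whose
timestamps fall into one lower-resolution interval are added together,
with the resulting timestamp aligned on the lower-resolution boundary.
A rough sketch of the idea (not the code from this patch):

    import datetime
    from collections import defaultdict


    def downsample(samples, dst_resolution):
        """samples maps a timestamp to (files_failed, files_done, bytes_done)."""
        step = int(dst_resolution.total_seconds())
        out = defaultdict(lambda: [0, 0, 0])
        for timestamp, counters in samples.items():
            # Align the timestamp on a boundary divisible by the resolution.
            bucket = datetime.datetime.fromtimestamp(int(timestamp.timestamp()) // step * step)
            for i, value in enumerate(counters):
                out[bucket][i] += value
        return dict(out)
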
Radu Carpa committed Oct 13, 2023
1 parent c89dfa6 commit 7dad0bd
Showing 2 changed files with 238 additions and 1 deletion.
221 changes: 220 additions & 1 deletion lib/rucio/core/transfer.py
@@ -16,15 +16,17 @@
import datetime
import logging
import re
import threading
import time
import traceback
from collections import defaultdict
from dataclasses import dataclass
from decimal import Decimal, localcontext
from typing import TYPE_CHECKING

from dogpile.cache import make_region
from dogpile.cache.api import NoValue
from sqlalchemy import select, update
from sqlalchemy import delete, func, insert, select, update
from sqlalchemy.exc import IntegrityError

from rucio.common import constants
@@ -287,6 +289,205 @@ def legacy_sources(self):
return self._legacy_sources


class TransferStatsManager:

@dataclass
class _StatsRecord:
files_failed: int = 0
files_done: int = 0
bytes_done: int = 0

def __init__(self):
self.lock = threading.Lock()

retentions = sorted([
# resolution, retention
(datetime.timedelta(seconds=5), datetime.timedelta(seconds=10)),
(datetime.timedelta(seconds=10), datetime.timedelta(seconds=20)),
(datetime.timedelta(seconds=20), datetime.timedelta(minutes=2)),
#(datetime.timedelta(minutes=5), datetime.timedelta(hours=1)),
#(datetime.timedelta(hours=1), datetime.timedelta(days=1)),
#(datetime.timedelta(days=1), datetime.timedelta(days=30)),
])

self.retentions = retentions
self.observe_resolution = self.retentions[0][0]

self.current_timestamp, _ = next(self.slice_time(self.observe_resolution, start_time=datetime.datetime.utcnow() + self.observe_resolution))
self.current_samples = defaultdict(lambda: self._StatsRecord())

def observe(self, src_rse_id: str, dst_rse_id: str, activity: str, state: RequestState, file_size: int, *, session: "Optional[Session]" = None):
now = datetime.datetime.utcnow()
samples_to_save = None
with self.lock:
if now > self.current_timestamp:
if self.current_samples:
samples_to_save = {self.current_timestamp: self.current_samples}
self.current_timestamp, _ = next(self.slice_time(self.observe_resolution, start_time=now + self.observe_resolution))

if state in (RequestState.DONE, RequestState.FAILED):
record = self.current_samples[dst_rse_id, src_rse_id, activity]
if state == RequestState.DONE:
record.files_done += 1
record.bytes_done += file_size
else:
record.files_failed += 1
if samples_to_save:
self._save_samples(samples_to_save, session=session)

@transactional_session
def _save_samples(self, to_save, *, session: "Session"):
rows_to_insert = []
for timestamp, samples in to_save.items():
for (dst_rse_id, src_rse_id, activity), record in samples.items():
rows_to_insert.append({
models.TransferStats.timestamp.name: timestamp,
models.TransferStats.resolution.name: self.observe_resolution.seconds,
models.TransferStats.src_rse_id.name: src_rse_id,
models.TransferStats.dest_rse_id.name: dst_rse_id,
models.TransferStats.activity.name: activity,
models.TransferStats.files_failed.name: record.files_failed,
models.TransferStats.files_done.name: record.files_done,
models.TransferStats.bytes_done.name: record.bytes_done,
})
if rows_to_insert:
session.execute(insert(models.TransferStats), rows_to_insert)

@transactional_session
def downsample_and_cleanup(self, *, session: "Session"):
"""
Housekeeping of samples in the database:
- create lower-resolution (but higher-retention) samples from higher-resolution ones;
- delete the samples which are older than the desired retention time.
"""
now = datetime.datetime.utcnow()

stmt = select(
models.TransferStats.resolution,
func.max(models.TransferStats.timestamp),
func.min(models.TransferStats.timestamp),
).group_by(
models.TransferStats.resolution,
)
db_time_ranges = {
datetime.timedelta(seconds=res): (newest_t, oldest_t)
for res, newest_t, oldest_t in session.execute(stmt)
}

for i in range(1, len(self.retentions)):
src_resolution, desired_src_retention = self.retentions[i - 1]
dst_resolution, desired_dst_retention = self.retentions[i]

# Always keep samples at source resolution aligned to the destination resolution interval.
# Keep at least the number of samples needed to cover the first interval at
# destination resolution, but keep more samples if explicitly configured to do so.
_, oldest_desired_src_timestamp = next(self.slice_time(dst_resolution, start_time=now - desired_src_retention))

_, oldest_available_src_timestamp = db_time_ranges.get(src_resolution, (None, None))
newest_available_dst_timestamp, oldest_available_dst_timestamp = db_time_ranges.get(dst_resolution, (None, None))
# Only generate down-samples at destination resolution for intervals which:
# - are within the desired retention window
oldest_time_to_handle = now - desired_dst_retention - dst_resolution
# - don't already have a corresponding sample at destination resolution
if newest_available_dst_timestamp:
oldest_time_to_handle = max(oldest_time_to_handle, newest_available_dst_timestamp)
# - have samples at source resolution to aggregate from
if oldest_available_src_timestamp:
oldest_time_to_handle = max(oldest_time_to_handle, oldest_available_src_timestamp - src_resolution)
else:
oldest_time_to_handle = now

# Create samples at lower resolution from samples at higher resolution
for recent_t, older_t in self.slice_time(dst_resolution, start_time=now, end_time=oldest_time_to_handle):
stmt = select(
models.TransferStats.src_rse_id,
models.TransferStats.dest_rse_id,
models.TransferStats.activity,
func.sum(models.TransferStats.files_failed).label(models.TransferStats.files_failed.name),
func.sum(models.TransferStats.files_done).label(models.TransferStats.files_done.name),
func.sum(models.TransferStats.bytes_done).label(models.TransferStats.bytes_done.name),
).where(
models.TransferStats.resolution == src_resolution.seconds,
models.TransferStats.timestamp > older_t,
models.TransferStats.timestamp <= recent_t,
).group_by(
models.TransferStats.src_rse_id,
models.TransferStats.dest_rse_id,
models.TransferStats.activity,
)

additional_fields = {
models.TransferStats.timestamp.name: recent_t,
models.TransferStats.resolution.name: dst_resolution.seconds,
}
downsample_stats = [row._asdict() | additional_fields for row in session.execute(stmt)]
if downsample_stats:
session.execute(insert(models.TransferStats), downsample_stats)
if not oldest_available_dst_timestamp or recent_t < oldest_available_dst_timestamp:
oldest_available_dst_timestamp = recent_t
if not newest_available_dst_timestamp or recent_t > newest_available_dst_timestamp:
newest_available_dst_timestamp = recent_t

if oldest_available_dst_timestamp and newest_available_dst_timestamp:
db_time_ranges[dst_resolution] = (newest_available_dst_timestamp, oldest_available_dst_timestamp)

# Delete from the database the samples which are older than desired
if oldest_available_src_timestamp and oldest_available_src_timestamp < oldest_desired_src_timestamp:
stmt = delete(
models.TransferStats
).where(
models.TransferStats.timestamp < oldest_desired_src_timestamp,
models.TransferStats.resolution == src_resolution.seconds
)
session.execute(stmt)

# Cleanup samples at the lowest resolution, which were not handled by the previous loop
last_resolution, last_retention = self.retentions[-1]
_, oldest_desired_timestamp = next(self.slice_time(last_resolution, start_time=now - last_retention))
if db_time_ranges.get(last_resolution, (now, now))[1] < oldest_desired_timestamp:
stmt = delete(
models.TransferStats
).where(
models.TransferStats.timestamp < oldest_desired_timestamp,
models.TransferStats.resolution == last_resolution.seconds,
)
session.execute(stmt)

# Cleanup all resolutions which exist in the database but are not desired by rucio anymore
# (probably due to configuration changes).
for resolution_to_cleanup in set(db_time_ranges).difference(r[0] for r in self.retentions):
stmt = delete(
models.TransferStats
).where(
models.TransferStats.resolution == resolution_to_cleanup.seconds
)
session.execute(stmt)

@staticmethod
def slice_time(
resolution: datetime.timedelta,
start_time: "Optional[datetime.datetime]" = None,
end_time: "Optional[datetime.datetime]" = None
):
"""
Iterates, back in time, over time intervals of length `resolution` which are fully
included within the input interval (start_time, end_time).
Intervals are aligned on boundaries divisible by resolution.
For example: for start_time=17:09:59, end_time=16:20:01 and resolution=10 minutes, it will yield
(17:00:00, 16:50:00), (16:50:00, 16:40:00), (16:40:00, 16:30:00)
"""

if start_time is None:
start_time = datetime.datetime.utcnow()
newer_t = datetime.datetime.fromtimestamp(int(start_time.timestamp()) // resolution.seconds * resolution.seconds)
older_t = newer_t - resolution
while not end_time or older_t >= end_time:
yield newer_t, older_t
newer_t = older_t
older_t = older_t - resolution


class FlowManager:
class EdgeFlow:
def __init__(self, edge):
@@ -776,6 +977,24 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S
if request_core.is_intermediate_hop(request):
request_core.handle_failed_intermediate_hop(request, session=session)

# WIP: per the commit message, the stats manager is meant to be a
# long-lived object (one per poller/receiver) so that samples are
# aggregated over a time window before being written to the database.
stats_manager = TransferStatsManager()
stats_manager.observe(
src_rse_id=request['source_rse_id'],
dst_rse_id=request['dest_rse_id'],
activity=request['activity'],
state=tt_status_report.state,
file_size=request['bytes'],
session=session,
)
request_core.add_monitor_message(
new_state=tt_status_report.state,
request=request,
18 changes: 18 additions & 0 deletions lib/rucio/db/sqla/models.py
@@ -1420,6 +1420,24 @@ class Distance(BASE, ModelBase):
Index('DISTANCES_DEST_RSEID_IDX', 'dest_rse_id'))


class TransferStats(BASE, ModelBase):
"""Represents counters for transfer link usage"""
__tablename__ = 'transfer_stats'
id: Mapped[uuid.UUID] = mapped_column(GUID(), default=utils.generate_uuid)
timestamp: Mapped[datetime] = mapped_column(DateTime)
resolution: Mapped[int] = mapped_column(Integer)
dest_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
src_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
activity: Mapped[Optional[str]] = mapped_column(String(50))
files_done: Mapped[int] = mapped_column(BigInteger)
bytes_done: Mapped[int] = mapped_column(BigInteger)
files_failed: Mapped[int] = mapped_column(BigInteger)
_table_args = (PrimaryKeyConstraint('id', name='TRANSFER_STATS_PK'),
ForeignKeyConstraint(['src_rse_id'], ['rses.id'], name='TRANSFER_STATS_SRC_RSE_FK'),
ForeignKeyConstraint(['dest_rse_id'], ['rses.id'], name='TRANSFER_STATS_DEST_RSE_FK'),
Index('TRANSFER_STATS_KEY_IDX', 'resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity'))


class Subscription(BASE, ModelBase):
"""Represents a subscription"""
__tablename__ = 'subscriptions'
