diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py
index 0af3f04a45..cd92531aa7 100644
--- a/lib/rucio/core/transfer.py
+++ b/lib/rucio/core/transfer.py
@@ -16,15 +16,17 @@
 import datetime
 import logging
 import re
+import threading
 import time
 import traceback
 from collections import defaultdict
+from dataclasses import dataclass
 from decimal import Decimal, localcontext
 from typing import TYPE_CHECKING
 
 from dogpile.cache import make_region
 from dogpile.cache.api import NoValue
-from sqlalchemy import select, update
+from sqlalchemy import delete, func, insert, select, update
 from sqlalchemy.exc import IntegrityError
 
 from rucio.common import constants
@@ -287,6 +289,205 @@ def legacy_sources(self):
         return self._legacy_sources
 
 
+class TransferStatsManager:
+
+    @dataclass
+    class _StatsRecord:
+        files_failed: int = 0
+        files_done: int = 0
+        bytes_done: int = 0
+
+    def __init__(self):
+        self.lock = threading.Lock()
+
+        retentions = sorted([
+            # resolution, retention
+            (datetime.timedelta(minutes=5), datetime.timedelta(hours=1)),
+            (datetime.timedelta(hours=1), datetime.timedelta(days=1)),
+            (datetime.timedelta(days=1), datetime.timedelta(days=30)),
+        ])
+
+        self.retentions = retentions
+        self.observe_resolution = self.retentions[0][0]
+
+        self.current_timestamp, _ = next(self.slice_time(self.observe_resolution, start_time=datetime.datetime.utcnow() + self.observe_resolution))
+        self.current_samples = defaultdict(self._StatsRecord)
+
+    def observe(self, src_rse_id: str, dst_rse_id: str, activity: str, state: RequestState, file_size: int, *, session: "Optional[Session]" = None):
+        now = datetime.datetime.utcnow()
+        samples_to_save = None
+        with self.lock:
+            if now > self.current_timestamp:
+                if self.current_samples:
+                    samples_to_save = {self.current_timestamp: self.current_samples}
+                    # Start a fresh accumulator for the new observation window
+                    self.current_samples = defaultdict(self._StatsRecord)
+                self.current_timestamp, _ = next(self.slice_time(self.observe_resolution, start_time=now + self.observe_resolution))
+
+            if state in (RequestState.DONE, RequestState.FAILED):
+                record = self.current_samples[dst_rse_id, src_rse_id, activity]
+                if state == RequestState.DONE:
+                    record.files_done += 1
+                    record.bytes_done += file_size
+                else:
+                    record.files_failed += 1
+        # Persist outside the lock to avoid holding it during database I/O
+        if samples_to_save:
+            self._save_samples(samples_to_save, session=session)
+
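+    # The per-window records accumulated by observe() are flushed below in
+    # bulk: one row per (timestamp, destination, source, activity) combination
+    # seen during the elapsed observation window.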
+    @transactional_session
+    def _save_samples(self, to_save, *, session: "Session"):
+        rows_to_insert = []
+        for timestamp, samples in to_save.items():
+            for (dst_rse_id, src_rse_id, activity), record in samples.items():
+                rows_to_insert.append({
+                    models.TransferStats.timestamp.name: timestamp,
+                    models.TransferStats.resolution.name: int(self.observe_resolution.total_seconds()),
+                    models.TransferStats.src_rse_id.name: src_rse_id,
+                    models.TransferStats.dest_rse_id.name: dst_rse_id,
+                    models.TransferStats.activity.name: activity,
+                    models.TransferStats.files_failed.name: record.files_failed,
+                    models.TransferStats.files_done.name: record.files_done,
+                    models.TransferStats.bytes_done.name: record.bytes_done,
+                })
+        if rows_to_insert:
+            session.execute(insert(models.TransferStats), rows_to_insert)
+
+    @transactional_session
+    def downsample_and_cleanup(self, *, session: "Session"):
+        """
+        Housekeeping of samples in the database:
+        - create lower-resolution (but higher-retention) samples from higher-resolution ones;
+        - delete the samples which are older than the desired retention time.
+        """
+        now = datetime.datetime.utcnow()
+
+        stmt = select(
+            models.TransferStats.resolution,
+            func.max(models.TransferStats.timestamp),
+            func.min(models.TransferStats.timestamp),
+        ).group_by(
+            models.TransferStats.resolution,
+        )
+        db_time_ranges = {
+            datetime.timedelta(seconds=res): (newest_t, oldest_t)
+            for res, newest_t, oldest_t in session.execute(stmt)
+        }
+
+        for i in range(1, len(self.retentions)):
+            src_resolution, desired_src_retention = self.retentions[i - 1]
+            dst_resolution, desired_dst_retention = self.retentions[i]
+
+            # Always keep samples at source resolution aligned to the destination resolution interval.
+            # Keep, at least, the amount of samples needed to cover the first interval at
+            # destination resolution, but keep more samples if explicitly configured to do so.
+            _, oldest_desired_src_timestamp = next(self.slice_time(dst_resolution, start_time=now - desired_src_retention))
+
+            _, oldest_available_src_timestamp = db_time_ranges.get(src_resolution, (None, None))
+            newest_available_dst_timestamp, oldest_available_dst_timestamp = db_time_ranges.get(dst_resolution, (None, None))
+            # Only generate down-samples at destination resolution for intervals which:
+            # - are within the desired retention window
+            oldest_time_to_handle = now - desired_dst_retention - dst_resolution
+            # - don't already have a corresponding sample at destination resolution
+            if newest_available_dst_timestamp:
+                oldest_time_to_handle = max(oldest_time_to_handle, newest_available_dst_timestamp)
+            # - have samples at source resolution to build them from
+            if oldest_available_src_timestamp:
+                oldest_time_to_handle = max(oldest_time_to_handle, oldest_available_src_timestamp - src_resolution)
+            else:
+                oldest_time_to_handle = now
+
+            # Create samples at lower resolution from samples at higher resolution
+            for recent_t, older_t in self.slice_time(dst_resolution, start_time=now, end_time=oldest_time_to_handle):
+                stmt = select(
+                    models.TransferStats.src_rse_id,
+                    models.TransferStats.dest_rse_id,
+                    models.TransferStats.activity,
+                    func.sum(models.TransferStats.files_failed).label(models.TransferStats.files_failed.name),
+                    func.sum(models.TransferStats.files_done).label(models.TransferStats.files_done.name),
+                    func.sum(models.TransferStats.bytes_done).label(models.TransferStats.bytes_done.name),
+                ).where(
+                    models.TransferStats.resolution == int(src_resolution.total_seconds()),
+                    models.TransferStats.timestamp > older_t,
+                    models.TransferStats.timestamp <= recent_t,
+                ).group_by(
+                    models.TransferStats.src_rse_id,
+                    models.TransferStats.dest_rse_id,
+                    models.TransferStats.activity,
+                )
+
+                additional_fields = {
+                    models.TransferStats.timestamp.name: recent_t,
+                    models.TransferStats.resolution.name: int(dst_resolution.total_seconds()),
+                }
+                downsample_stats = [row._asdict() | additional_fields for row in session.execute(stmt)]
+                if downsample_stats:
+                    session.execute(insert(models.TransferStats), downsample_stats)
+                    if not oldest_available_dst_timestamp or recent_t < oldest_available_dst_timestamp:
+                        oldest_available_dst_timestamp = recent_t
+                    if not newest_available_dst_timestamp or recent_t > newest_available_dst_timestamp:
+                        newest_available_dst_timestamp = recent_t
+
+            if oldest_available_dst_timestamp and newest_available_dst_timestamp:
+                db_time_ranges[dst_resolution] = (newest_available_dst_timestamp, oldest_available_dst_timestamp)
+
+            # Delete from the database the samples which are older than desired
+            if oldest_available_src_timestamp and oldest_available_src_timestamp < oldest_desired_src_timestamp:
+                stmt = delete(
+                    models.TransferStats
+                ).where(
+                    models.TransferStats.timestamp < oldest_desired_src_timestamp,
+                    models.TransferStats.resolution == int(src_resolution.total_seconds()),
+                )
+                session.execute(stmt)
+
+        # Cleanup samples at the lowest resolution, which were not handled by the previous loop
+        last_resolution, last_retention = self.retentions[-1]
+        _, oldest_desired_timestamp = next(self.slice_time(last_resolution, start_time=now - last_retention))
+        if db_time_ranges.get(last_resolution, (now, now))[1] < oldest_desired_timestamp:
+            stmt = delete(
+                models.TransferStats
+            ).where(
+                models.TransferStats.timestamp < oldest_desired_timestamp,
+                models.TransferStats.resolution == int(last_resolution.total_seconds()),
+            )
+            session.execute(stmt)
+
+        # Cleanup all resolutions which exist in the database but are not desired by rucio anymore
+        # (probably due to configuration changes).
+        for resolution_to_cleanup in set(db_time_ranges).difference(r[0] for r in self.retentions):
+            stmt = delete(
+                models.TransferStats
+            ).where(
+                models.TransferStats.resolution == int(resolution_to_cleanup.total_seconds()),
+            )
+            session.execute(stmt)
+
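+    # Both observe() and downsample_and_cleanup() rely on slice_time() below
+    # to align windows on boundaries that are whole multiples of a resolution,
+    # so a given wall-clock instant always falls into the same sample row.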
+    @staticmethod
+    def slice_time(
+            resolution: datetime.timedelta,
+            start_time: "Optional[datetime.datetime]" = None,
+            end_time: "Optional[datetime.datetime]" = None
+    ):
+        """
+        Iterates, back in time, over time intervals of length `resolution` which are fully
+        included within the input interval (start_time, end_time).
+        Intervals are aligned on boundaries divisible by resolution.
+
+        For example: for start_time=17:09:59, end_time=16:20:01 and resolution=10 minutes,
+        it will yield (17:00:00, 16:50:00), (16:50:00, 16:40:00), (16:40:00, 16:30:00).
+        """
+
+        if start_time is None:
+            start_time = datetime.datetime.utcnow()
+        resolution_seconds = int(resolution.total_seconds())
+        newer_t = datetime.datetime.fromtimestamp(int(start_time.timestamp()) // resolution_seconds * resolution_seconds)
+        older_t = newer_t - resolution
+        while not end_time or older_t >= end_time:
+            yield newer_t, older_t
+            newer_t = older_t
+            older_t = older_t - resolution
+
+
 class FlowManager:
     class EdgeFlow:
         def __init__(self, edge):
@@ -776,6 +977,24 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S
             if request_core.is_intermediate_hop(request):
                 request_core.handle_failed_intermediate_hop(request, session=session)
 
+        # Record the terminal state of this transfer in the transfer statistics.
+        stats_manager = TransferStatsManager()
+        stats_manager.observe(
+            src_rse_id=request['source_rse_id'],
+            dst_rse_id=request['dest_rse_id'],
+            activity=request['activity'],
+            state=tt_status_report.state,
+            file_size=request['bytes'],
+            session=session,
+        )
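+        # Opportunistically roll samples up into the coarser retention levels
+        # and drop anything that has outlived its retention window.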
+        stats_manager.downsample_and_cleanup(session=session)
         request_core.add_monitor_message(
             new_state=tt_status_report.state,
             request=request,
diff --git a/lib/rucio/db/sqla/models.py b/lib/rucio/db/sqla/models.py
index 596296becd..9c7aa63f31 100644
--- a/lib/rucio/db/sqla/models.py
+++ b/lib/rucio/db/sqla/models.py
@@ -1420,6 +1420,24 @@ class Distance(BASE, ModelBase):
             Index('DISTANCES_DEST_RSEID_IDX', 'dest_rse_id'))
 
 
+class TransferStats(BASE, ModelBase):
+    """Represents counters for transfer link usage"""
+    __tablename__ = 'transfer_stats'
+    id: Mapped[uuid.UUID] = mapped_column(GUID(), default=utils.generate_uuid)
+    timestamp: Mapped[datetime] = mapped_column(DateTime)
+    resolution: Mapped[int] = mapped_column(Integer)  # sample interval length, in seconds
+    dest_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
+    src_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
+    activity: Mapped[Optional[str]] = mapped_column(String(50))
+    files_done: Mapped[int] = mapped_column(BigInteger)
+    bytes_done: Mapped[int] = mapped_column(BigInteger)
+    files_failed: Mapped[int] = mapped_column(BigInteger)
+    _table_args = (PrimaryKeyConstraint('id', name='TRANSFER_STATS_PK'),
+                   ForeignKeyConstraint(['src_rse_id'], ['rses.id'], name='TRANSFER_STATS_SRC_RSE_FK'),
+                   ForeignKeyConstraint(['dest_rse_id'], ['rses.id'], name='TRANSFER_STATS_DEST_RSE_FK'),
+                   Index('TRANSFER_STATS_KEY_IDX', 'resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity'))
+
+
 class Subscription(BASE, ModelBase):
     """Represents a subscription"""
     __tablename__ = 'subscriptions'