From 847212fe89121173438a333389cb0740e9f7afd6 Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Thu, 28 Sep 2023 17:46:59 +0200 Subject: [PATCH] wip --- lib/rucio/core/transfer.py | 161 +++++++++++++++++++++++++++++++++++- lib/rucio/db/sqla/models.py | 18 ++++ 2 files changed, 177 insertions(+), 2 deletions(-) diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 0af3f04a45..c8f6a59a36 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -19,12 +19,13 @@ import time import traceback from collections import defaultdict +from dataclasses import dataclass from decimal import Decimal, localcontext from typing import TYPE_CHECKING from dogpile.cache import make_region from dogpile.cache.api import NoValue -from sqlalchemy import select, update +from sqlalchemy import delete, exists, func, insert, select, update from sqlalchemy.exc import IntegrityError from rucio.common import constants @@ -287,6 +288,149 @@ def legacy_sources(self): return self._legacy_sources +class TransferStatsManager: + + @dataclass + class _StatsRecord: + files_failed: int = 0 + files_done: int = 0 + bytes_done: int = 0 + + def __init__(self): + self.last_saved = datetime.datetime.utcnow() + self.pending_metrics = defaultdict(lambda: self._StatsRecord()) + + retentions = sorted([ + # resolution, retention + (datetime.timedelta(seconds=1), datetime.timedelta(seconds=10)), + (datetime.timedelta(seconds=5), datetime.timedelta(seconds=20)), + (datetime.timedelta(seconds=20), datetime.timedelta(minutes=2)), + #(datetime.timedelta(minutes=5), datetime.timedelta(hours=1)), + #(datetime.timedelta(hours=1), datetime.timedelta(days=1)), + #(datetime.timedelta(days=1), datetime.timedelta(days=30)), + ]) + + self.retentions = retentions + self.observe_resolution = self.retentions[0][0] + + def observe(self, src_rse_id: str, dst_rse_id: str, activity: str, state: RequestState, file_size: int): + if state in (RequestState.DONE, RequestState.FAILED): + record = 
self.pending_metrics[dst_rse_id, src_rse_id, activity] + if state == RequestState.DONE: + record.files_done += 1 + record.bytes_done += file_size + else: + record.files_failed += 1 + + if self.last_saved < datetime.datetime.utcnow() - self.observe_resolution: + self.commit_stats() + + @transactional_session + def commit_stats(self, *, session: "Session"): + rows_to_insert = [] + for (dst_rse_id, src_rse_id, activity), record in self.pending_metrics.items(): + rows_to_insert.append({ + models.TransferStats.timestamp.name: self.last_saved, + models.TransferStats.resolution.name: self.observe_resolution.seconds, + models.TransferStats.src_rse_id.name: src_rse_id, + models.TransferStats.dest_rse_id.name: dst_rse_id, + models.TransferStats.activity.name: activity, + models.TransferStats.files_failed.name: record.files_failed, + models.TransferStats.files_done.name: record.files_done, + models.TransferStats.bytes_done.name: record.bytes_done, + }) + + if rows_to_insert: + session.execute(insert(models.TransferStats), rows_to_insert) + self.pending_metrics.clear(); self.last_saved = datetime.datetime.utcnow()  # reset the window: otherwise the same counters are re-inserted on every commit + @transactional_session + def downsample_and_cleanup(self, *, session: "Session"): + now = datetime.datetime.utcnow() + + stmt = select( + models.TransferStats.resolution, + func.max(models.TransferStats.timestamp), + # ).where( + # models.TransferStats.resolution.in_([r[0].seconds for r in self.retentions]), + ).group_by( + models.TransferStats.resolution, + ) + most_recent_db_timestamps = {datetime.timedelta(seconds=r): t for r, t in session.execute(stmt)} + + for i in range(1, len(self.retentions)): + src_resolution, src_duration = self.retentions[i - 1] + dst_resolution, dst_duration = self.retentions[i] + oldest_time_to_handle = max(most_recent_db_timestamps.get(dst_resolution, now - dst_duration), now - dst_duration) + for recent_t, older_t in self._slice_time(dst_resolution, start_time=now, end_time=oldest_time_to_handle): + stmt = select( + models.TransferStats.src_rse_id, + models.TransferStats.dest_rse_id, 
models.TransferStats.activity, + func.sum(models.TransferStats.files_failed).label(models.TransferStats.files_failed.name), + func.sum(models.TransferStats.files_done).label(models.TransferStats.files_done.name), + func.sum(models.TransferStats.bytes_done).label(models.TransferStats.bytes_done.name), + ).where( + models.TransferStats.resolution == src_resolution.seconds, + models.TransferStats.timestamp > older_t, + models.TransferStats.timestamp <= recent_t, + ).group_by( + models.TransferStats.src_rse_id, + models.TransferStats.dest_rse_id, + models.TransferStats.activity, + ) + + additional_fields = { + models.TransferStats.timestamp.name: recent_t, + models.TransferStats.resolution.name: dst_resolution.seconds, + } + downsample_stats = [row._asdict() | additional_fields for row in session.execute(stmt)] + if downsample_stats: + session.execute(insert(models.TransferStats), downsample_stats) + + stmt = delete( + models.TransferStats + ).where( + models.TransferStats.timestamp < now - src_duration, + models.TransferStats.resolution == src_resolution.seconds + ) + session.execute(stmt) + + stmt = delete( + models.TransferStats + ).where( + models.TransferStats.resolution == self.retentions[-1][0].seconds, + models.TransferStats.timestamp < now - self.retentions[-1][1], + ) + session.execute(stmt) + + # Cleanup all resolutions which exist in the database but are not desired by rucio anymore + # Probably due to configuration changes. 
+ for resolution_to_cleanup in set(most_recent_db_timestamps).difference(r[0] for r in self.retentions): + stmt = delete( + models.TransferStats + ).where( + models.TransferStats.resolution == resolution_to_cleanup.seconds + ) + session.execute(stmt) + + @staticmethod + def _slice_time( + resolution: datetime.timedelta, + start_time: "Optional[datetime.datetime]" = None, + end_time: "Optional[datetime.datetime]" = None + ): + if start_time is None: + start_time = datetime.datetime.utcnow() + newer_t = datetime.datetime.fromtimestamp(int(start_time.timestamp()) // resolution.seconds * resolution.seconds) + older_t = newer_t - resolution + while True: + if end_time and newer_t <= end_time: + break + yield newer_t, older_t + newer_t = older_t + older_t = older_t - resolution + + class FlowManager: class EdgeFlow: def __init__(self, edge): @@ -776,6 +920,17 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S if request_core.is_intermediate_hop(request): request_core.handle_failed_intermediate_hop(request, session=session) + stats_manage = TransferStatsManager() + stats_manage.observe( + src_rse_id=request['source_rse_id'], + dst_rse_id=request['dest_rse_id'], + activity=request['activity'], + state=tt_status_report.state, + file_size=request['bytes'], + ) + stats_manage.commit_stats(session=session) + + stats_manage.downsample_and_cleanup(session=session) request_core.add_monitor_message( new_state=tt_status_report.state, request=request, @@ -786,7 +941,7 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S except UnsupportedOperation as error: logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', ''))) return False - except Exception: + except Exception: logger(logging.CRITICAL, "Exception", exc_info=True) diff --git a/lib/rucio/db/sqla/models.py b/lib/rucio/db/sqla/models.py index 596296becd..9c7aa63f31 100644 --- 
a/lib/rucio/db/sqla/models.py +++ b/lib/rucio/db/sqla/models.py @@ -1420,6 +1420,24 @@ class Distance(BASE, ModelBase): Index('DISTANCES_DEST_RSEID_IDX', 'dest_rse_id')) +class TransferStats(BASE, ModelBase): + """Represents counters for transfer link usage""" + __tablename__ = 'transfer_stats' + id: Mapped[uuid.UUID] = mapped_column(GUID(), default=utils.generate_uuid) + timestamp: Mapped[datetime] = mapped_column(DateTime) + resolution: Mapped[int] = mapped_column(Integer) + dest_rse_id: Mapped[uuid.UUID] = mapped_column(GUID()) + src_rse_id: Mapped[uuid.UUID] = mapped_column(GUID()) + activity: Mapped[Optional[str]] = mapped_column(String(50)) + files_done: Mapped[int] = mapped_column(BigInteger) + bytes_done: Mapped[int] = mapped_column(BigInteger) + files_failed: Mapped[int] = mapped_column(BigInteger) + _table_args = (PrimaryKeyConstraint('id', name='TRANSFER_STATS_PK'), + ForeignKeyConstraint(['src_rse_id'], ['rses.id'], name='TRANSFER_STATS_SRC_RSE_FK'), + ForeignKeyConstraint(['dest_rse_id'], ['rses.id'], name='TRANSFER_STATS_DEST_RSE_FK'), + Index('TRANSFER_STATS_KEY_IDX', 'resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity')) + + class Subscription(BASE, ModelBase): """Represents a subscription""" __tablename__ = 'subscriptions'