diff --git a/etc/sql/oracle/schema.sql b/etc/sql/oracle/schema.sql index 5f8ed794f0d..0031c0aeb2e 100644 --- a/etc/sql/oracle/schema.sql +++ b/etc/sql/oracle/schema.sql @@ -1996,3 +1996,26 @@ CREATE GLOBAL TEMPORARY TABLE TEMPORARY_ID_4 id RAW(16), CONSTRAINT TEMPORARY_ID_4_PK PRIMARY KEY (id) ) ON COMMIT DELETE ROWS; + +-- 73 ) ========================================= TRANSFER_STATS table =================================== + +CREATE TABLE TRANSFER_STATS +( + ID RAW(16), + RESOLUTION INTEGER, + TIMESTAMP DATE, + DEST_RSE_ID RAW(16), + SRC_RSE_ID RAW(16), + ACTIVITY VARCHAR2(50), + FILES_DONE NUMBER(19,0), + BYTES_DONE NUMBER(19,0), + FILES_FAILED NUMBER(19,0), + CREATED_AT DATE, + UPDATED_AT DATE, + CONSTRAINT TRANSFER_STATS_PK PRIMARY KEY (ID), + CONSTRAINT TRANSFER_STATS_DEST_RSE_FK FOREIGN KEY(DEST_RSE_ID) REFERENCES RSES (ID), + CONSTRAINT TRANSFER_STATS_SRC_RSE_FK FOREIGN KEY(SRC_RSE_ID) REFERENCES RSES (ID), + CONSTRAINT TRANSFER_STATS_CREATED_NN CHECK (CREATED_AT IS NOT NULL), + CONSTRAINT TRANSFER_STATS_UPDATED_NN CHECK (UPDATED_AT IS NOT NULL) +); +CREATE INDEX TRANSFER_STATS_KEY_IDX ON TRANSFER_STATS (RESOLUTION, TIMESTAMP, DEST_RSE_ID, SRC_RSE_ID, ACTIVITY); diff --git a/lib/rucio/alembicrevision.py b/lib/rucio/alembicrevision.py index a26c5cb09bb..693ad545ac3 100644 --- a/lib/rucio/alembicrevision.py +++ b/lib/rucio/alembicrevision.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-ALEMBIC_REVISION = '4df2c5ddabc0' # the current alembic head revision +ALEMBIC_REVISION = 'a08fa8de1545' # the current alembic head revision diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index 8accda0994b..f2a80db75dc 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -16,10 +16,14 @@ import datetime import json import logging +import math +import random +import threading import traceback import uuid -from collections import namedtuple -from collections.abc import Sequence +from collections import namedtuple, defaultdict +from collections.abc import Sequence, Mapping +from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Optional, Union from sqlalchemy import and_, or_, update, select, delete, exists, insert @@ -27,7 +31,7 @@ from sqlalchemy.orm import aliased from sqlalchemy.sql.expression import asc, true, false, null, func -from rucio.common.config import config_get_bool +from rucio.common.config import config_get_bool, config_get_int from rucio.common.exception import RequestNotFound, RucioException, UnsupportedOperation, InvalidRSEExpression from rucio.common.types import InternalAccount, InternalScope from rucio.common.utils import generate_uuid, chunks @@ -1307,6 +1311,337 @@ def get_heavy_load_rses(threshold, *, session: "Session"): raise RucioException(error.args) +class TransferStatsManager: + + @dataclass + class _StatsRecord: + files_failed: int = 0 + files_done: int = 0 + bytes_done: int = 0 + + def __init__(self): + self.lock = threading.Lock() + + retentions = sorted([ + # resolution, retention + (datetime.timedelta(minutes=5), datetime.timedelta(hours=1)), + (datetime.timedelta(hours=1), datetime.timedelta(days=1)), + (datetime.timedelta(days=1), datetime.timedelta(days=30)), + ]) + + self.retentions = retentions + self.raw_resolution, raw_retention = self.retentions[0] + + self.current_timestamp = datetime.datetime.utcnow() + self.current_samples = defaultdict() + 
self._rollover_samples(rollover_time=datetime.datetime.utcnow()) + + self.record_stats = True + self.timer = None + self.downsample_period = math.ceil(raw_retention.total_seconds()) + self.next_downsample = None + + def __enter__(self): + self.timer = threading.Timer(self.raw_resolution.total_seconds(), self.save) + self.record_stats = config_get_bool('transfers', 'stats_enabled', default=self.record_stats) + self.downsample_period = config_get_int('transfers', 'stats_downsample_period', default=self.downsample_period) + self.timer.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.timer is not None: + self.timer.cancel() + self.force_save() + + def observe(self, src_rse_id: str, dst_rse_id: str, activity: str, state: RequestState, file_size: int, *, session: "Optional[Session]" = None): + if not self.record_stats: + return + now = datetime.datetime.utcnow() + save_timestamp, save_samples = now, {} + with self.lock: + if now >= self.current_timestamp + self.raw_resolution: + save_timestamp, save_samples = self._rollover_samples(now) + + if state in (RequestState.DONE, RequestState.FAILED): + record = self.current_samples[dst_rse_id, src_rse_id, activity] + if state == RequestState.DONE: + record.files_done += 1 + record.bytes_done += file_size + else: + record.files_failed += 1 + if save_samples: + self._save_samples(save_timestamp, save_samples, session=session) + + @transactional_session + def save(self, *, session: "Session"): + """ + Save samples to the database if the end of the current recording interval was reached. + Otherwise, do nothing. 
+ """ + now = datetime.datetime.utcnow() + save_timestamp, save_samples = now, {} + with self.lock: + if now >= self.current_timestamp + self.raw_resolution: + save_timestamp, save_samples = self._rollover_samples(now) + if save_samples: + self._save_samples(save_timestamp, save_samples, session=session) + + self.periodic_downsample_and_cleanup(session=session) + + @transactional_session + def force_save(self, *, session: "Session"): + """ + Commit to the database everything without ensuring that + the end of the currently recorded time interval is reached. + + Only to be used for the final save operation on shutdown. + """ + with self.lock: + save_timestamp, save_samples = self._rollover_samples(datetime.datetime.utcnow()) + if save_samples: + self._save_samples(save_timestamp, save_samples, session=session) + + def _rollover_samples(self, rollover_time: datetime.datetime) -> "tuple[datetime.datetime, Mapping[tuple[str, str, str], TransferStatsManager._StatsRecord]]": + previous_samples = (self.current_timestamp, self.current_samples) + self.current_samples = defaultdict(lambda: self._StatsRecord()) + _, self.current_timestamp = next(self.slice_time(self.raw_resolution, start_time=rollover_time + self.raw_resolution)) + return previous_samples + + @transactional_session + def _save_samples( + self, + timestamp: "datetime.datetime", + samples: "Mapping[tuple[str, str, str], TransferStatsManager._StatsRecord]", + *, + session: "Session" + ): + rows_to_insert = [] + for (dst_rse_id, src_rse_id, activity), record in samples.items(): + rows_to_insert.append({ + models.TransferStats.timestamp.name: timestamp, + models.TransferStats.resolution.name: self.raw_resolution.total_seconds(), + models.TransferStats.src_rse_id.name: src_rse_id, + models.TransferStats.dest_rse_id.name: dst_rse_id, + models.TransferStats.activity.name: activity, + models.TransferStats.files_failed.name: record.files_failed, + models.TransferStats.files_done.name: record.files_done, + 
models.TransferStats.bytes_done.name: record.bytes_done, + }) + if rows_to_insert: + session.execute(insert(models.TransferStats), rows_to_insert) + + @transactional_session + def periodic_downsample_and_cleanup(self, *, session: "Session"): + now = datetime.datetime.utcnow() + timestamp = now + datetime.timedelta(seconds=random.randint(self.downsample_period, 2 * self.downsample_period)) + if not self.next_downsample: + # Wait for a whole period before first downsample operation. Otherwise we'll perform it in each daemon on startup. + self.next_downsample = timestamp + return + + if self.next_downsample > now: + return + + self.next_downsample = timestamp + self.downsample_and_cleanup(session=session) + + @transactional_session + def downsample_and_cleanup(self, *, session: "Session"): + """ + Housekeeping of samples in the database: + - create lower-resolution (but higher-retention) samples from higher-resolution ones; + - delete the samples which are older than the desired retention time. + """ + now = datetime.datetime.utcnow() + + stmt = select( + models.TransferStats.resolution, + func.max(models.TransferStats.timestamp), + func.min(models.TransferStats.timestamp), + ).group_by( + models.TransferStats.resolution, + ) + db_time_ranges = { + datetime.timedelta(seconds=res): (newest_t, oldest_t) + for res, newest_t, oldest_t in session.execute(stmt) + } + + for i in range(1, len(self.retentions)): + src_resolution, desired_src_retention = self.retentions[i - 1] + dst_resolution, desired_dst_retention = self.retentions[i] + + # Always keep samples at source resolution aligned to the destination resolution interval. + # Keep, at least, the amount of samples needed to cover the first interval at + # destination resolution, but keep more samples if explicitly configured to do so. 
+ _, oldest_desired_src_timestamp = next(self.slice_time(dst_resolution, start_time=now - desired_src_retention)) + + _, oldest_available_src_timestamp = db_time_ranges.get(src_resolution, (None, None)) + newest_available_dst_timestamp, oldest_available_dst_timestamp = db_time_ranges.get(dst_resolution, (None, None)) + # Only generate down-samples at destination resolution for intervals in which: + # - are within the desired retention window + oldest_time_to_handle = now - desired_dst_retention - dst_resolution + # - we didn't already generate the corresponding sample at destination resolution + if newest_available_dst_timestamp: + oldest_time_to_handle = max(oldest_time_to_handle, newest_available_dst_timestamp + datetime.timedelta(microseconds=1)) + # - we have samples at source resolution to do it + if oldest_available_src_timestamp: + oldest_time_to_handle = max(oldest_time_to_handle, oldest_available_src_timestamp) + else: + oldest_time_to_handle = now + + # Create samples at lower resolution from samples at higher resolution + for recent_t, older_t in self.slice_time(dst_resolution, start_time=now, end_time=oldest_time_to_handle): + additional_fields = { + models.TransferStats.timestamp.name: older_t, + models.TransferStats.resolution.name: dst_resolution.total_seconds(), + } + src_totals = self.load_totals(resolution=src_resolution, recent_t=recent_t, older_t=older_t, session=session) + downsample_stats = [stat | additional_fields for stat in src_totals] + if downsample_stats: + session.execute(insert(models.TransferStats), downsample_stats) + if not oldest_available_dst_timestamp or older_t < oldest_available_dst_timestamp: + oldest_available_dst_timestamp = older_t + if not newest_available_dst_timestamp or older_t > newest_available_dst_timestamp: + newest_available_dst_timestamp = older_t + + if oldest_available_dst_timestamp and newest_available_dst_timestamp: + db_time_ranges[dst_resolution] = (newest_available_dst_timestamp,
oldest_available_dst_timestamp) + + # Delete from the database the samples which are older than desired + self._cleanup(resolution=src_resolution, timestamp=oldest_desired_src_timestamp, session=session) + + # Cleanup samples at the lowest resolution, which were not handled by the previous loop + last_resolution, last_retention = self.retentions[-1] + _, oldest_desired_timestamp = next(self.slice_time(last_resolution, start_time=now - last_retention)) + if db_time_ranges.get(last_resolution, (now, now))[1] < oldest_desired_timestamp: + self._cleanup(resolution=last_resolution, timestamp=oldest_desired_timestamp, session=session) + + # Cleanup all resolutions which exist in the database but are not desired by rucio anymore + # (probably due to configuration changes). + for resolution_to_cleanup in set(db_time_ranges).difference(r[0] for r in self.retentions): + self._cleanup(resolution=resolution_to_cleanup, timestamp=now, session=session) + + @stream_session + def load_totals(self, resolution, recent_t, older_t, *, session: "Session"): + """ + Load aggregated totals for the given resolution and time interval. + + Ignore multiple values for the same timestamp at downsample resolutions. + They are the result of concurrent downsample operations (two different + daemons performing downsampling at the same time). Very probably, + the values are identical. Even if not, these values must not be counted twice. + This is to gracefully handle multiple parallel downsample operations.
+ """ + if resolution == self.raw_resolution: + sub_query = select( + models.TransferStats.timestamp, + models.TransferStats.src_rse_id, + models.TransferStats.dest_rse_id, + models.TransferStats.activity, + models.TransferStats.files_failed, + models.TransferStats.files_done, + models.TransferStats.bytes_done + ) + else: + sub_query = select( + models.TransferStats.timestamp, + models.TransferStats.src_rse_id, + models.TransferStats.dest_rse_id, + models.TransferStats.activity, + func.max(models.TransferStats.files_failed).label(models.TransferStats.files_failed.name), + func.max(models.TransferStats.files_done).label(models.TransferStats.files_done.name), + func.max(models.TransferStats.bytes_done).label(models.TransferStats.bytes_done.name), + ).group_by( + models.TransferStats.timestamp, + models.TransferStats.src_rse_id, + models.TransferStats.dest_rse_id, + models.TransferStats.activity, + ) + + sub_query = sub_query.where( + models.TransferStats.resolution == resolution.total_seconds(), + models.TransferStats.timestamp >= older_t, + models.TransferStats.timestamp < recent_t + ).subquery() + + stmt = select( + sub_query.c.src_rse_id, + sub_query.c.dest_rse_id, + sub_query.c.activity, + func.sum(sub_query.c.files_failed).label(models.TransferStats.files_failed.name), + func.sum(sub_query.c.files_done).label(models.TransferStats.files_done.name), + func.sum(sub_query.c.bytes_done).label(models.TransferStats.bytes_done.name), + ).group_by( + sub_query.c.src_rse_id, + sub_query.c.dest_rse_id, + sub_query.c.activity + ) + + for row in session.execute(stmt): + yield row._asdict() + + @staticmethod + def _cleanup(resolution, timestamp, limit=None, *, session: "Session"): + """ + Delete, from the database, the stats older than the given time. + Skip locked rows, to tolerate parallel executions by multiple daemons. 
+ """ + stmt = select( + models.TransferStats.id + ).where( + models.TransferStats.resolution == resolution.total_seconds(), + models.TransferStats.timestamp < timestamp + ) + + if limit is not None: + stmt = stmt.limit(limit) + + # Oracle does not support chaining order_by(), limit(), and + # with_for_update(). Use a nested query to overcome this. + if session.bind.dialect.name == 'oracle': + stmt = select( + models.TransferStats.id + ).where( + models.TransferStats.id.in_(stmt) + ).with_for_update( + skip_locked=True + ) + else: + stmt = stmt.with_for_update(skip_locked=True) + + for stats in session.execute(stmt).scalars().partitions(10): + stmt = delete( + models.TransferStats + ).where( + models.TransferStats.id.in_(stats) + ) + session.execute(stmt) + + @staticmethod + def slice_time( + resolution: datetime.timedelta, + start_time: "Optional[datetime.datetime]" = None, + end_time: "Optional[datetime.datetime]" = None + ): + """ + Iterates, back in time, over time intervals of length `resolution` which are fully + included within the input interval (start_time, end_time). + Intervals are aligned on boundaries divisible by resolution. 
+ + For example: for start_time=17:09:59, end_time=16:20:01 and resolution = 10 minutes, it will yield + (17:00:00, 16:50:00), (16:50:00, 16:40:00), (16:40:00, 16:30:00) + """ + + if start_time is None: + start_time = datetime.datetime.utcnow() + newer_t = datetime.datetime.fromtimestamp(int(start_time.timestamp()) // resolution.total_seconds() * resolution.total_seconds()) + older_t = newer_t - resolution + while not end_time or older_t >= end_time: + yield newer_t, older_t + newer_t = older_t + older_t = older_t - resolution + + @read_session def get_request_stats(state, *, session: "Session"): """ diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index b0e4fd98524..03f61fb7ddc 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -513,7 +513,13 @@ def set_transfers_state( @transactional_session -def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "Session", logger=logging.log): +def update_transfer_state( + tt_status_report: TransferStatusReport, + stats_manager: request_core.TransferStatsManager, + *, + session: "Session", + logger=logging.log +): """ Used by poller and consumer to update the internal state of requests, after the response by the external transfertool.
@@ -540,6 +546,14 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S if request_core.is_intermediate_hop(request): request_core.handle_failed_intermediate_hop(request, session=session) + stats_manager.observe( + src_rse_id=request['source_rse_id'], + dst_rse_id=request['dest_rse_id'], + activity=request['activity'], + state=tt_status_report.state, + file_size=request['bytes'], + session=session, + ) request_core.add_monitor_message( new_state=tt_status_report.state, request=request, diff --git a/lib/rucio/daemons/conveyor/poller.py b/lib/rucio/daemons/conveyor/poller.py index 0d78ff15e2f..3048697a977 100644 --- a/lib/rucio/daemons/conveyor/poller.py +++ b/lib/rucio/daemons/conveyor/poller.py @@ -26,7 +26,7 @@ import time from itertools import groupby from types import FrameType -from typing import TYPE_CHECKING, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence from requests.exceptions import RequestException from sqlalchemy.exc import DatabaseError @@ -43,6 +43,7 @@ from rucio.core.topology import Topology, ExpiringObjectCache from rucio.daemons.common import db_workqueue, ProducerConsumerDaemon from rucio.db.sqla.constants import RequestState, RequestType +from rucio.transfertool.transfertool import Transfertool from rucio.transfertool.fts3 import FTS3Transfertool from rucio.transfertool.globus import GlobusTransferTool from rucio.transfertool.mock import MockTransfertool @@ -114,6 +115,7 @@ def _handle_requests( multi_vo, timeout, transfertool, + transfer_stats_manager: request_core.TransferStatsManager, oidc_account: Optional[str], *, logger=logging.log, @@ -142,7 +144,13 @@ def _handle_requests( else: account = InternalAccount(oidc_account) transfertool_obj = FTS3Transfertool(external_host=external_host, vo=vo, oidc_account=account) - poll_transfers(transfertool_obj=transfertool_obj, transfers_by_eid=chunk, timeout=timeout, logger=logger) + poll_transfers( + 
transfertool_obj=transfertool_obj, + transfers_by_eid=chunk, + transfer_stats_manager=transfer_stats_manager, + timeout=timeout, + logger=logger, + ) except Exception: logger(logging.ERROR, 'Exception', exc_info=True) @@ -182,6 +190,8 @@ def poller( if filter_transfertool: executable += ' --filter-transfertool ' + filter_transfertool + transfer_stats_manager = request_core.TransferStatsManager() + @db_workqueue( once=once, graceful_stop=GRACEFUL_STOP, @@ -211,13 +221,15 @@ def _consumer(transfs): timeout=timeout, oidc_account=oidc_account, transfertool=transfertool, + transfer_stats_manager=transfer_stats_manager, ) - ProducerConsumerDaemon( - producers=[_db_producer], - consumers=[_consumer for _ in range(total_threads)], - graceful_stop=GRACEFUL_STOP, - ).run() + with transfer_stats_manager: + ProducerConsumerDaemon( + producers=[_db_producer], + consumers=[_consumer for _ in range(total_threads)], + graceful_stop=GRACEFUL_STOP, + ).run() def stop(signum: Optional[int] = None, frame: Optional[FrameType] = None) -> None: @@ -280,19 +292,20 @@ def run( ) -def poll_transfers(transfertool_obj, transfers_by_eid, timeout=None, logger=logging.log): +def poll_transfers( + transfertool_obj: Transfertool, + transfers_by_eid: Mapping[str, Mapping[str, Any]], + transfer_stats_manager: request_core.TransferStatsManager, + timeout: "Optional[int]" = None, + logger=logging.log +): """ Poll a list of transfers from an FTS server - - :param transfertool_obj: The Transfertool to use for query - :param transfers_by_eid: Dict of the form {external_id: list_of_transfers} - :param timeout: Timeout. - :param logger: Optional decorated logger that can be passed from the calling daemons or servers. 
""" poll_individual_transfers = False try: - _poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger) + _poll_transfers(transfertool_obj, transfers_by_eid, transfer_stats_manager, timeout, logger) except TransferToolWrongAnswer: poll_individual_transfers = True @@ -301,12 +314,18 @@ def poll_transfers(transfertool_obj, transfers_by_eid, timeout=None, logger=logg for external_id, transfers in transfers_by_eid.items(): logger(logging.DEBUG, 'Checking %s on %s' % (external_id, transfertool_obj)) try: - _poll_transfers(transfertool_obj, {external_id: transfers}, timeout, logger) + _poll_transfers(transfertool_obj, {external_id: transfers}, transfer_stats_manager, timeout, logger) except Exception as err: logger(logging.ERROR, 'Problem querying %s on %s . Error returned : %s' % (external_id, transfertool_obj, str(err))) -def _poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger): +def _poll_transfers( + transfertool_obj: Transfertool, + transfers_by_eid: Mapping[str, Mapping[str, Any]], + transfer_stats_manager: request_core.TransferStatsManager, + timeout: "Optional[int]" = None, + logger=logging.log +): """ Helper function for poll_transfers which performs the actual polling and database update. 
""" @@ -355,7 +374,11 @@ def _poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger): METRICS.counter('query_transfer_exception').inc() else: for request_id in request_ids.intersection(transf_resp): - ret = transfer_core.update_transfer_state(transf_resp[request_id], logger=logger) + ret = transfer_core.update_transfer_state( + tt_status_report=transf_resp[request_id], + stats_manager=transfer_stats_manager, + logger=logger, + ) # if True, really update request content; if False, only touch request if ret: cnt += 1 diff --git a/lib/rucio/daemons/conveyor/receiver.py b/lib/rucio/daemons/conveyor/receiver.py index c234802cecb..8e826b2183c 100644 --- a/lib/rucio/daemons/conveyor/receiver.py +++ b/lib/rucio/daemons/conveyor/receiver.py @@ -34,6 +34,7 @@ from rucio.common.logging import setup_logging from rucio.common.policy import get_policy from rucio.core import transfer as transfer_core +from rucio.core import request as request_core from rucio.core.monitor import MetricManager from rucio.daemons.common import HeartbeatHandler from rucio.db.sqla.session import transactional_session @@ -48,11 +49,12 @@ class Receiver(object): - def __init__(self, broker, id_, total_threads, all_vos=False): + def __init__(self, broker, id_, total_threads, transfer_stats_manager: request_core.TransferStatsManager, all_vos=False): self.__all_vos = all_vos self.__broker = broker self.__id = id_ self.__total_threads = total_threads + self._transfer_stats_manager = transfer_stats_manager @METRICS.count_it def on_error(self, frame): @@ -85,7 +87,12 @@ def _perform_request_update(self, msg, *, session=None, logger=logging.log): if tt_status_report.get_db_fields_to_update(session=session, logger=logger): logging.info('RECEIVED %s', tt_status_report) - ret = transfer_core.update_transfer_state(tt_status_report, session=session, logger=logger) + ret = transfer_core.update_transfer_state( + tt_status_report=tt_status_report, + stats_manager=self._transfer_stats_manager, + 
session=session, + logger=logger, + ) METRICS.counter('update_request_state.{updated}').labels(updated=ret).inc() except Exception: logging.critical(traceback.format_exc()) @@ -146,8 +153,8 @@ def receiver(id_, total_threads=1, all_vos=False): logging.info('receiver started') - with HeartbeatHandler(executable=DAEMON_NAME, renewal_interval=30) as heartbeat_handler: - + with (HeartbeatHandler(executable=DAEMON_NAME, renewal_interval=30) as heartbeat_handler, + request_core.TransferStatsManager() as transfer_stats_manager): while not GRACEFUL_STOP.is_set(): _, _, logger = heartbeat_handler.live() @@ -158,8 +165,15 @@ def receiver(id_, total_threads=1, all_vos=False): logger(logging.INFO, 'connecting to %s' % conn.transport._Transport__host_and_ports[0][0]) METRICS.counter('reconnect.{host}').labels(host=conn.transport._Transport__host_and_ports[0][0].split('.')[0]).inc() - conn.set_listener('rucio-messaging-fts3', Receiver(broker=conn.transport._Transport__host_and_ports[0], - id_=id_, total_threads=total_threads, all_vos=all_vos)) + conn.set_listener( + 'rucio-messaging-fts3', + Receiver( + broker=conn.transport._Transport__host_and_ports[0], + id_=id_, + total_threads=total_threads, + transfer_stats_manager=transfer_stats_manager, + all_vos=all_vos + )) if not use_ssl: conn.connect(username, password, wait=True) else: diff --git a/lib/rucio/db/sqla/migrate_repo/versions/a08fa8de1545_transfer_stats_table.py b/lib/rucio/db/sqla/migrate_repo/versions/a08fa8de1545_transfer_stats_table.py new file mode 100644 index 00000000000..1a5aabaaeef --- /dev/null +++ b/lib/rucio/db/sqla/migrate_repo/versions/a08fa8de1545_transfer_stats_table.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +''' transfer_stats table ''' + +import datetime + +import sqlalchemy as sa +from alembic import context +from alembic.op import (create_table, create_primary_key, create_foreign_key, + create_index, create_check_constraint, drop_table) + +from rucio.db.sqla.types import GUID +# Alembic revision identifiers +revision = 'a08fa8de1545' +down_revision = '4df2c5ddabc0' + + +def upgrade(): + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + create_table('transfer_stats', + sa.Column('id', GUID()), + sa.Column('resolution', sa.Integer), + sa.Column('timestamp', sa.DateTime), + sa.Column('dest_rse_id', GUID()), + sa.Column('src_rse_id', GUID()), + sa.Column('activity', sa.String(50)), + sa.Column('files_done', sa.BigInteger), + sa.Column('bytes_done', sa.BigInteger), + sa.Column('files_failed', sa.BigInteger), + sa.Column('created_at', sa.DateTime, default=datetime.datetime.utcnow), + sa.Column('updated_at', sa.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)) + create_primary_key('TRANSFER_STATS_PK', 'transfer_stats', ['id']) + create_foreign_key('TRANSFER_STATS_DEST_RSE_FK', 'transfer_stats', 'rses', ['dest_rse_id'], ['id']) + create_foreign_key('TRANSFER_STATS_SRC_RSE_FK', 'transfer_stats', 'rses', ['src_rse_id'], ['id']) + create_index('TRANSFER_STATS_KEY_IDX', 'transfer_stats', ['resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity']) + create_check_constraint('TRANSFER_STATS_CREATED_NN', 'transfer_stats', 'created_at is not null') + create_check_constraint('TRANSFER_STATS_UPDATED_NN', 
'transfer_stats', 'updated_at is not null') + + +def downgrade(): + + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + drop_table('transfer_stats') diff --git a/lib/rucio/db/sqla/models.py b/lib/rucio/db/sqla/models.py index e6337ba0e85..7887eac1461 100644 --- a/lib/rucio/db/sqla/models.py +++ b/lib/rucio/db/sqla/models.py @@ -1420,6 +1420,24 @@ class Distance(BASE, ModelBase): Index('DISTANCES_DEST_RSEID_IDX', 'dest_rse_id')) +class TransferStats(BASE, ModelBase): + """Represents counters for transfer link usage""" + __tablename__ = 'transfer_stats' + id: Mapped[uuid.UUID] = mapped_column(GUID(), default=utils.generate_uuid) + resolution: Mapped[int] = mapped_column(Integer) + timestamp: Mapped[datetime] = mapped_column(DateTime) + dest_rse_id: Mapped[uuid.UUID] = mapped_column(GUID()) + src_rse_id: Mapped[uuid.UUID] = mapped_column(GUID()) + activity: Mapped[Optional[str]] = mapped_column(String(50)) + files_done: Mapped[int] = mapped_column(BigInteger) + bytes_done: Mapped[int] = mapped_column(BigInteger) + files_failed: Mapped[int] = mapped_column(BigInteger) + _table_args = (PrimaryKeyConstraint('id', name='TRANSFER_STATS_PK'), + ForeignKeyConstraint(['dest_rse_id'], ['rses.id'], name='TRANSFER_STATS_DEST_RSE_FK'), + ForeignKeyConstraint(['src_rse_id'], ['rses.id'], name='TRANSFER_STATS_SRC_RSE_FK'), + Index('TRANSFER_STATS_KEY_IDX', 'resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity')) + + class Subscription(BASE, ModelBase): """Represents a subscription""" __tablename__ = 'subscriptions' diff --git a/tests/test_conveyor.py b/tests/test_conveyor.py index fcb0f4cfdcd..934da677ab9 100644 --- a/tests/test_conveyor.py +++ b/tests/test_conveyor.py @@ -1364,7 +1364,10 @@ def bulk_query(self, requests_by_eid, timeout=None): 'rucio.core.config.REGION', 'rucio.daemons.reaper.reaper.REGION', ]}], indirect=True) -def test_two_multihops_same_intermediate_rse(rse_factory, did_factory, root_account, core_config_mock, 
caches_mock): +@pytest.mark.parametrize("file_config_mock", [{"overrides": [ + ('transfers', 'stats_enabled', 'True'), +]}], indirect=True) +def test_two_multihops_same_intermediate_rse(rse_factory, did_factory, root_account, file_config_mock, core_config_mock, caches_mock): """ Handle correctly two multihop transfers having to both jump via the same intermediate hops """ @@ -1379,6 +1382,7 @@ def test_two_multihops_same_intermediate_rse(rse_factory, did_factory, root_acco # +------>| RSE6 +--->| RSE7 | # | | | | # +------+ +------+ + start_time = datetime.utcnow() _, _, reaper_cache_region = caches_mock rse1, rse1_id = rse_factory.make_rse(scheme='mock', protocol_impl='rucio.rse.protocols.posix.Default') rse2, rse2_id = rse_factory.make_rse(scheme='mock', protocol_impl='rucio.rse.protocols.posix.Default') @@ -1465,6 +1469,19 @@ def on_submit(file): with pytest.raises(ReplicaNotFound): replica_core.get_replica(rse_id=rse_id, **did) + # Verify that the statistics are correctly recorded for executed transfers + stats_manager = request_core.TransferStatsManager() + dict_stats = {} + for stat in stats_manager.load_totals( + resolution=stats_manager.raw_resolution, + recent_t=datetime.utcnow(), + older_t=start_time - stats_manager.raw_resolution + ): + dict_stats.setdefault(stat['dest_rse_id'], {})[stat['src_rse_id']] = stat + assert dict_stats[rse2_id][rse1_id]['files_failed'] == 1 + assert dict_stats[rse2_id][rse1_id]['files_done'] == 1 + assert dict_stats[rse2_id][rse1_id]['bytes_done'] == 2 + @skip_rse_tests_with_accounts @pytest.mark.noparallel(groups=[NoParallelGroups.SUBMITTER, NoParallelGroups.POLLER])