Skip to content

Commit

Permalink
Transfers: gather and store transfer statistics
Browse files Browse the repository at this point in the history
Each time the poller or receiver transitions a transfer to a different
state, use the information to update the transfer statistics.
The goal is to be able to answer the following questions in rucio:
 - how many successful transfers happened towards RSE1 yesterday
 - what was the average transfer rate from RSE1 to RSE2 last hour
Short-term it will allow us to visualize the transfer status. Longer-term,
we may use this data to make better scheduling decisions (selection of
the destination RSE during rule evaluation; selection of the source RSE
for transfers).

Each poller/receiver will gather statistics over a certain time window
before committing the aggregated stats gathered over the previous
time interval into the database. This reduces the number of database
writes at the cost of potentially losing more metrics in case of a
crash. We keep track of the total number of failed/successful
transfers in the given time window; plus the number of transferred
bytes.

Metrics will be aggregated regularly into lower-resolution samples.
Aggregation is done by summing higher resolution metrics. Lower
resolution samples will be stored for a longer period in the database.

The behavior was heavily inspired by existing time series databases,
like prometheus and influxdb. The possibility to use an external tool
for this job was discussed, but rejected.
  • Loading branch information
Radu Carpa committed Nov 20, 2023
1 parent bcce05a commit f4bdf00
Show file tree
Hide file tree
Showing 9 changed files with 531 additions and 29 deletions.
23 changes: 23 additions & 0 deletions etc/sql/oracle/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1996,3 +1996,26 @@ CREATE GLOBAL TEMPORARY TABLE TEMPORARY_ID_4
id RAW(16),
CONSTRAINT TEMPORARY_ID_4_PK PRIMARY KEY (id)
) ON COMMIT DELETE ROWS;

-- 73 ) ========================================= TRANSFER_STATS table ===================================

CREATE TABLE TRANSFER_STATS
(
ID RAW(16),
RESOLUTION INTEGER,
TIMESTAMP DATE,
DEST_RSE_ID RAW(16),
SRC_RSE_ID RAW(16),
ACTIVITY VARCHAR2(50),
FILES_DONE NUMBER(19,0),
BYTES_DONE NUMBER(19,0),
FILES_FAILED NUMBER(19,0),
CREATED_AT DATE,
UPDATED_AT DATE,
CONSTRAINT TRANSFER_STATS_PK PRIMARY KEY (ID),
CONSTRAINT TRANSFER_STATS_DEST_RSE_FK FOREIGN KEY(DEST_RSE_ID) REFERENCES RSES (ID),
CONSTRAINT TRANSFER_STATS_SRC_RSE_FK FOREIGN KEY(SRC_RSE_ID) REFERENCES RSES (ID),
CONSTRAINT TRANSFER_STATS_CREATED_NN CHECK (CREATED_AT IS NOT NULL),
CONSTRAINT TRANSFER_STATS_UPDATED_NN CHECK (UPDATED_AT IS NOT NULL)
);
CREATE INDEX TRANSFER_STATS_KEY_IDX ON TRANSFER_STATS (RESOLUTION, TIMESTAMP, DEST_RSE_ID, SRC_RSE_ID, ACTIVITY);
2 changes: 1 addition & 1 deletion lib/rucio/alembicrevision.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

ALEMBIC_REVISION = '4df2c5ddabc0' # the current alembic head revision
ALEMBIC_REVISION = 'a08fa8de1545' # the current alembic head revision
343 changes: 340 additions & 3 deletions lib/rucio/core/request.py

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,13 @@ def set_transfers_state(


@transactional_session
def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "Session", logger=logging.log):
def update_transfer_state(
tt_status_report: TransferStatusReport,
stats_manager: request_core.TransferStatsManager,
*,
session: "Session",
logger=logging.log
):
"""
Used by poller and consumer to update the internal state of requests,
after the response by the external transfertool.
Expand All @@ -540,6 +546,14 @@ def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "S
if request_core.is_intermediate_hop(request):
request_core.handle_failed_intermediate_hop(request, session=session)

stats_manager.observe(
src_rse_id=request['source_rse_id'],
dst_rse_id=request['dest_rse_id'],
activity=request['activity'],
state=tt_status_report.state,
file_size=request['bytes'],
session=session,
)
request_core.add_monitor_message(
new_state=tt_status_report.state,
request=request,
Expand Down
57 changes: 40 additions & 17 deletions lib/rucio/daemons/conveyor/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import time
from itertools import groupby
from types import FrameType
from typing import TYPE_CHECKING, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence

from requests.exceptions import RequestException
from sqlalchemy.exc import DatabaseError
Expand All @@ -43,6 +43,7 @@
from rucio.core.topology import Topology, ExpiringObjectCache
from rucio.daemons.common import db_workqueue, ProducerConsumerDaemon
from rucio.db.sqla.constants import RequestState, RequestType
from rucio.transfertool.transfertool import Transfertool
from rucio.transfertool.fts3 import FTS3Transfertool
from rucio.transfertool.globus import GlobusTransferTool
from rucio.transfertool.mock import MockTransfertool
Expand Down Expand Up @@ -114,6 +115,7 @@ def _handle_requests(
multi_vo,
timeout,
transfertool,
transfer_stats_manager: request_core.TransferStatsManager,
oidc_account: Optional[str],
*,
logger=logging.log,
Expand Down Expand Up @@ -142,7 +144,13 @@ def _handle_requests(
else:
account = InternalAccount(oidc_account)
transfertool_obj = FTS3Transfertool(external_host=external_host, vo=vo, oidc_account=account)
poll_transfers(transfertool_obj=transfertool_obj, transfers_by_eid=chunk, timeout=timeout, logger=logger)
poll_transfers(
transfertool_obj=transfertool_obj,
transfers_by_eid=chunk,
transfer_stats_manager=transfer_stats_manager,
timeout=timeout,
logger=logger,
)
except Exception:
logger(logging.ERROR, 'Exception', exc_info=True)

Expand Down Expand Up @@ -182,6 +190,8 @@ def poller(
if filter_transfertool:
executable += ' --filter-transfertool ' + filter_transfertool

transfer_stats_manager = request_core.TransferStatsManager()

@db_workqueue(
once=once,
graceful_stop=GRACEFUL_STOP,
Expand Down Expand Up @@ -211,13 +221,15 @@ def _consumer(transfs):
timeout=timeout,
oidc_account=oidc_account,
transfertool=transfertool,
transfer_stats_manager=transfer_stats_manager,
)

ProducerConsumerDaemon(
producers=[_db_producer],
consumers=[_consumer for _ in range(total_threads)],
graceful_stop=GRACEFUL_STOP,
).run()
with transfer_stats_manager:
ProducerConsumerDaemon(
producers=[_db_producer],
consumers=[_consumer for _ in range(total_threads)],
graceful_stop=GRACEFUL_STOP,
).run()


def stop(signum: Optional[int] = None, frame: Optional[FrameType] = None) -> None:
Expand Down Expand Up @@ -280,19 +292,20 @@ def run(
)


def poll_transfers(transfertool_obj, transfers_by_eid, timeout=None, logger=logging.log):
def poll_transfers(
transfertool_obj: Transfertool,
transfers_by_eid: Mapping[str, Mapping[str, Any]],
transfer_stats_manager: request_core.TransferStatsManager,
timeout: "Optional[int]" = None,
logger=logging.log
):
"""
Poll a list of transfers from an FTS server
:param transfertool_obj: The Transfertool to use for query
:param transfers_by_eid: Dict of the form {external_id: list_of_transfers}
:param timeout: Timeout.
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
"""

poll_individual_transfers = False
try:
_poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger)
_poll_transfers(transfertool_obj, transfers_by_eid, transfer_stats_manager, timeout, logger)
except TransferToolWrongAnswer:
poll_individual_transfers = True

Expand All @@ -301,12 +314,18 @@ def poll_transfers(transfertool_obj, transfers_by_eid, timeout=None, logger=logg
for external_id, transfers in transfers_by_eid.items():
logger(logging.DEBUG, 'Checking %s on %s' % (external_id, transfertool_obj))
try:
_poll_transfers(transfertool_obj, {external_id: transfers}, timeout, logger)
_poll_transfers(transfertool_obj, {external_id: transfers}, transfer_stats_manager, timeout, logger)
except Exception as err:
logger(logging.ERROR, 'Problem querying %s on %s . Error returned : %s' % (external_id, transfertool_obj, str(err)))


def _poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger):
def _poll_transfers(
transfertool_obj: Transfertool,
transfers_by_eid: Mapping[str, Mapping[str, Any]],
transfer_stats_manager: request_core.TransferStatsManager,
timeout: "Optional[int]" = None,
logger=logging.log
):
"""
Helper function for poll_transfers which performs the actual polling and database update.
"""
Expand Down Expand Up @@ -355,7 +374,11 @@ def _poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger):
METRICS.counter('query_transfer_exception').inc()
else:
for request_id in request_ids.intersection(transf_resp):
ret = transfer_core.update_transfer_state(transf_resp[request_id], logger=logger)
ret = transfer_core.update_transfer_state(
tt_status_report=transf_resp[request_id],
stats_manager=transfer_stats_manager,
logger=logger,
)
# if True, really update request content; if False, only touch request
if ret:
cnt += 1
Expand Down
26 changes: 20 additions & 6 deletions lib/rucio/daemons/conveyor/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from rucio.common.logging import setup_logging
from rucio.common.policy import get_policy
from rucio.core import transfer as transfer_core
from rucio.core import request as request_core
from rucio.core.monitor import MetricManager
from rucio.daemons.common import HeartbeatHandler
from rucio.db.sqla.session import transactional_session
Expand All @@ -48,11 +49,12 @@

class Receiver(object):

def __init__(self, broker, id_, total_threads, all_vos=False):
def __init__(self, broker, id_, total_threads, transfer_stats_manager: request_core.TransferStatsManager, all_vos=False):
self.__all_vos = all_vos
self.__broker = broker
self.__id = id_
self.__total_threads = total_threads
self._transfer_stats_manager = transfer_stats_manager

@METRICS.count_it
def on_error(self, frame):
Expand Down Expand Up @@ -85,7 +87,12 @@ def _perform_request_update(self, msg, *, session=None, logger=logging.log):
if tt_status_report.get_db_fields_to_update(session=session, logger=logger):
logging.info('RECEIVED %s', tt_status_report)

ret = transfer_core.update_transfer_state(tt_status_report, session=session, logger=logger)
ret = transfer_core.update_transfer_state(
tt_status_report=tt_status_report,
stats_manager=self._transfer_stats_manager,
session=session,
logger=logger,
)
METRICS.counter('update_request_state.{updated}').labels(updated=ret).inc()
except Exception:
logging.critical(traceback.format_exc())
Expand Down Expand Up @@ -146,8 +153,8 @@ def receiver(id_, total_threads=1, all_vos=False):

logging.info('receiver started')

with HeartbeatHandler(executable=DAEMON_NAME, renewal_interval=30) as heartbeat_handler:

with (HeartbeatHandler(executable=DAEMON_NAME, renewal_interval=30) as heartbeat_handler,
request_core.TransferStatsManager() as transfer_stats_manager):
while not GRACEFUL_STOP.is_set():

_, _, logger = heartbeat_handler.live()
Expand All @@ -158,8 +165,15 @@ def receiver(id_, total_threads=1, all_vos=False):
logger(logging.INFO, 'connecting to %s' % conn.transport._Transport__host_and_ports[0][0])
METRICS.counter('reconnect.{host}').labels(host=conn.transport._Transport__host_and_ports[0][0].split('.')[0]).inc()

conn.set_listener('rucio-messaging-fts3', Receiver(broker=conn.transport._Transport__host_and_ports[0],
id_=id_, total_threads=total_threads, all_vos=all_vos))
conn.set_listener(
'rucio-messaging-fts3',
Receiver(
broker=conn.transport._Transport__host_and_ports[0],
id_=id_,
total_threads=total_threads,
transfer_stats_manager=transfer_stats_manager,
all_vos=all_vos
))
if not use_ssl:
conn.connect(username, password, wait=True)
else:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# Copyright European Organization for Nuclear Research (CERN) since 2012
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

''' transfer_stats table '''

import datetime

import sqlalchemy as sa
from alembic import context
from alembic.op import (create_table, create_primary_key, create_foreign_key,
create_index, create_check_constraint, drop_table)

from rucio.db.sqla.types import GUID
# Alembic revision identifiers
revision = 'a08fa8de1545'
down_revision = '4df2c5ddabc0'


def upgrade():
if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
create_table('transfer_stats',
sa.Column('id', GUID()),
sa.Column('resolution', sa.Integer),
sa.Column('timestamp', sa.DateTime),
sa.Column('dest_rse_id', GUID()),
sa.Column('src_rse_id', GUID()),
sa.Column('activity', sa.String(50)),
sa.Column('files_done', sa.BigInteger),
sa.Column('bytes_done', sa.BigInteger),
sa.Column('files_failed', sa.BigInteger),
sa.Column('created_at', sa.DateTime, default=datetime.datetime.utcnow),
sa.Column('updated_at', sa.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow))
create_primary_key('TRANSFER_STATS_PK', 'transfer_stats', ['id'])
create_foreign_key('TRANSFER_STATS_DEST_RSE_FK', 'transfer_stats', 'rses', ['dest_rse_id'], ['id'])
create_foreign_key('TRANSFER_STATS_SRC_RSE_FK', 'transfer_stats', 'rses', ['src_rse_id'], ['id'])
create_index('TRANSFER_STATS_KEY_IDX', 'transfer_stats', ['resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity'])
create_check_constraint('TRANSFER_STATS_CREATED_NN', 'transfer_stats', 'created_at is not null')
create_check_constraint('TRANSFER_STATS_UPDATED_NN', 'transfer_stats', 'updated_at is not null')


def downgrade():

if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']:
drop_table('transfer_stats')
18 changes: 18 additions & 0 deletions lib/rucio/db/sqla/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,24 @@ class Distance(BASE, ModelBase):
Index('DISTANCES_DEST_RSEID_IDX', 'dest_rse_id'))


class TransferStats(BASE, ModelBase):
"""Represents counters for transfer link usage"""
__tablename__ = 'transfer_stats'
id: Mapped[uuid.UUID] = mapped_column(GUID(), default=utils.generate_uuid)
resolution: Mapped[int] = mapped_column(Integer)
timestamp: Mapped[datetime] = mapped_column(DateTime)
dest_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
src_rse_id: Mapped[uuid.UUID] = mapped_column(GUID())
activity: Mapped[Optional[str]] = mapped_column(String(50))
files_done: Mapped[int] = mapped_column(BigInteger)
bytes_done: Mapped[int] = mapped_column(BigInteger)
files_failed: Mapped[int] = mapped_column(BigInteger)
_table_args = (PrimaryKeyConstraint('id', name='TRANSFER_STATS_PK'),
ForeignKeyConstraint(['dest_rse_id'], ['rses.id'], name='TRANSFER_STATS_DEST_RSE_FK'),
ForeignKeyConstraint(['src_rse_id'], ['rses.id'], name='TRANSFER_STATS_SRC_RSE_FK'),
Index('TRANSFER_STATS_KEY_IDX', 'resolution', 'timestamp', 'dest_rse_id', 'src_rse_id', 'activity'))


class Subscription(BASE, ModelBase):
"""Represents a subscription"""
__tablename__ = 'subscriptions'
Expand Down
19 changes: 18 additions & 1 deletion tests/test_conveyor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,10 @@ def bulk_query(self, requests_by_eid, timeout=None):
'rucio.core.config.REGION',
'rucio.daemons.reaper.reaper.REGION',
]}], indirect=True)
def test_two_multihops_same_intermediate_rse(rse_factory, did_factory, root_account, core_config_mock, caches_mock):
@pytest.mark.parametrize("file_config_mock", [{"overrides": [
('transfers', 'stats_enabled', 'True'),
]}], indirect=True)
def test_two_multihops_same_intermediate_rse(rse_factory, did_factory, root_account, file_config_mock, core_config_mock, caches_mock):
"""
Handle correctly two multihop transfers having to both jump via the same intermediate hops
"""
Expand All @@ -1379,6 +1382,7 @@ def test_two_multihops_same_intermediate_rse(rse_factory, did_factory, root_acco
# +------>| RSE6 +--->| RSE7 |
# | | | |
# +------+ +------+
start_time = datetime.utcnow()
_, _, reaper_cache_region = caches_mock
rse1, rse1_id = rse_factory.make_rse(scheme='mock', protocol_impl='rucio.rse.protocols.posix.Default')
rse2, rse2_id = rse_factory.make_rse(scheme='mock', protocol_impl='rucio.rse.protocols.posix.Default')
Expand Down Expand Up @@ -1465,6 +1469,19 @@ def on_submit(file):
with pytest.raises(ReplicaNotFound):
replica_core.get_replica(rse_id=rse_id, **did)

# Verify that the statistics are correctly recorded for executed transfers
stats_manager = request_core.TransferStatsManager()
dict_stats = {}
for stat in stats_manager.load_totals(
resolution=stats_manager.raw_resolution,
recent_t=datetime.utcnow(),
older_t=start_time - stats_manager.raw_resolution
):
dict_stats.setdefault(stat['dest_rse_id'], {})[stat['src_rse_id']] = stat
assert dict_stats[rse2_id][rse1_id]['files_failed'] == 1
assert dict_stats[rse2_id][rse1_id]['files_done'] == 1
assert dict_stats[rse2_id][rse1_id]['bytes_done'] == 2


@skip_rse_tests_with_accounts
@pytest.mark.noparallel(groups=[NoParallelGroups.SUBMITTER, NoParallelGroups.POLLER])
Expand Down

0 comments on commit f4bdf00

Please sign in to comment.