From 9affcf22a827a5914008cb98b06fd3f28a6bc0a0 Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Wed, 27 Sep 2023 09:52:42 +0200 Subject: [PATCH] Core & Internals: move update_request_state to transfer.py It manipulates input from the transfertool. --- lib/rucio/core/request.py | 40 ----------------------- lib/rucio/core/transfer.py | 44 +++++++++++++++++++++++++- lib/rucio/daemons/conveyor/poller.py | 2 +- lib/rucio/daemons/conveyor/receiver.py | 4 +-- 4 files changed, 46 insertions(+), 44 deletions(-) diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index ef30818a675..5a88da3e059 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -2020,46 +2020,6 @@ def update_requests_priority(priority, filter_, *, session: "Session", logger=lo raise RucioException(error.args) -@transactional_session -def update_request_state(tt_status_report, *, session: "Session", logger=logging.log): - """ - Used by poller and consumer to update the internal state of requests, - after the response by the external transfertool. - - :param tt_status_report: The transfertool status update, retrieved via request.query_request(). - :param session: The database session to use. - :param logger: Optional decorated logger that can be passed from the calling daemons or servers. - :returns commit_or_rollback: Boolean. - """ - - request_id = tt_status_report.request_id - try: - fields_to_update = tt_status_report.get_db_fields_to_update(session=session, logger=logger) - if not fields_to_update: - update_request(request_id, raise_on_missing=True, session=session) - return False - else: - logger(logging.INFO, 'UPDATING REQUEST %s FOR %s with changes: %s' % (str(request_id), tt_status_report, fields_to_update)) - - set_request_state(request_id, session=session, **fields_to_update) - request = tt_status_report.request(session) - - if tt_status_report.state == RequestState.FAILED: - if is_intermediate_hop(request): - handle_failed_intermediate_hop(request, session=session) - - add_monitor_message(new_state=tt_status_report.state, - request=request, - additional_fields=tt_status_report.get_monitor_msg_fields(session=session, logger=logger), - session=session) - return True - except UnsupportedOperation as error: - logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', ''))) - return False - except Exception: - logger(logging.CRITICAL, "Exception", exc_info=True) - - @read_session def add_monitor_message(new_state, request, additional_fields, *, session: "Session"): """ diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 2ae95b89884..b7701549254 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -30,7 +30,7 @@ from rucio.common.constants import SUPPORTED_PROTOCOLS from rucio.common.exception import (InvalidRSEExpression, RequestNotFound, RSEProtocolNotSupported, - RucioException) + RucioException, UnsupportedOperation) from rucio.common.utils import construct_surl from rucio.core import did, message as message_core, request as request_core from rucio.core.account import list_accounts @@ -500,6 +500,48 @@ def set_transfers_state( logger(logging.DEBUG, 'Finished to register transfer state for %s' % external_id) +@transactional_session +def update_transfer_state(tt_status_report, *, session: "Session", logger=logging.log): + """ + Used by poller and consumer to update the internal state of requests, + after the response by the external transfertool. + + :param tt_status_report: The transfertool status update, retrieved via request.query_request(). + :param session: The database session to use. + :param logger: Optional decorated logger that can be passed from the calling daemons or servers. + :returns commit_or_rollback: Boolean. + """ + + request_id = tt_status_report.request_id + try: + fields_to_update = tt_status_report.get_db_fields_to_update(session=session, logger=logger) + if not fields_to_update: + request_core.update_request(request_id, raise_on_missing=True, session=session) + return False + else: + logger(logging.INFO, 'UPDATING REQUEST %s FOR %s with changes: %s' % (str(request_id), tt_status_report, fields_to_update)) + + set_request_state(request_id, session=session, **fields_to_update) + request = tt_status_report.request(session) + + if tt_status_report.state == RequestState.FAILED: + if request_core.is_intermediate_hop(request): + request_core.handle_failed_intermediate_hop(request, session=session) + + request_core.add_monitor_message( + new_state=tt_status_report.state, + request=request, + additional_fields=tt_status_report.get_monitor_msg_fields(session=session, logger=logger), + session=session + ) + return True + except UnsupportedOperation as error: + logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', ''))) + return False + except Exception: + logger(logging.CRITICAL, "Exception", exc_info=True) + + @transactional_session def mark_transfer_lost(request, *, session: "Session", logger=logging.log): new_state = RequestState.LOST diff --git a/lib/rucio/daemons/conveyor/poller.py b/lib/rucio/daemons/conveyor/poller.py index 2bab0760d52..7edeb29b24f 100644 --- a/lib/rucio/daemons/conveyor/poller.py +++ b/lib/rucio/daemons/conveyor/poller.py @@ -352,7 +352,7 @@ def _poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger): METRICS.counter('query_transfer_exception').inc() else: for request_id in request_ids.intersection(transf_resp): - ret = request_core.update_request_state(transf_resp[request_id], logger=logger) + ret = transfer_core.update_transfer_state(transf_resp[request_id], logger=logger) # if True, really update request content; if False, only touch request if ret: cnt += 1 diff --git a/lib/rucio/daemons/conveyor/receiver.py b/lib/rucio/daemons/conveyor/receiver.py index 2a255eec41f..c234802cecb 100644 --- a/lib/rucio/daemons/conveyor/receiver.py +++ b/lib/rucio/daemons/conveyor/receiver.py @@ -33,7 +33,7 @@ from rucio.common.config import config_get, config_get_bool, config_get_int from rucio.common.logging import setup_logging from rucio.common.policy import get_policy -from rucio.core import request as request_core +from rucio.core import transfer as transfer_core from rucio.core.monitor import MetricManager from rucio.daemons.common import HeartbeatHandler from rucio.db.sqla.session import transactional_session @@ -85,7 +85,7 @@ def _perform_request_update(self, msg, *, session=None, logger=logging.log): if tt_status_report.get_db_fields_to_update(session=session, logger=logger): logging.info('RECEIVED %s', tt_status_report) - ret = request_core.update_request_state(tt_status_report, session=session, logger=logger) + ret = transfer_core.update_transfer_state(tt_status_report, session=session, logger=logger) METRICS.counter('update_request_state.{updated}').labels(updated=ret).inc() except Exception: logging.critical(traceback.format_exc())