Skip to content

Commit

Permalink
Core & Internals: move update_request_state to transfer.py
Browse files Browse the repository at this point in the history
It manipulates input from the transfertool.
  • Loading branch information
Radu Carpa committed Nov 10, 2023
1 parent 666f0f0 commit 11b9550
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 44 deletions.
40 changes: 0 additions & 40 deletions lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -2020,46 +2020,6 @@ def update_requests_priority(priority, filter_, *, session: "Session", logger=lo
raise RucioException(error.args)


@transactional_session
def update_request_state(tt_status_report, *, session: "Session", logger=logging.log):
"""
Used by poller and consumer to update the internal state of requests,
after the response by the external transfertool.
:param tt_status_report: The transfertool status update, retrieved via request.query_request().
:param session: The database session to use.
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
:returns commit_or_rollback: Boolean.
"""

request_id = tt_status_report.request_id
try:
fields_to_update = tt_status_report.get_db_fields_to_update(session=session, logger=logger)
if not fields_to_update:
update_request(request_id, raise_on_missing=True, session=session)
return False
else:
logger(logging.INFO, 'UPDATING REQUEST %s FOR %s with changes: %s' % (str(request_id), tt_status_report, fields_to_update))

set_request_state(request_id, session=session, **fields_to_update)
request = tt_status_report.request(session)

if tt_status_report.state == RequestState.FAILED:
if is_intermediate_hop(request):
handle_failed_intermediate_hop(request, session=session)

add_monitor_message(new_state=tt_status_report.state,
request=request,
additional_fields=tt_status_report.get_monitor_msg_fields(session=session, logger=logger),
session=session)
return True
except UnsupportedOperation as error:
logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', '')))
return False
except Exception:
logger(logging.CRITICAL, "Exception", exc_info=True)


@read_session
def add_monitor_message(new_state, request, additional_fields, *, session: "Session"):
"""
Expand Down
45 changes: 44 additions & 1 deletion lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from rucio.common.constants import SUPPORTED_PROTOCOLS
from rucio.common.exception import (InvalidRSEExpression,
RequestNotFound, RSEProtocolNotSupported,
RucioException)
RucioException, UnsupportedOperation)
from rucio.common.utils import construct_surl
from rucio.core import did, message as message_core, request as request_core
from rucio.core.account import list_accounts
Expand All @@ -42,6 +42,7 @@
from rucio.db.sqla.constants import DIDType, RequestState, RequestType, TransferLimitDirection
from rucio.db.sqla.session import read_session, transactional_session, stream_session
from rucio.rse import rsemanager as rsemgr
from rucio.transfertool.transfertool import TransferStatusReport
from rucio.transfertool.fts3 import FTS3Transfertool
from rucio.transfertool.globus import GlobusTransferTool
from rucio.transfertool.mock import MockTransfertool
Expand Down Expand Up @@ -500,6 +501,48 @@ def set_transfers_state(
logger(logging.DEBUG, 'Finished to register transfer state for %s' % external_id)


@transactional_session
def update_transfer_state(tt_status_report: TransferStatusReport, *, session: "Session", logger=logging.log):
"""
Used by poller and consumer to update the internal state of requests,
after the response by the external transfertool.
:param tt_status_report: The transfertool status update, retrieved via request.query_request().
:param session: The database session to use.
:param logger: Optional decorated logger that can be passed from the calling daemons or servers.
:returns commit_or_rollback: Boolean.
"""

request_id = tt_status_report.request_id
try:
fields_to_update = tt_status_report.get_db_fields_to_update(session=session, logger=logger)
if not fields_to_update:
request_core.update_request(request_id, raise_on_missing=True, session=session)
return False
else:
logger(logging.INFO, 'UPDATING REQUEST %s FOR %s with changes: %s' % (str(request_id), tt_status_report, fields_to_update))

set_request_state(request_id, session=session, **fields_to_update)
request = tt_status_report.request(session)

if tt_status_report.state == RequestState.FAILED:
if request_core.is_intermediate_hop(request):
request_core.handle_failed_intermediate_hop(request, session=session)

request_core.add_monitor_message(
new_state=tt_status_report.state,
request=request,
additional_fields=tt_status_report.get_monitor_msg_fields(session=session, logger=logger),
session=session
)
return True
except UnsupportedOperation as error:
logger(logging.WARNING, "Request %s doesn't exist - Error: %s" % (request_id, str(error).replace('\n', '')))
return False
except Exception:
logger(logging.CRITICAL, "Exception", exc_info=True)


@transactional_session
def mark_transfer_lost(request, *, session: "Session", logger=logging.log):
new_state = RequestState.LOST
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/daemons/conveyor/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def _poll_transfers(transfertool_obj, transfers_by_eid, timeout, logger):
METRICS.counter('query_transfer_exception').inc()
else:
for request_id in request_ids.intersection(transf_resp):
ret = request_core.update_request_state(transf_resp[request_id], logger=logger)
ret = transfer_core.update_transfer_state(transf_resp[request_id], logger=logger)
# if True, really update request content; if False, only touch request
if ret:
cnt += 1
Expand Down
4 changes: 2 additions & 2 deletions lib/rucio/daemons/conveyor/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from rucio.common.config import config_get, config_get_bool, config_get_int
from rucio.common.logging import setup_logging
from rucio.common.policy import get_policy
from rucio.core import request as request_core
from rucio.core import transfer as transfer_core
from rucio.core.monitor import MetricManager
from rucio.daemons.common import HeartbeatHandler
from rucio.db.sqla.session import transactional_session
Expand Down Expand Up @@ -85,7 +85,7 @@ def _perform_request_update(self, msg, *, session=None, logger=logging.log):
if tt_status_report.get_db_fields_to_update(session=session, logger=logger):
logging.info('RECEIVED %s', tt_status_report)

ret = request_core.update_request_state(tt_status_report, session=session, logger=logger)
ret = transfer_core.update_transfer_state(tt_status_report, session=session, logger=logger)
METRICS.counter('update_request_state.{updated}').labels(updated=ret).inc()
except Exception:
logging.critical(traceback.format_exc())
Expand Down

0 comments on commit 11b9550

Please sign in to comment.