From acfccb6d5d1e66999b76efbc8af3d5a06f028805 Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Fri, 17 Nov 2023 16:23:21 +0100 Subject: [PATCH] Transfers: re-introduce last_processed_by logic in poller. Closes #6376 The protection provided by older_than, mentioned in 33f957827b6035ae330d5f7f4bc3adc729617f4f, was over-estimated. --- lib/rucio/core/request.py | 17 ++++++++++++++++- lib/rucio/daemons/conveyor/poller.py | 3 +++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index 5213fb37e9e..8accda0994b 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -688,7 +688,7 @@ def get_and_mark_next( for share in activity_shares: query = select( - models.Request + models.Request.id ).where( models.Request.state.in_(state), models.Request.request_type.in_(request_type) @@ -744,6 +744,21 @@ def get_and_mark_next( else: query = query.limit(limit) + if session.bind.dialect.name == 'oracle': + query = select( + models.Request + ).where( + models.Request.id.in_(query) + ).with_for_update( + skip_locked=True + ) + else: + query = query.with_only_columns( + models.Request + ).with_for_update( + skip_locked=True, + of=models.Request.last_processed_by + ) query_result = session.execute(query).scalars() if query_result: if mode_all: diff --git a/lib/rucio/daemons/conveyor/poller.py b/lib/rucio/daemons/conveyor/poller.py index 7edeb29b24f..0d78ff15e2f 100644 --- a/lib/rucio/daemons/conveyor/poller.py +++ b/lib/rucio/daemons/conveyor/poller.py @@ -66,6 +66,7 @@ def _fetch_requests( filter_transfertool, cached_topology, activity, + set_last_processed_by: bool, heartbeat_handler ): worker_number, total_workers, logger = heartbeat_handler.live() @@ -77,6 +78,7 @@ def _fetch_requests( rse_collection=topology, request_type=[RequestType.TRANSFER, RequestType.STAGEIN, RequestType.STAGEOUT], state=[RequestState.SUBMITTED], + processed_by=heartbeat_handler.short_executable if set_last_processed_by else None, limit=db_bulk, older_than=datetime.datetime.utcnow() - datetime.timedelta(seconds=older_than) if older_than else None, total_workers=total_workers, @@ -197,6 +199,7 @@ def _db_producer(*, activity: str, heartbeat_handler: "HeartbeatHandler"): filter_transfertool=filter_transfertool, cached_topology=cached_topology, activity=activity, + set_last_processed_by=not once, heartbeat_handler=heartbeat_handler, )