diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index 76cdf87206..cf95090cb7 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -22,6 +22,7 @@ import threading import traceback import uuid +from abc import ABCMeta, abstractmethod from collections import namedtuple, defaultdict from collections.abc import Sequence, Mapping, Iterator from dataclasses import dataclass @@ -51,6 +52,7 @@ if TYPE_CHECKING: from sqlalchemy.orm import Session + from rucio.rse.protocols.protocol import RSEProtocol """ The core request.py is specifically for handling requests. @@ -73,6 +75,15 @@ def __str__(self): return "src_rse={}".format(self.rse) +class TransferDestination: + def __init__(self, rse: RseData, scheme): + self.rse = rse + self.scheme = scheme + + def __str__(self): + return "dst_rse={}".format(self.rse) + + class RequestWithSources: def __init__( self, @@ -146,6 +157,43 @@ def _parse_db_attributes(db_attributes): return attr +class DirectTransfer(metaclass=ABCMeta): + """ + The configuration for a direct (non-multi-hop) transfer. It can be a multi-source transfer. + """ + + def __init__(self, sources: list[RequestSource], rws: RequestWithSources) -> None: + self.sources: list[RequestSource] = sources + self.rws: RequestWithSources = rws + + @property + @abstractmethod + def src(self) -> RequestSource: + pass + + @property + @abstractmethod + def dst(self) -> TransferDestination: + pass + + @property + @abstractmethod + def dest_url(self) -> str: + pass + + @abstractmethod + def source_url(self, source: RequestSource) -> str: + pass + + @abstractmethod + def dest_protocol(self) -> "RSEProtocol": + pass + + @abstractmethod + def source_protocol(self, source: RequestSource) -> "RSEProtocol": + pass + + def should_retry_request(req, retry_protocol_mismatches): """ Whether should retry this request. diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 2810c74c5d..aa0758a1ce 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -38,7 +38,7 @@ from rucio.core import did, message as message_core, request as request_core from rucio.core.account import list_accounts from rucio.core.monitor import MetricManager -from rucio.core.request import transition_request_state, RequestWithSources, RequestSource +from rucio.core.request import transition_request_state, RequestWithSources, RequestSource, TransferDestination, DirectTransfer from rucio.core.rse import RseData from rucio.core.rse_expression_parser import parse_expression from rucio.db.sqla import models @@ -57,6 +57,7 @@ from sqlalchemy.orm import Session from rucio.common.types import InternalAccount from rucio.core.topology import Topology + from rucio.rse.protocols.protocol import RSEProtocol LoggerFunction = Callable[..., Any] @@ -81,15 +82,6 @@ } -class TransferDestination: - def __init__(self, rse: RseData, scheme): - self.rse = rse - self.scheme = scheme - - def __str__(self): - return "dst_rse={}".format(self.rse) - - class ProtocolFactory: """ Creates and caches protocol objects. Allowing to reuse them. @@ -106,7 +98,7 @@ def protocol(self, rse: RseData, scheme: "Optional[str]", operation: str): return protocol -class DirectTransferDefinition: +class DirectTransferImplementation(DirectTransfer): """ The configuration for a direct (non-multi-hop) transfer. It can be a multi-source transfer. @@ -115,10 +107,9 @@ class DirectTransferDefinition: """ def __init__(self, source: RequestSource, destination: TransferDestination, rws: RequestWithSources, protocol_factory: ProtocolFactory, operation_src: str, operation_dest: str): - self.sources = [source] + super().__init__(sources=[source], rws=rws) self.destination = destination - self.rws = rws self.protocol_factory = protocol_factory self.operation_src = operation_src self.operation_dest = operation_dest @@ -134,11 +125,11 @@ def __str__(self): ) @property - def src(self): + def src(self) -> RequestSource: return self.sources[0] @property - def dst(self): + def dst(self) -> TransferDestination: return self.destination @property @@ -159,31 +150,12 @@ def source_url(self, source: RequestSource) -> str: ) return url - def dest_protocol(self): + def dest_protocol(self) -> "RSEProtocol": return self.protocol_factory.protocol(self.dst.rse, self.dst.scheme, self.operation_dest) - def source_protocol(self, source: RequestSource): + def source_protocol(self, source: RequestSource) -> "RSEProtocol": return self.protocol_factory.protocol(source.rse, source.scheme, self.operation_src) - @property - def use_ipv4(self): - # If any source or destination rse is ipv4 only - return self.dst.rse.attributes.get('use_ipv4', False) or any(src.rse.attributes.get('use_ipv4', False) - for src in self.sources) - - @property - def use_tokens(self) -> bool: - """Whether a transfer can be performed with tokens. - - In order to be so, all the involved RSEs must have it explicitly enabled - and the protocol being used must be WebDAV. - """ - for endpoint in [*self.sources, self.destination]: - if (endpoint.rse.attributes.get('oidc_support') is not True - or endpoint.scheme != 'davs'): - return False - return True - @staticmethod def __rewrite_source_url(source_url, source_sign_url, dest_sign_url, source_scheme): """ @@ -269,7 +241,7 @@ def _generate_dest_url(cls, dst: TransferDestination, rws: RequestWithSources, p return dest_url -class StageinTransferDefinition(DirectTransferDefinition): +class StageinTransferImplementation(DirectTransferImplementation): """ A definition of a transfer which triggers a stagein operation. - The source and destination url are identical @@ -306,7 +278,7 @@ def source_url(self, source: RequestSource) -> str: return self.dest_url -def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str: +def transfer_path_str(transfer_path: "list[DirectTransfer]") -> str: """ an implementation of __str__ for a transfer path, which is a list of direct transfers, so not really an object """ @@ -333,7 +305,7 @@ def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str: @transactional_session def mark_submitting( - transfer: "DirectTransferDefinition", + transfer: "DirectTransfer", external_host: str, *, logger: "Callable", @@ -379,7 +351,7 @@ def mark_submitting( @transactional_session def ensure_db_sources( - transfer_path: "list[DirectTransferDefinition]", + transfer_path: "list[DirectTransfer]", *, logger: "Callable", session: "Session", @@ -640,7 +612,7 @@ def _create_transfer_definitions( domain: str, *, session: "Session", -) -> "dict[RseData, list[DirectTransferDefinition]]": +) -> "dict[RseData, list[DirectTransfer]]": """ Find the all paths from sources towards the destination of the given transfer request. Create the transfer definitions for each point-to-point transfer (multi-source, when possible) @@ -668,7 +640,7 @@ def _create_transfer_definitions( rse=hop_dst_rse, scheme=hop['dest_scheme'], ) - hop_definition = DirectTransferDefinition( + hop_definition = DirectTransferImplementation( source=src, destination=dst, operation_src=operation_src, @@ -764,13 +736,13 @@ def _create_stagein_definitions( operation_src: str, operation_dest: str, protocol_factory: ProtocolFactory, -) -> "dict[RseData, list[StageinTransferDefinition]]": +) -> "dict[RseData, list[DirectTransfer]]": """ for each source, create a single-hop transfer path with a one stageing definition inside """ transfers_by_source = { source.rse: [ - StageinTransferDefinition( + cast(DirectTransfer, StageinTransferImplementation( source=RequestSource( rse=source.rse, file_path=source.file_path, @@ -785,7 +757,7 @@ def _create_stagein_definitions( operation_dest=operation_dest, rws=rws, protocol_factory=protocol_factory, - ) + )) ] for source in sources @@ -804,9 +776,9 @@ def get_dsn(scope, name, dsn): def __compress_multihops( - paths_by_source: "Iterable[tuple[RequestSource, Sequence[DirectTransferDefinition]]]", + paths_by_source: "Iterable[tuple[RequestSource, Sequence[DirectTransfer]]]", sources: "Iterable[RequestSource]", -) -> "Iterator[tuple[RequestSource, Sequence[DirectTransferDefinition]]]": +) -> "Iterator[tuple[RequestSource, Sequence[DirectTransfer]]]": # Compress multihop transfers which contain other sources as part of itself. # For example: multihop A->B->C and B is a source, compress A->B->C into B->C source_rses = {s.rse.id for s in sources} @@ -853,7 +825,7 @@ def build_or_return_cached( *, logger: "LoggerFunction" = logging.log, session: "Session" - ) -> "Mapping[RseData, Sequence[DirectTransferDefinition]]": + ) -> "Mapping[RseData, Sequence[DirectTransfer]]": """ Warning: The function currently caches the result for the given request and returns it for later calls with the same request id. As a result: it can return more (or less) sources than what is provided in the @@ -1055,7 +1027,7 @@ def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[ class PathDistance(SourceRankingStrategy): class _RankingContext(RequestRankingContext): - def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", paths_for_rws: "Mapping[RseData, Sequence[DirectTransferDefinition]]"): + def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", paths_for_rws: "Mapping[RseData, Sequence[DirectTransfer]]"): super().__init__(strategy, rws) self.paths_for_rws = paths_for_rws @@ -1414,7 +1386,7 @@ def cancel_transfer(transfertool_obj, transfer_id): @transactional_session def prepare_transfers( - candidate_paths_by_request_id: "dict[str, list[list[DirectTransferDefinition]]]", + candidate_paths_by_request_id: "dict[str, list[list[DirectTransfer]]]", logger: "LoggerFunction" = logging.log, transfertools: "Optional[list[str]]" = None, *, diff --git a/lib/rucio/daemons/conveyor/common.py b/lib/rucio/daemons/conveyor/common.py index d533513241..40957ddb3c 100644 --- a/lib/rucio/daemons/conveyor/common.py +++ b/lib/rucio/daemons/conveyor/common.py @@ -44,7 +44,7 @@ if TYPE_CHECKING: from collections.abc import Callable, Sequence from typing import Optional - from rucio.core.transfer import DirectTransferDefinition + from rucio.core.request import DirectTransfer from rucio.transfertool.transfertool import TransferToolBuilder from sqlalchemy.orm import Session @@ -131,10 +131,10 @@ def pick_and_prepare_submission_path(requests_with_sources, topology, protocol_f def __assign_to_transfertool( - transfer_path: "list[DirectTransferDefinition]", + transfer_path: "list[DirectTransfer]", transfertools: "Optional[list[str]]", logger: "Callable", -) -> "list[tuple[list[DirectTransferDefinition], Optional[TransferToolBuilder]]]": +) -> "list[tuple[list[DirectTransfer], Optional[TransferToolBuilder]]]": """ Iterate over a multihop path and assign sub-paths to transfertools in chucks from left to right. @@ -174,11 +174,11 @@ def __assign_to_transfertool( def assign_paths_to_transfertool_and_create_hops( - candidate_paths_by_request_id: "dict[str: list[DirectTransferDefinition]]", + candidate_paths_by_request_id: "dict[str: list[DirectTransfer]]", default_tombstone_delay: int, transfertools: "Optional[list[str]]" = None, logger: "Callable" = logging.log, -) -> "tuple[dict[TransferToolBuilder, list[DirectTransferDefinition]], set[str]]": +) -> "tuple[dict[TransferToolBuilder, list[DirectTransfer]], set[str]]": """ for each request, pick the first path which can be submitted by one of the transfertools. If the chosen path is multihop, create all missing intermediate requests and replicas. @@ -214,13 +214,13 @@ def assign_paths_to_transfertool_and_create_hops( @transactional_session def __assign_paths_to_transfertool_and_create_hops( request_id: str, - candidate_paths: "Sequence[list[DirectTransferDefinition]]", + candidate_paths: "Sequence[list[DirectTransfer]]", default_tombstone_delay: int, transfertools: "Optional[list[str]]" = None, *, logger: "Callable" = logging.log, session: "Session", -) -> "tuple[Optional[list[DirectTransferDefinition]], Optional[TransferToolBuilder]]": +) -> "tuple[Optional[list[DirectTransfer]], Optional[TransferToolBuilder]]": """ Out of a sequence of candidate paths for the given request, pick the first path which can be submitted by one of the transfertools. @@ -290,7 +290,7 @@ def __assign_paths_to_transfertool_and_create_hops( @transactional_session def __create_missing_replicas_and_requests( - transfer_path: "list[DirectTransferDefinition]", + transfer_path: "list[DirectTransfer]", default_tombstone_delay: int, *, logger: "Callable", diff --git a/lib/rucio/transfertool/fts3.py b/lib/rucio/transfertool/fts3.py index 112bcb4403..41cfcdb121 100644 --- a/lib/rucio/transfertool/fts3.py +++ b/lib/rucio/transfertool/fts3.py @@ -45,7 +45,7 @@ from rucio.transfertool.transfertool import Transfertool, TransferToolBuilder, TransferStatusReport if TYPE_CHECKING: - from rucio.core.transfer import DirectTransferDefinition + from rucio.core.request import DirectTransfer from rucio.core.rse import RseData logging.getLogger("requests").setLevel(logging.CRITICAL) @@ -197,7 +197,7 @@ def _configured_source_strategy(activity: str, logger: Callable[..., Any]) -> st def _available_checksums( - transfer: "DirectTransferDefinition", + transfer: "DirectTransfer", ) -> tuple[set[str], set[str]]: """ Get checksums which can be used for file validation on the source and the destination RSE @@ -218,7 +218,7 @@ def _available_checksums( def _hop_checksum_validation_strategy( - transfer: "DirectTransferDefinition", + transfer: "DirectTransfer", logger: Callable[..., Any], ) -> tuple[str, set[str]]: """ @@ -244,7 +244,7 @@ def _hop_checksum_validation_strategy( def _path_checksum_validation_strategy( - transfer_path: "list[DirectTransferDefinition]", + transfer_path: "list[DirectTransfer]", logger: Callable[..., Any], ) -> str: """ @@ -261,7 +261,7 @@ def _path_checksum_validation_strategy( def _pick_fts_checksum( - transfer: "DirectTransferDefinition", + transfer: "DirectTransfer", path_strategy: "str", ) -> Optional[str]: """ @@ -295,6 +295,19 @@ def _pick_fts_checksum( return checksum_to_use +def _use_tokens(transfer_hop: "DirectTransfer"): + """Whether a transfer can be performed with tokens. + + In order to be so, all the involved RSEs must have it explicitly enabled + and the protocol being used must be WebDAV. + """ + for endpoint in [*transfer_hop.sources, transfer_hop.dst]: + if (endpoint.rse.attributes.get('oidc_support') is not True + or endpoint.scheme != 'davs'): + return False + return True + + def build_job_params(transfer_path, bring_online, default_lifetime, archive_timeout_override, max_time_in_queue, logger): """ Prepare the job parameters which will be passed to FTS transfertool @@ -343,7 +356,8 @@ def build_job_params(transfer_path, bring_online, default_lifetime, archive_time job_params['strict_copy'] = strict_copy if dest_spacetoken: job_params['spacetoken'] = dest_spacetoken - if last_hop.use_ipv4: + if (last_hop.dst.rse.attributes.get('use_ipv4', False) + or any(src.rse.attributes.get('use_ipv4', False) for src in last_hop.sources)): job_params['ipv4'] = True job_params['ipv6'] = False @@ -870,7 +884,7 @@ def submission_builder_for_path(cls, transfer_path, logger=logging.log): if sub_path: oidc_support = False - if all(t.use_tokens for t in sub_path): + if all(_use_tokens(t) for t in sub_path): logger(logging.DEBUG, 'OAuth2/OIDC available for transfer {}'.format([str(hop) for hop in sub_path])) oidc_support = True return sub_path, TransferToolBuilder(cls, external_host=fts_hosts[0], oidc_support=oidc_support, vo=vo) diff --git a/lib/rucio/transfertool/transfertool.py b/lib/rucio/transfertool/transfertool.py index 6805288def..f1d9bef929 100644 --- a/lib/rucio/transfertool/transfertool.py +++ b/lib/rucio/transfertool/transfertool.py @@ -151,7 +151,7 @@ def submission_builder_for_path(cls, transfer_path, logger=logging.log): Analyze the transfer path. If this transfertool class can submit the given transfers, return a TransferToolBuilder instance capable to build transfertool objects configured for this particular submission. - :param transfer_path: List of DirectTransferDefinitions + :param transfer_path: List of DirectTransfer :param logger: logger instance :return: a tuple: a sub-path starting at the first node from transfer_path, and a TransfertoolBuilder instance capable to submit this sub-path. Returns ([], None) if submission is impossible.