diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 2ae95b89884..594dedb2f04 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -18,6 +18,7 @@ import re import time import traceback +from collections import defaultdict from typing import TYPE_CHECKING from dogpile.cache import make_region @@ -47,7 +48,7 @@ from rucio.transfertool.mock import MockTransfertool if TYPE_CHECKING: - from collections.abc import Callable, Generator, Iterable + from collections.abc import Callable, Generator, Iterable, Sequence from typing import Any, Optional from sqlalchemy.orm import Session from rucio.common.types import InternalAccount @@ -540,12 +541,11 @@ def touch_transfer(external_host, transfer_id, *, session: "Session"): raise RucioException(error.args) -@read_session -def __create_transfer_definitions( +def _create_transfer_definitions( topology: "Topology", protocol_factory: ProtocolFactory, rws: RequestWithSources, - sources: "list[RequestSource]", + sources: "Sequence[RequestSource]", max_sources: int, multi_source_sources: "list[RequestSource]", limit_dest_schemes: list[str], @@ -621,7 +621,7 @@ def __create_transfer_definitions( ) transfer_path.append(hop_definition) - transfers_by_source[source.rse.id] = transfer_path + transfers_by_source[source.rse] = transfer_path # create multi-source transfers: add additional sources if possible for transfer_path in transfers_by_source.values(): @@ -671,9 +671,9 @@ def __create_transfer_definitions( return transfers_by_source -def __create_stagein_definitions( +def _create_stagein_definitions( rws: RequestWithSources, - sources: "list[RequestSource]", + sources: "Sequence[RequestSource]", limit_dest_schemes: list[str], operation_src: str, operation_dest: str, @@ -683,7 +683,7 @@ def __create_stagein_definitions( for each source, create a single-hop transfer path with a one stageing definition inside """ transfers_by_source = { - source.rse.id: [ + source.rse: [ StageinTransferDefinition( source=RequestSource( rse_data=source.rse, @@ -717,15 +717,6 @@ def get_dsn(scope, name, dsn): return 'other' -def __filter_multihops_with_intermediate_tape(candidate_paths: "Iterable[list[DirectTransferDefinition]]") -> "Generator[list[DirectTransferDefinition]]": - # Discard multihop transfers which contain a tape source as an intermediate hop - for path in candidate_paths: - if any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]): - pass - else: - yield path - - def __compress_multihops( candidate_paths: "Iterable[list[DirectTransferDefinition]]", sources: "Iterable[RequestSource]", @@ -748,24 +739,201 @@ def __compress_multihops( yield path -def __sort_paths(candidate_paths: "Iterable[list[DirectTransferDefinition]]") -> "Generator[list[DirectTransferDefinition]]": - - def __transfer_order_key(transfer_path): - # Reduce the priority of the tape sources. If there are any disk sources, - # they must fail twice (1 penalty + 1 disk preferred over tape) before a tape will even be tried - source_ranking_penalty = 1 if transfer_path[0].src.rse.is_tape_or_staging_required() else 0 - # higher source_ranking first, - # on equal source_ranking, prefer DISK over TAPE - # on equal type, prefer lower distance - # on equal distance, prefer single hop - return ( - - transfer_path[0].src.ranking + source_ranking_penalty, - transfer_path[0].src.rse.is_tape_or_staging_required(), # rely on the fact that False < True - transfer_path[0].src.distance, - len(transfer_path) > 1, # rely on the fact that False < True - ) +class TransferDefinitionBuilder: + def __init__( + self, + topology: "Topology", + protocol_factory: ProtocolFactory, + max_sources: int, + preparer_mode: bool = False, + schemes: "Optional[list[str]]" = None, + failover_schemes: "Optional[list[str]]" = None, + ): + self.failover_schemes = failover_schemes if failover_schemes is not None else [] + self.schemes = schemes if schemes is not None else [] + self.topology = topology + self.preparer_mode = preparer_mode + self.protocol_factory = protocol_factory + self.max_sources = max_sources + + self.definition_by_request_id = {} + + def build(self, rws: RequestWithSources, sources: "Sequence[RequestSource]", *, logger: "LoggerFunction" = logging.log, session: "Session"): + definition = self.definition_by_request_id.get(rws.request_id) + if definition: + return definition + + transfer_schemes = self.schemes + if rws.previous_attempt_id and self.failover_schemes: + transfer_schemes = self.failover_schemes + + if rws.request_type == RequestType.STAGEIN: + definition = _create_stagein_definitions( + rws=rws, + sources=sources, + limit_dest_schemes=transfer_schemes, + operation_src='read', + operation_dest='write', + protocol_factory=self.protocol_factory + ) + else: + definition = _create_transfer_definitions( + topology=self.topology, + rws=rws, + sources=sources, + max_sources=self.max_sources, + multi_source_sources=[] if self.preparer_mode else sources, + limit_dest_schemes=[], + operation_src='third_party_copy_read', + operation_dest='third_party_copy_write', + domain='wan', + protocol_factory=self.protocol_factory, + session=session + ) + self.definition_by_request_id[rws.request_id] = definition + return definition + + +class SourceRankingStrategy: + filter_only: bool = False + + class _Skip: + pass + + SKIP_SOURCE = _Skip() + + def __init__(self): + super().__init__() + self.rws = None + self.sources = None + + def prepare_context(self, rws: RequestWithSources, sources: "Sequence[RequestSource]", *, logger: "LoggerFunction" = logging.log, session: "Session"): + self.rws = rws + self.sources = sources + + def apply(self, source: RequestSource): + pass + + +class SourceFilterStrategy(SourceRankingStrategy): + filter_only = True + + +class EnforceSourceRSEExpression(SourceFilterStrategy): + + def __init__(self): + super().__init__() + self.allowed_source_rses = None + + def prepare_context(self, rws: RequestWithSources, sources: "Sequence[RequestSource]", *, logger: "LoggerFunction" = logging.log, session: "Session"): + + # parse source expression + self.allowed_source_rses = None + source_replica_expression = rws.attributes.get('source_replica_expression', None) + if source_replica_expression: + try: + parsed_rses = parse_expression(source_replica_expression, session=session) + except InvalidRSEExpression as error: + logger(logging.ERROR, "%s: Invalid RSE exception %s: %s", rws.request_id, source_replica_expression, str(error)) + self.allowed_source_rses = set() + else: + self.allowed_source_rses = {x['id'] for x in parsed_rses} + + def apply(self, source: RequestSource): + if self.allowed_source_rses is not None and source.rse.id not in self.allowed_source_rses: + return SourceRankingStrategy.SKIP_SOURCE + + +class SkipRestrictedRSEs(SourceFilterStrategy): + + def __init__(self, admin_accounts): + super().__init__() + self.admin_accounts = admin_accounts if admin_accounts is not None else [] + + def apply(self, source: RequestSource): + if source.rse.attributes.get('restricted_read') and self.rws.account not in self.admin_accounts: + return SourceRankingStrategy.SKIP_SOURCE + - yield from sorted(candidate_paths, key=__transfer_order_key) +class SkipBlocklistedRSEs(SourceFilterStrategy): + + def __init__(self, topology): + super().__init__() + self.topology = topology + + def apply(self, source: RequestSource): + # Ignore blocklisted RSEs + if not source.rse.columns['availability_read'] and not self.topology.ignore_availability: + return SourceRankingStrategy.SKIP_SOURCE + + +class EnforceStagingBuffer(SourceFilterStrategy): + def apply(self, source: RequestSource): + # For staging requests, the staging_buffer attribute must be correctly set + if self.rws.request_type == RequestType.STAGEIN and source.rse.attributes.get('staging_buffer') != self.rws.dest_rse.name: + return SourceRankingStrategy.SKIP_SOURCE + + +class RestrictTapeSources(SourceFilterStrategy): + def apply(self, source: RequestSource): + # Ignore tape sources if they are not desired + if source.rse.is_tape_or_staging_required() and not self.rws.attributes.get("allow_tape_source", True): + return SourceRankingStrategy.SKIP_SOURCE + + +class HighestAdjustedRankingFirst(SourceRankingStrategy): + def apply(self, source: RequestSource): + source_ranking_penalty = 1 if source.rse.is_tape_or_staging_required() else 0 + return - source.ranking + source_ranking_penalty + + +class PreferDiskOverTape(SourceRankingStrategy): + def apply(self, source: RequestSource): + return source.rse.is_tape_or_staging_required() # rely on the fact that False < True + + +class PathDistance(SourceRankingStrategy): + + def __init__(self, transfer_definition_builder: TransferDefinitionBuilder): + super().__init__() + self.transfer_definition_builder = transfer_definition_builder + self.paths_by_source_rse = {} + + def prepare_context(self, rws: RequestWithSources, sources: "Sequence[RequestSource]", *, logger: "LoggerFunction" = logging.log, session: "Session"): + self.paths_by_source_rse = self.transfer_definition_builder.build(rws, sources, logger=logger, session=session) + + def apply(self, source: RequestSource): + path = self.paths_by_source_rse.get(source.rse) + if path is None: + return SourceRankingStrategy.SKIP_SOURCE + return path[0].src.distance + + +class PreferSingleHop(PathDistance): + def apply(self, source: RequestSource): + path = self.paths_by_source_rse.get(source.rse) + if path is None: + return SourceRankingStrategy.SKIP_SOURCE + return len(path) > 1 # rely on the fact that False < True + + +class SkipSchemeMissmatch(PathDistance): + filter_only = True + + def apply(self, source: RequestSource): + path = self.paths_by_source_rse.get(source.rse) + if path is not None and not path: + return SourceRankingStrategy.SKIP_SOURCE + + +class SkipIntermediateTape(PathDistance): + filter_only = True + + def apply(self, source: RequestSource): + # Discard multihop transfers which contain a tape source as an intermediate hop + path = self.paths_by_source_rse.get(source.rse) + if any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]): + return SourceRankingStrategy.SKIP_SOURCE @transactional_session @@ -794,11 +962,27 @@ def build_transfer_paths( Each path is a list of hops. Each hop is a transfer definition. """ - if schemes is None: - schemes = [] - - if failover_schemes is None: - failover_schemes = [] + transfer_definition_builder = TransferDefinitionBuilder( + topology=topology, + schemes=schemes, + failover_schemes=failover_schemes, + protocol_factory=protocol_factory, + max_sources=max_sources, + preparer_mode=preparer_mode, + ) + strategies = [ + EnforceSourceRSEExpression(), + SkipBlocklistedRSEs(topology=topology), + SkipRestrictedRSEs(admin_accounts=admin_accounts), + EnforceStagingBuffer(), + RestrictTapeSources(), + SkipSchemeMissmatch(transfer_definition_builder=transfer_definition_builder), + SkipIntermediateTape(transfer_definition_builder=transfer_definition_builder), + HighestAdjustedRankingFirst(), + PreferDiskOverTape(), + PathDistance(transfer_definition_builder=transfer_definition_builder), + PreferSingleHop(transfer_definition_builder=transfer_definition_builder), + ] if admin_accounts is None: admin_accounts = set() @@ -817,10 +1001,6 @@ def build_transfer_paths( for source in all_sources: source.rse.ensure_loaded(load_name=True, load_info=True, load_attributes=True, load_columns=True, session=session) - transfer_schemes = schemes - if rws.previous_attempt_id and failover_schemes: - transfer_schemes = failover_schemes - # Assume request doesn't have any sources. Will be removed later if sources are found. reqs_no_source.add(rws.request_id) if not all_sources: @@ -849,122 +1029,50 @@ def build_transfer_paths( reqs_no_source.remove(rws.request_id) continue - # parse source expression - source_replica_expression = rws.attributes.get('source_replica_expression', None) - allowed_source_rses = None - if source_replica_expression: - try: - parsed_rses = parse_expression(source_replica_expression, session=session) - except InvalidRSEExpression as error: - logger(logging.ERROR, "%s: Invalid RSE exception %s: %s", rws.request_id, source_replica_expression, str(error)) - continue - else: - allowed_source_rses = [x['id'] for x in parsed_rses] - - filtered_sources = all_sources - # Only keep allowed sources - if allowed_source_rses is not None: - filtered_sources = filter(lambda s: s.rse.id in allowed_source_rses, filtered_sources) - filtered_sources = filter(lambda s: s.rse.name is not None, filtered_sources) - if rws.account not in admin_accounts: - filtered_sources = filter(lambda s: not s.rse.attributes.get('restricted_read'), filtered_sources) - # Ignore blocklisted RSEs - if not topology.ignore_availability: - filtered_sources = filter(lambda s: s.rse.columns['availability_read'], filtered_sources) - # For staging requests, the staging_buffer attribute must be correctly set - if rws.request_type == RequestType.STAGEIN: - filtered_sources = filter(lambda s: s.rse.attributes.get('staging_buffer') == rws.dest_rse.name, filtered_sources) - # Ignore tape sources if they are not desired - filtered_sources = list(filtered_sources) - had_tape_sources = len(filtered_sources) > 0 - if not rws.attributes.get("allow_tape_source", True): - filtered_sources = filter(lambda s: not s.rse.is_tape_or_staging_required(), filtered_sources) - - filtered_sources = list(filtered_sources) - filtered_rses_log = '' - if len(all_sources) != len(filtered_sources): - filtered_rses = list(set(s.rse.name for s in all_sources).difference(s.rse.name for s in filtered_sources)) - filtered_rses_log = '; %d dropped by filter: ' % (len(all_sources) - len(filtered_sources)) - filtered_rses_log += ','.join(filtered_rses[:num_sources_in_logs]) - if len(filtered_rses) > num_sources_in_logs: - filtered_rses_log += '... and %d others' % (len(filtered_rses) - num_sources_in_logs) - candidate_paths = [] - - candidate_sources = filtered_sources - if requested_source_only and rws.requested_source: - candidate_sources = [rws.requested_source] if rws.requested_source in filtered_sources else [] - - if rws.request_type == RequestType.STAGEIN: - paths = __create_stagein_definitions(rws=rws, - sources=candidate_sources, - limit_dest_schemes=transfer_schemes, - operation_src='read', - operation_dest='write', - protocol_factory=protocol_factory) - else: - paths = __create_transfer_definitions(topology=topology, - rws=rws, - sources=candidate_sources, - max_sources=max_sources, - multi_source_sources=[] if preparer_mode else filtered_sources, - limit_dest_schemes=[], - operation_src='third_party_copy_read', - operation_dest='third_party_copy_write', - domain='wan', - protocol_factory=protocol_factory, - session=session) - - sources_without_path = [] - any_source_had_scheme_mismatch = False - for source in candidate_sources: - transfer_path = paths.get(source.rse.id) - if transfer_path is None: - logger(logging.WARNING, "%s: no path from %s to %s", rws.request_id, source.rse, rws.dest_rse) - sources_without_path.append(source.rse.name) - continue - if not transfer_path: - any_source_had_scheme_mismatch = True - logger(logging.WARNING, "%s: no matching protocol between %s and %s", rws.request_id, source.rse, rws.dest_rse) - sources_without_path.append(source.rse.name) - continue - - if len(transfer_path) > 1: - logger(logging.DEBUG, '%s: From %s to %s requires multihop: %s', rws.request_id, source.rse, rws.dest_rse, transfer_path_str(transfer_path)) - - candidate_paths.append(transfer_path) - - if len(candidate_sources) != len(candidate_paths): - logger(logging.DEBUG, '%s: Sources after path computation: %s', rws.request_id, [str(path[0].src.rse) for path in candidate_paths]) - - sources_without_path_log = '' - if sources_without_path: - sources_without_path_log = '; %d dropped due to missing path: ' % len(sources_without_path) - sources_without_path_log += ','.join(sources_without_path[:num_sources_in_logs]) - if len(sources_without_path) > num_sources_in_logs: - sources_without_path_log += '... and %d others' % (len(sources_without_path) - num_sources_in_logs) - - candidate_paths = __filter_multihops_with_intermediate_tape(candidate_paths) + # For each strategy name, gives the sources which were rejected by it + rejected_sources = defaultdict(list) + # Cost of each accepted source (lists of ordered costs: one for each ranking strategy) + cost_vectors = {s: [] for s in rws.sources} + for strategy in strategies: + sources = list(cost_vectors) + strategy.prepare_context(rws, sources, logger=logger, session=session) + strategy_verdicts = map(lambda s: (s, strategy.apply(s)), sources) + for source, verdict in strategy_verdicts: + if verdict is SourceRankingStrategy.SKIP_SOURCE: + rejected_sources[strategy.__class__.__name__].append(source) + cost_vectors.pop(source) + elif not strategy.filter_only: + cost_vectors[source].append(verdict) + + transfers_by_rse = transfer_definition_builder.build(rws, cost_vectors, logger=logger, session=session) + candidate_paths = [transfers_by_rse[s.rse] for s in sorted(cost_vectors, key=cost_vectors.get)] if not preparer_mode: candidate_paths = __compress_multihops(candidate_paths, all_sources) - candidate_paths = list(__sort_paths(candidate_paths)) + candidate_paths = list(candidate_paths) - ordered_sources_log = ','.join(('multihop: ' if len(path) > 1 else '') + '{}:{}:{}'.format(path[0].src.rse, path[0].src.ranking, path[0].src.distance) - for path in candidate_paths[:num_sources_in_logs]) + ordered_sources_log = ','.join( + ('multihop: ' if len(path) > 1 else '') + '{}:{}:{}'.format(path[0].src.rse, path[0].src.ranking, path[0].src.distance) + for path in candidate_paths[:num_sources_in_logs] + ) if len(candidate_paths) > num_sources_in_logs: ordered_sources_log += '... and %d others' % (len(candidate_paths) - num_sources_in_logs) - - logger(logging.INFO, '%s: %d ordered sources: %s%s%s', rws, len(candidate_paths), - ordered_sources_log, filtered_rses_log, sources_without_path_log) + filtered_rses_log = '' + for strategy_name, sources in rejected_sources.items(): + filtered_rses_log += f'; {len(sources)} dropped by strategy "{strategy_name}": ' + filtered_rses_log += ','.join(str(s.rse) for s in sources[:num_sources_in_logs]) + if len(sources) > num_sources_in_logs: + filtered_rses_log += '... and %d others' % (len(sources) - num_sources_in_logs) + logger(logging.INFO, '%s: %d ordered sources: %s%s', rws, len(candidate_paths), ordered_sources_log, filtered_rses_log) if not candidate_paths: # It can happen that some sources are skipped because they are TAPE, and others because # of scheme mismatch. However, we can only have one state in the database. I picked to # prioritize setting only_tape_source without any particular reason. - if had_tape_sources and not filtered_sources: + if RestrictTapeSources.__name__ in rejected_sources: logger(logging.DEBUG, '%s: Only tape sources found' % rws.request_id) reqs_only_tape_source.add(rws.request_id) reqs_no_source.remove(rws.request_id) - elif any_source_had_scheme_mismatch: + elif SkipSchemeMissmatch.__name__ in rejected_sources: logger(logging.DEBUG, '%s: Scheme mismatch detected' % rws.request_id) reqs_scheme_mismatch.add(rws.request_id) reqs_no_source.remove(rws.request_id)