diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index e6584f7b8c6..fc198523e95 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -56,8 +56,8 @@ class RequestSource: - def __init__(self, rse_data, ranking=None, distance=None, file_path=None, scheme=None, url=None): - self.rse = rse_data + def __init__(self, rse: RseData, ranking=None, distance=None, file_path=None, scheme=None, url=None): + self.rse = rse self.distance = distance if distance is not None else 9999 self.ranking = ranking if ranking is not None else 0 self.file_path = file_path @@ -558,7 +558,7 @@ def list_and_mark_transfer_requests_and_source_replicas( if replica_rse_id is not None: replica_rse = rse_collection[replica_rse_id] replica_rse.name = replica_rse_name - source = RequestSource(rse_data=replica_rse, file_path=file_path, + source = RequestSource(rse=replica_rse, file_path=file_path, ranking=source_ranking, distance=distance, url=source_url) request.sources.append(source) if source_rse_id == replica_rse_id: diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 2ae95b89884..b88fcc73b81 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -16,9 +16,11 @@ import datetime import logging import re +import sys import time import traceback -from typing import TYPE_CHECKING +from collections import defaultdict +from typing import TYPE_CHECKING, cast from dogpile.cache import make_region from dogpile.cache.api import NoValue @@ -540,14 +542,13 @@ def touch_transfer(external_host, transfer_id, *, session: "Session"): raise RucioException(error.args) -@read_session -def __create_transfer_definitions( +def _create_transfer_definitions( topology: "Topology", protocol_factory: ProtocolFactory, rws: RequestWithSources, - sources: "list[RequestSource]", + sources: "Iterable[RequestSource]", max_sources: int, - multi_source_sources: "list[RequestSource]", + multi_source_sources: "Iterable[RequestSource]", limit_dest_schemes: list[str], operation_src: str, operation_dest: str, @@ -621,7 +622,7 @@ def __create_transfer_definitions( ) transfer_path.append(hop_definition) - transfers_by_source[source.rse.id] = transfer_path + transfers_by_source[source.rse] = transfer_path # create multi-source transfers: add additional sources if possible for transfer_path in transfers_by_source.values(): @@ -671,19 +672,19 @@ def __create_transfer_definitions( return transfers_by_source -def __create_stagein_definitions( +def _create_stagein_definitions( rws: RequestWithSources, - sources: "list[RequestSource]", + sources: "Iterable[RequestSource]", limit_dest_schemes: list[str], operation_src: str, operation_dest: str, protocol_factory: ProtocolFactory, -) -> "dict[str, list[StageinTransferDefinition]]": +) -> "dict[RseData, list[StageinTransferDefinition]]": """ for each source, create a single-hop transfer path with a one stageing definition inside """ transfers_by_source = { - source.rse.id: [ + source.rse: [ StageinTransferDefinition( source=RequestSource( rse_data=source.rse, @@ -717,15 +718,6 @@ def get_dsn(scope, name, dsn): return 'other' -def __filter_multihops_with_intermediate_tape(candidate_paths: "Iterable[list[DirectTransferDefinition]]") -> "Generator[list[DirectTransferDefinition]]": - # Discard multihop transfers which contain a tape source as an intermediate hop - for path in candidate_paths: - if any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]): - pass - else: - yield path - - def __compress_multihops( candidate_paths: "Iterable[list[DirectTransferDefinition]]", sources: "Iterable[RequestSource]", @@ -748,24 +740,237 @@ def __compress_multihops( yield path -def __sort_paths(candidate_paths: "Iterable[list[DirectTransferDefinition]]") -> "Generator[list[DirectTransferDefinition]]": - - def __transfer_order_key(transfer_path): - # Reduce the priority of the tape sources. If there are any disk sources, - # they must fail twice (1 penalty + 1 disk preferred over tape) before a tape will even be tried - source_ranking_penalty = 1 if transfer_path[0].src.rse.is_tape_or_staging_required() else 0 - # higher source_ranking first, - # on equal source_ranking, prefer DISK over TAPE - # on equal type, prefer lower distance - # on equal distance, prefer single hop - return ( - - transfer_path[0].src.ranking + source_ranking_penalty, - transfer_path[0].src.rse.is_tape_or_staging_required(), # rely on the fact that False < True - transfer_path[0].src.distance, - len(transfer_path) > 1, # rely on the fact that False < True - ) +class TransferPathBuilder: + def __init__( + self, + topology: "Topology", + protocol_factory: ProtocolFactory, + max_sources: int, + preparer_mode: bool = False, + schemes: "Optional[list[str]]" = None, + failover_schemes: "Optional[list[str]]" = None, + ): + self.failover_schemes = failover_schemes if failover_schemes is not None else [] + self.schemes = schemes if schemes is not None else [] + self.topology = topology + self.preparer_mode = preparer_mode + self.protocol_factory = protocol_factory + self.max_sources = max_sources + + self.definition_by_request_id = {} + + def build_or_return_cached( + self, + rws: RequestWithSources, + sources: "Iterable[RequestSource]", + *, + logger: "LoggerFunction" = logging.log, + session: "Session" + ) -> dict[RseData, list[DirectTransferDefinition]]: + """ + Warning: The function currently caches the result for the given request and returns it for later calls + with the same request id. As a result: it can return more (or less) sources than what is provided in the + `sources` argument. This is done for performance reasons. As of time of writing, this behavior is not problematic + for the callers of this method. + """ + definition = self.definition_by_request_id.get(rws.request_id) + if definition: + return definition + + transfer_schemes = self.schemes + if rws.previous_attempt_id and self.failover_schemes: + transfer_schemes = self.failover_schemes + + if rws.request_type == RequestType.STAGEIN: + definition = _create_stagein_definitions( + rws=rws, + sources=sources, + limit_dest_schemes=transfer_schemes, + operation_src='read', + operation_dest='write', + protocol_factory=self.protocol_factory + ) + else: + definition = _create_transfer_definitions( + topology=self.topology, + rws=rws, + sources=sources, + max_sources=self.max_sources, + multi_source_sources=[] if self.preparer_mode else sources, + limit_dest_schemes=[], + operation_src='third_party_copy_read', + operation_dest='third_party_copy_write', + domain='wan', + protocol_factory=self.protocol_factory, + session=session + ) + self.definition_by_request_id[rws.request_id] = definition + return definition + + +class _SkipSource: + pass + + +SKIP_SOURCE = _SkipSource() + + +class RequestRankingContext: + + def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources"): + self.strategy = strategy + self.rws = rws + + def apply(self, source: RequestSource) -> "int | _SkipSource": + verdict = self.strategy.apply(self, source) + if verdict is None: + verdict = sys.maxsize + return verdict + + +class SourceRankingStrategy: + filter_only: bool = False + + def for_request( + self, + rws: RequestWithSources, + sources: "Iterable[RequestSource]", + *, + logger: "LoggerFunction" = logging.log, + session: "Session" + ) -> "RequestRankingContext": + return RequestRankingContext(self, rws) + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + pass + + +class SourceFilterStrategy(SourceRankingStrategy): + filter_only = True + + +class EnforceSourceRSEExpression(SourceFilterStrategy): + + class _RankingContext(RequestRankingContext): + def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", allowed_source_rses: "Optional[set[str]]"): + super().__init__(strategy, rws) + self.allowed_source_rses = allowed_source_rses + + def for_request(self, rws: RequestWithSources, sources: "Iterable[RequestSource]", *, logger: "LoggerFunction" = logging.log, session: "Session"): + # parse source expression + allowed_source_rses = None + source_replica_expression = rws.attributes.get('source_replica_expression', None) + if source_replica_expression: + try: + parsed_rses = parse_expression(source_replica_expression, session=session) + except InvalidRSEExpression as error: + logger(logging.ERROR, "%s: Invalid RSE exception %s: %s", rws.request_id, source_replica_expression, str(error)) + allowed_source_rses = set() + else: + allowed_source_rses = {x['id'] for x in parsed_rses} + return self._RankingContext(self, rws, allowed_source_rses) + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + ctx = cast(self.__class__._RankingContext, ctx) + if ctx.allowed_source_rses is not None and source.rse.id not in ctx.allowed_source_rses: + return SKIP_SOURCE + + +class SkipRestrictedRSEs(SourceFilterStrategy): + + def __init__(self, admin_accounts: "Optional[set[InternalAccount]]" = None): + super().__init__() + self.admin_accounts = admin_accounts if admin_accounts is not None else [] + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + if source.rse.attributes.get('restricted_read') and ctx.rws.account not in self.admin_accounts: + return SKIP_SOURCE - yield from sorted(candidate_paths, key=__transfer_order_key) + +class SkipBlocklistedRSEs(SourceFilterStrategy): + + def __init__(self, topology: "Topology"): + super().__init__() + self.topology = topology + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # Ignore blocklisted RSEs + if not source.rse.columns['availability_read'] and not self.topology.ignore_availability: + return SKIP_SOURCE + + +class EnforceStagingBuffer(SourceFilterStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # For staging requests, the staging_buffer attribute must be correctly set + if ctx.rws.request_type == RequestType.STAGEIN and source.rse.attributes.get('staging_buffer') != ctx.rws.dest_rse.name: + return SKIP_SOURCE + + +class RestrictTapeSources(SourceFilterStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # Ignore tape sources if they are not desired + if source.rse.is_tape_or_staging_required() and not ctx.rws.attributes.get("allow_tape_source", True): + return SKIP_SOURCE + + +class HighestAdjustedRankingFirst(SourceRankingStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + source_ranking_penalty = 1 if source.rse.is_tape_or_staging_required() else 0 + return - source.ranking + source_ranking_penalty + + +class PreferDiskOverTape(SourceRankingStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + return source.rse.is_tape_or_staging_required() # rely on the fact that False < True + + +class PathDistance(SourceRankingStrategy): + + class _RankingContext(RequestRankingContext): + def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", paths_for_rws: dict[RseData, list[DirectTransferDefinition]]): + super().__init__(strategy, rws) + self.paths_for_rws = paths_for_rws + + def __init__(self, transfer_path_builder: TransferPathBuilder): + super().__init__() + self.transfer_path_builder = transfer_path_builder + + def for_request(self, rws: RequestWithSources, sources: "Iterable[RequestSource]", *, logger: "LoggerFunction" = logging.log, session: "Session"): + paths_for_rws = self.transfer_path_builder.build_or_return_cached(rws, sources, logger=logger, session=session) + return PathDistance._RankingContext(self, rws, paths_for_rws) + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + path = cast(self._RankingContext, ctx).paths_for_rws.get(source.rse) + if path is None: + return SKIP_SOURCE + return path[0].src.distance + + +class PreferSingleHop(PathDistance): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + path = cast(self._RankingContext, ctx).paths_for_rws.get(source.rse) + if path is None: + return SKIP_SOURCE + return len(path) > 1 # rely on the fact that False < True + + +class SkipSchemeMissmatch(PathDistance): + filter_only = True + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + path = cast(self._RankingContext, ctx).paths_for_rws.get(source.rse) + if path is not None and not path: + return SKIP_SOURCE + + +class SkipIntermediateTape(PathDistance): + filter_only = True + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # Discard multihop transfers which contain a tape source as an intermediate hop + path = cast(self._RankingContext, ctx).paths_for_rws.get(source.rse) + if any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]): + return SKIP_SOURCE @transactional_session @@ -794,11 +999,27 @@ def build_transfer_paths( Each path is a list of hops. Each hop is a transfer definition. """ - if schemes is None: - schemes = [] - - if failover_schemes is None: - failover_schemes = [] + transfer_path_builder = TransferPathBuilder( + topology=topology, + schemes=schemes, + failover_schemes=failover_schemes, + protocol_factory=protocol_factory, + max_sources=max_sources, + preparer_mode=preparer_mode, + ) + strategies = [ + EnforceSourceRSEExpression(), + SkipBlocklistedRSEs(topology=topology), + SkipRestrictedRSEs(admin_accounts=admin_accounts), + EnforceStagingBuffer(), + RestrictTapeSources(), + SkipSchemeMissmatch(transfer_path_builder=transfer_path_builder), + SkipIntermediateTape(transfer_path_builder=transfer_path_builder), + HighestAdjustedRankingFirst(), + PreferDiskOverTape(), + PathDistance(transfer_path_builder=transfer_path_builder), + PreferSingleHop(transfer_path_builder=transfer_path_builder), + ] if admin_accounts is None: admin_accounts = set() @@ -817,10 +1038,6 @@ def build_transfer_paths( for source in all_sources: source.rse.ensure_loaded(load_name=True, load_info=True, load_attributes=True, load_columns=True, session=session) - transfer_schemes = schemes - if rws.previous_attempt_id and failover_schemes: - transfer_schemes = failover_schemes - # Assume request doesn't have any sources. Will be removed later if sources are found. reqs_no_source.add(rws.request_id) if not all_sources: @@ -849,122 +1066,50 @@ def build_transfer_paths( reqs_no_source.remove(rws.request_id) continue - # parse source expression - source_replica_expression = rws.attributes.get('source_replica_expression', None) - allowed_source_rses = None - if source_replica_expression: - try: - parsed_rses = parse_expression(source_replica_expression, session=session) - except InvalidRSEExpression as error: - logger(logging.ERROR, "%s: Invalid RSE exception %s: %s", rws.request_id, source_replica_expression, str(error)) - continue - else: - allowed_source_rses = [x['id'] for x in parsed_rses] - - filtered_sources = all_sources - # Only keep allowed sources - if allowed_source_rses is not None: - filtered_sources = filter(lambda s: s.rse.id in allowed_source_rses, filtered_sources) - filtered_sources = filter(lambda s: s.rse.name is not None, filtered_sources) - if rws.account not in admin_accounts: - filtered_sources = filter(lambda s: not s.rse.attributes.get('restricted_read'), filtered_sources) - # Ignore blocklisted RSEs - if not topology.ignore_availability: - filtered_sources = filter(lambda s: s.rse.columns['availability_read'], filtered_sources) - # For staging requests, the staging_buffer attribute must be correctly set - if rws.request_type == RequestType.STAGEIN: - filtered_sources = filter(lambda s: s.rse.attributes.get('staging_buffer') == rws.dest_rse.name, filtered_sources) - # Ignore tape sources if they are not desired - filtered_sources = list(filtered_sources) - had_tape_sources = len(filtered_sources) > 0 - if not rws.attributes.get("allow_tape_source", True): - filtered_sources = filter(lambda s: not s.rse.is_tape_or_staging_required(), filtered_sources) - - filtered_sources = list(filtered_sources) - filtered_rses_log = '' - if len(all_sources) != len(filtered_sources): - filtered_rses = list(set(s.rse.name for s in all_sources).difference(s.rse.name for s in filtered_sources)) - filtered_rses_log = '; %d dropped by filter: ' % (len(all_sources) - len(filtered_sources)) - filtered_rses_log += ','.join(filtered_rses[:num_sources_in_logs]) - if len(filtered_rses) > num_sources_in_logs: - filtered_rses_log += '... and %d others' % (len(filtered_rses) - num_sources_in_logs) - candidate_paths = [] - - candidate_sources = filtered_sources - if requested_source_only and rws.requested_source: - candidate_sources = [rws.requested_source] if rws.requested_source in filtered_sources else [] - - if rws.request_type == RequestType.STAGEIN: - paths = __create_stagein_definitions(rws=rws, - sources=candidate_sources, - limit_dest_schemes=transfer_schemes, - operation_src='read', - operation_dest='write', - protocol_factory=protocol_factory) - else: - paths = __create_transfer_definitions(topology=topology, - rws=rws, - sources=candidate_sources, - max_sources=max_sources, - multi_source_sources=[] if preparer_mode else filtered_sources, - limit_dest_schemes=[], - operation_src='third_party_copy_read', - operation_dest='third_party_copy_write', - domain='wan', - protocol_factory=protocol_factory, - session=session) - - sources_without_path = [] - any_source_had_scheme_mismatch = False - for source in candidate_sources: - transfer_path = paths.get(source.rse.id) - if transfer_path is None: - logger(logging.WARNING, "%s: no path from %s to %s", rws.request_id, source.rse, rws.dest_rse) - sources_without_path.append(source.rse.name) - continue - if not transfer_path: - any_source_had_scheme_mismatch = True - logger(logging.WARNING, "%s: no matching protocol between %s and %s", rws.request_id, source.rse, rws.dest_rse) - sources_without_path.append(source.rse.name) - continue - - if len(transfer_path) > 1: - logger(logging.DEBUG, '%s: From %s to %s requires multihop: %s', rws.request_id, source.rse, rws.dest_rse, transfer_path_str(transfer_path)) - - candidate_paths.append(transfer_path) - - if len(candidate_sources) != len(candidate_paths): - logger(logging.DEBUG, '%s: Sources after path computation: %s', rws.request_id, [str(path[0].src.rse) for path in candidate_paths]) - - sources_without_path_log = '' - if sources_without_path: - sources_without_path_log = '; %d dropped due to missing path: ' % len(sources_without_path) - sources_without_path_log += ','.join(sources_without_path[:num_sources_in_logs]) - if len(sources_without_path) > num_sources_in_logs: - sources_without_path_log += '... and %d others' % (len(sources_without_path) - num_sources_in_logs) - - candidate_paths = __filter_multihops_with_intermediate_tape(candidate_paths) + # For each strategy name, gives the sources which were rejected by it + rejected_sources = defaultdict(list) + # Cost of each accepted source (lists of ordered costs: one for each ranking strategy) + cost_vectors = {s: [] for s in rws.sources} + for strategy in strategies: + sources = list(cost_vectors) + rws_strategy = strategy.for_request(rws, sources, logger=logger, session=session) + strategy_verdicts = map(lambda s: (s, rws_strategy.apply(s)), sources) + for source, verdict in strategy_verdicts: + if verdict is SKIP_SOURCE: + rejected_sources[strategy.__class__.__name__].append(source) + cost_vectors.pop(source) + elif not strategy.filter_only: + cost_vectors[source].append(verdict) + + transfers_by_rse = transfer_path_builder.build_or_return_cached(rws, cost_vectors, logger=logger, session=session) + candidate_paths = (transfers_by_rse[s.rse] for s in sorted(cost_vectors, key=cost_vectors.get)) if not preparer_mode: candidate_paths = __compress_multihops(candidate_paths, all_sources) - candidate_paths = list(__sort_paths(candidate_paths)) + candidate_paths = list(candidate_paths) - ordered_sources_log = ','.join(('multihop: ' if len(path) > 1 else '') + '{}:{}:{}'.format(path[0].src.rse, path[0].src.ranking, path[0].src.distance) - for path in candidate_paths[:num_sources_in_logs]) + ordered_sources_log = ','.join( + ('multihop: ' if len(path) > 1 else '') + '{}:{}:{}'.format(path[0].src.rse, path[0].src.ranking, path[0].src.distance) + for path in candidate_paths[:num_sources_in_logs] + ) if len(candidate_paths) > num_sources_in_logs: ordered_sources_log += '... and %d others' % (len(candidate_paths) - num_sources_in_logs) - - logger(logging.INFO, '%s: %d ordered sources: %s%s%s', rws, len(candidate_paths), - ordered_sources_log, filtered_rses_log, sources_without_path_log) + filtered_rses_log = '' + for strategy_name, sources in rejected_sources.items(): + filtered_rses_log += f'; {len(sources)} dropped by strategy "{strategy_name}": ' + filtered_rses_log += ','.join(str(s.rse) for s in sources[:num_sources_in_logs]) + if len(sources) > num_sources_in_logs: + filtered_rses_log += '... and %d others' % (len(sources) - num_sources_in_logs) + logger(logging.INFO, '%s: %d ordered sources: %s%s', rws, len(candidate_paths), ordered_sources_log, filtered_rses_log) if not candidate_paths: # It can happen that some sources are skipped because they are TAPE, and others because # of scheme mismatch. However, we can only have one state in the database. I picked to # prioritize setting only_tape_source without any particular reason. - if had_tape_sources and not filtered_sources: + if RestrictTapeSources.__name__ in rejected_sources: logger(logging.DEBUG, '%s: Only tape sources found' % rws.request_id) reqs_only_tape_source.add(rws.request_id) reqs_no_source.remove(rws.request_id) - elif any_source_had_scheme_mismatch: + elif SkipSchemeMissmatch.__name__ in rejected_sources: logger(logging.DEBUG, '%s: Scheme mismatch detected' % rws.request_id) reqs_scheme_mismatch.add(rws.request_id) reqs_no_source.remove(rws.request_id)