diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index e6584f7b8c6..db86b6d1071 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -56,8 +56,8 @@ class RequestSource: - def __init__(self, rse_data, ranking=None, distance=None, file_path=None, scheme=None, url=None): - self.rse = rse_data + def __init__(self, rse: RseData, ranking=None, distance=None, file_path=None, scheme=None, url=None): + self.rse = rse self.distance = distance if distance is not None else 9999 self.ranking = ranking if ranking is not None else 0 self.file_path = file_path @@ -82,7 +82,7 @@ def __init__( activity: str, attributes: Optional[Union[str, dict[str, Any]]], previous_attempt_id: Optional[str], - dest_rse_data: RseData, + dest_rse: RseData, account: InternalAccount, retry_count: int, priority: int, @@ -102,7 +102,7 @@ def __init__( self._dict_attributes = None self._db_attributes = attributes self.previous_attempt_id = previous_attempt_id - self.dest_rse = dest_rse_data + self.dest_rse = dest_rse self.account = account self.retry_count = retry_count or 0 self.priority = priority if priority is not None else 3 @@ -118,11 +118,15 @@ def __str__(self): @property def attributes(self): if self._dict_attributes is None: - self.attributes = self._db_attributes + self._dict_attributes = self._parse_db_attributes(self._db_attributes) return self._dict_attributes @attributes.setter def attributes(self, db_attributes): + self._dict_attributes = self._parse_db_attributes(db_attributes) + + @staticmethod + def _parse_db_attributes(db_attributes): attr = {} if db_attributes: if isinstance(db_attributes, dict): @@ -134,7 +138,7 @@ def attributes(self, db_attributes): attr['allow_tape_source'] = attr["allow_tape_source"] if (attr and "allow_tape_source" in attr) else True attr['dsn'] = attr["ds_name"] if (attr and "ds_name" in attr) else None attr['lifetime'] = attr.get('lifetime', -1) - self._dict_attributes = attr + return attr def should_retry_request(req, retry_protocol_mismatches): @@ -546,19 +550,19 @@ def list_and_mark_transfer_requests_and_source_replicas( if not request: request = RequestWithSources(id_=request_id, request_type=req_type, rule_id=rule_id, scope=scope, name=name, md5=md5, adler32=adler32, byte_count=byte_count, activity=activity, attributes=attributes, - previous_attempt_id=previous_attempt_id, dest_rse_data=rse_collection[dest_rse_id], + previous_attempt_id=previous_attempt_id, dest_rse=rse_collection[dest_rse_id], account=account, retry_count=retry_count, priority=priority, transfertool=transfertool, requested_at=requested_at) requests_by_id[request_id] = request # if STAGEIN and destination RSE is QoS make sure the source is included if request.request_type == RequestType.STAGEIN and get_rse_attribute(rse_id=dest_rse_id, key='staging_required', session=session): - source = RequestSource(rse_data=rse_collection[dest_rse_id]) + source = RequestSource(rse=rse_collection[dest_rse_id]) request.sources.append(source) if replica_rse_id is not None: replica_rse = rse_collection[replica_rse_id] replica_rse.name = replica_rse_name - source = RequestSource(rse_data=replica_rse, file_path=file_path, + source = RequestSource(rse=replica_rse, file_path=file_path, ranking=source_ranking, distance=distance, url=source_url) request.sources.append(source) if source_rse_id == replica_rse_id: diff --git a/lib/rucio/core/rse.py b/lib/rucio/core/rse.py index d510c407c73..4b47d22f473 100644 --- a/lib/rucio/core/rse.py +++ b/lib/rucio/core/rse.py @@ -84,7 +84,7 @@ def attributes(self) -> dict[str, Any]: return self._attributes @property - def info(self) -> dict[str, Any]: + def info(self) -> types.RSESettingsDict: if self._info is None: raise ValueError(f'info not loaded for rse {self}') return self._info diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index 2ae95b89884..457af02479b 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -15,10 +15,13 @@ import datetime import logging +import operator import re +import sys import time import traceback -from typing import TYPE_CHECKING +from collections import defaultdict +from typing import TYPE_CHECKING, cast from dogpile.cache import make_region from dogpile.cache.api import NoValue @@ -47,7 +50,7 @@ from rucio.transfertool.mock import MockTransfertool if TYPE_CHECKING: - from collections.abc import Callable, Generator, Iterable + from collections.abc import Callable, Iterator, Iterable, Mapping, Sequence from typing import Any, Optional from sqlalchemy.orm import Session from rucio.common.types import InternalAccount @@ -76,8 +79,8 @@ class TransferDestination: - def __init__(self, rse_data, scheme): - self.rse = rse_data + def __init__(self, rse: RseData, scheme): + self.rse = rse self.scheme = scheme def __str__(self): @@ -91,11 +94,11 @@ class ProtocolFactory: def __init__(self): self.protocols = {} - def protocol(self, rse_data, scheme, operation): - protocol_key = '%s_%s_%s' % (operation, rse_data.id, scheme) + def protocol(self, rse: RseData, scheme: "Optional[str]", operation: str): + protocol_key = '%s_%s_%s' % (operation, rse.id, scheme) protocol = self.protocols.get(protocol_key) if not protocol: - protocol = rsemgr.create_protocol(rse_data.info, operation, scheme) + protocol = rsemgr.create_protocol(rse.info, operation, scheme) self.protocols[protocol_key] = protocol return protocol @@ -255,7 +258,15 @@ class StageinTransferDefinition(DirectTransferDefinition): - must be from TAPE to non-TAPE RSE - can only have one source """ - def __init__(self, source, destination, rws, protocol_factory, operation_src, operation_dest): + def __init__( + self, + source: RequestSource, + destination: TransferDestination, + rws: RequestWithSources, + protocol_factory: ProtocolFactory, + operation_src: str, + operation_dest: str + ): if not source.rse.is_tape() or destination.rse.is_tape(): # allow staging_required QoS RSE to be TAPE to TAPE for pin if not destination.rse.attributes.get('staging_required', None): @@ -540,21 +551,20 @@ def touch_transfer(external_host, transfer_id, *, session: "Session"): raise RucioException(error.args) -@read_session -def __create_transfer_definitions( +def _create_transfer_definitions( topology: "Topology", protocol_factory: ProtocolFactory, rws: RequestWithSources, - sources: "list[RequestSource]", + sources: "Iterable[RequestSource]", max_sources: int, - multi_source_sources: "list[RequestSource]", + multi_source_sources: "Iterable[RequestSource]", limit_dest_schemes: list[str], operation_src: str, operation_dest: str, domain: str, *, session: "Session", -) -> "dict[str, list[DirectTransferDefinition]]": +) -> "dict[RseData, list[DirectTransferDefinition]]": """ Find the all paths from sources towards the destination of the given transfer request. Create the transfer definitions for each point-to-point transfer (multi-source, when possible) @@ -572,14 +582,14 @@ def __create_transfer_definitions( hop_src_rse = hop['source_rse'] hop_dst_rse = hop['dest_rse'] src = RequestSource( - rse_data=hop_src_rse, + rse=hop_src_rse, file_path=source.file_path if hop_src_rse == source.rse else None, ranking=source.ranking if hop_src_rse == source.rse else 0, distance=hop['cumulated_distance'] if hop_src_rse == source.rse else hop['hop_distance'], scheme=hop['source_scheme'], ) dst = TransferDestination( - rse_data=hop_dst_rse, + rse=hop_dst_rse, scheme=hop['dest_scheme'], ) hop_definition = DirectTransferDefinition( @@ -611,7 +621,7 @@ def __create_transfer_definitions( 'allow_tape_source': True }, previous_attempt_id=None, - dest_rse_data=hop_dst_rse, + dest_rse=hop_dst_rse, account=rws.account, retry_count=0, priority=rws.priority, @@ -621,7 +631,7 @@ def __create_transfer_definitions( ) transfer_path.append(hop_definition) - transfers_by_source[source.rse.id] = transfer_path + transfers_by_source[source.rse] = transfer_path # create multi-source transfers: add additional sources if possible for transfer_path in transfers_by_source.values(): @@ -660,7 +670,7 @@ def __create_transfer_definitions( transfer_path[0].sources.append( RequestSource( - rse_data=source.rse, + rse=source.rse, file_path=source.file_path, ranking=source.ranking, distance=edge.cost, @@ -671,28 +681,28 @@ def __create_transfer_definitions( return transfers_by_source -def __create_stagein_definitions( +def _create_stagein_definitions( rws: RequestWithSources, - sources: "list[RequestSource]", + sources: "Iterable[RequestSource]", limit_dest_schemes: list[str], operation_src: str, operation_dest: str, protocol_factory: ProtocolFactory, -) -> "dict[str, list[StageinTransferDefinition]]": +) -> "dict[RseData, list[StageinTransferDefinition]]": """ for each source, create a single-hop transfer path with a one stageing definition inside """ transfers_by_source = { - source.rse.id: [ + source.rse: [ StageinTransferDefinition( source=RequestSource( - rse_data=source.rse, + rse=source.rse, file_path=source.file_path, url=source.url, scheme=limit_dest_schemes, ), destination=TransferDestination( - rse_data=rws.dest_rse, + rse=rws.dest_rse, scheme=limit_dest_schemes, ), operation_src=operation_src, @@ -717,19 +727,10 @@ def get_dsn(scope, name, dsn): return 'other' -def __filter_multihops_with_intermediate_tape(candidate_paths: "Iterable[list[DirectTransferDefinition]]") -> "Generator[list[DirectTransferDefinition]]": - # Discard multihop transfers which contain a tape source as an intermediate hop - for path in candidate_paths: - if any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]): - pass - else: - yield path - - def __compress_multihops( - candidate_paths: "Iterable[list[DirectTransferDefinition]]", + candidate_paths: "Iterable[Sequence[DirectTransferDefinition]]", sources: "Iterable[RequestSource]", -) -> "Generator[list[DirectTransferDefinition]]": +) -> "Iterator[Sequence[DirectTransferDefinition]]": # Compress multihop transfers which contain other sources as part of itself. # For example: multihop A->B->C and B is a source, compress A->B->C into B->C source_rses = {s.rse.id for s in sources} @@ -748,24 +749,251 @@ def __compress_multihops( yield path -def __sort_paths(candidate_paths: "Iterable[list[DirectTransferDefinition]]") -> "Generator[list[DirectTransferDefinition]]": - - def __transfer_order_key(transfer_path): - # Reduce the priority of the tape sources. If there are any disk sources, - # they must fail twice (1 penalty + 1 disk preferred over tape) before a tape will even be tried - source_ranking_penalty = 1 if transfer_path[0].src.rse.is_tape_or_staging_required() else 0 - # higher source_ranking first, - # on equal source_ranking, prefer DISK over TAPE - # on equal type, prefer lower distance - # on equal distance, prefer single hop - return ( - - transfer_path[0].src.ranking + source_ranking_penalty, - transfer_path[0].src.rse.is_tape_or_staging_required(), # rely on the fact that False < True - transfer_path[0].src.distance, - len(transfer_path) > 1, # rely on the fact that False < True - ) +class TransferPathBuilder: + def __init__( + self, + topology: "Topology", + protocol_factory: ProtocolFactory, + max_sources: int, + preparer_mode: bool = False, + schemes: "Optional[list[str]]" = None, + failover_schemes: "Optional[list[str]]" = None, + ): + self.failover_schemes = failover_schemes if failover_schemes is not None else [] + self.schemes = schemes if schemes is not None else [] + self.topology = topology + self.preparer_mode = preparer_mode + self.protocol_factory = protocol_factory + self.max_sources = max_sources + + self.definition_by_request_id = {} + + def build_or_return_cached( + self, + rws: RequestWithSources, + sources: "Iterable[RequestSource]", + *, + logger: "LoggerFunction" = logging.log, + session: "Session" + ) -> "Mapping[RseData, Sequence[DirectTransferDefinition]]": + """ + Warning: The function currently caches the result for the given request and returns it for later calls + with the same request id. As a result: it can return more (or less) sources than what is provided in the + `sources` argument. This is done for performance reasons. As of time of writing, this behavior is not problematic + for the callers of this method. + """ + definition = self.definition_by_request_id.get(rws.request_id) + if definition: + return definition + + transfer_schemes = self.schemes + if rws.previous_attempt_id and self.failover_schemes: + transfer_schemes = self.failover_schemes + + if rws.request_type == RequestType.STAGEIN: + definition = _create_stagein_definitions( + rws=rws, + sources=sources, + limit_dest_schemes=transfer_schemes, + operation_src='read', + operation_dest='write', + protocol_factory=self.protocol_factory + ) + else: + definition = _create_transfer_definitions( + topology=self.topology, + rws=rws, + sources=sources, + max_sources=self.max_sources, + multi_source_sources=[] if self.preparer_mode else sources, + limit_dest_schemes=[], + operation_src='third_party_copy_read', + operation_dest='third_party_copy_write', + domain='wan', + protocol_factory=self.protocol_factory, + session=session + ) + self.definition_by_request_id[rws.request_id] = definition + return definition + + +class _SkipSource: + pass + + +SKIP_SOURCE = _SkipSource() + + +class RequestRankingContext: + + def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources"): + self.strategy = strategy + self.rws = rws + + def apply(self, source: RequestSource) -> "int | _SkipSource": + verdict = self.strategy.apply(self, source) + if verdict is None: + verdict = sys.maxsize + return verdict + + +class SourceRankingStrategy: + filter_only: bool = False + + def for_request( + self, + rws: RequestWithSources, + sources: "Iterable[RequestSource]", + *, + logger: "LoggerFunction" = logging.log, + session: "Session" + ) -> "RequestRankingContext": + return RequestRankingContext(self, rws) + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + pass + + +class SourceFilterStrategy(SourceRankingStrategy): + filter_only = True + + +class EnforceSourceRSEExpression(SourceFilterStrategy): + + class _RankingContext(RequestRankingContext): + def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", allowed_source_rses: "Optional[set[str]]"): + super().__init__(strategy, rws) + self.allowed_source_rses = allowed_source_rses + + def for_request( + self, + rws: RequestWithSources, + sources: "Iterable[RequestSource]", + *, + logger: "LoggerFunction" = logging.log, + session: "Session" + ) -> "RequestRankingContext": + # parse source expression + allowed_source_rses = None + source_replica_expression = rws.attributes.get('source_replica_expression', None) + if source_replica_expression: + try: + parsed_rses = parse_expression(source_replica_expression, session=session) + except InvalidRSEExpression as error: + logger(logging.ERROR, "%s: Invalid RSE exception %s: %s", rws.request_id, source_replica_expression, str(error)) + allowed_source_rses = set() + else: + allowed_source_rses = {x['id'] for x in parsed_rses} + return self._RankingContext(self, rws, allowed_source_rses) + + def apply(self, ctx: _RankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + ctx = cast(EnforceSourceRSEExpression._RankingContext, ctx) + if ctx.allowed_source_rses is not None and source.rse.id not in ctx.allowed_source_rses: + return SKIP_SOURCE + + +class SkipRestrictedRSEs(SourceFilterStrategy): + + def __init__(self, admin_accounts: "Optional[set[InternalAccount]]" = None): + super().__init__() + self.admin_accounts = admin_accounts if admin_accounts is not None else [] + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + if source.rse.attributes.get('restricted_read') and ctx.rws.account not in self.admin_accounts: + return SKIP_SOURCE - yield from sorted(candidate_paths, key=__transfer_order_key) + +class SkipBlocklistedRSEs(SourceFilterStrategy): + + def __init__(self, topology: "Topology"): + super().__init__() + self.topology = topology + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # Ignore blocklisted RSEs + if not source.rse.columns['availability_read'] and not self.topology.ignore_availability: + return SKIP_SOURCE + + +class EnforceStagingBuffer(SourceFilterStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # For staging requests, the staging_buffer attribute must be correctly set + if ctx.rws.request_type == RequestType.STAGEIN and source.rse.attributes.get('staging_buffer') != ctx.rws.dest_rse.name: + return SKIP_SOURCE + + +class RestrictTapeSources(SourceFilterStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # Ignore tape sources if they are not desired + if source.rse.is_tape_or_staging_required() and not ctx.rws.attributes.get("allow_tape_source", True): + return SKIP_SOURCE + + +class HighestAdjustedRankingFirst(SourceRankingStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + source_ranking_penalty = 1 if source.rse.is_tape_or_staging_required() else 0 + return - source.ranking + source_ranking_penalty + + +class PreferDiskOverTape(SourceRankingStrategy): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + return source.rse.is_tape_or_staging_required() # rely on the fact that False < True + + +class PathDistance(SourceRankingStrategy): + + class _RankingContext(RequestRankingContext): + def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", paths_for_rws: "Mapping[RseData, Sequence[DirectTransferDefinition]]"): + super().__init__(strategy, rws) + self.paths_for_rws = paths_for_rws + + def __init__(self, transfer_path_builder: TransferPathBuilder): + super().__init__() + self.transfer_path_builder = transfer_path_builder + + def for_request( + self, + rws: RequestWithSources, + sources: "Iterable[RequestSource]", + *, + logger: "LoggerFunction" = logging.log, + session: "Session" + ) -> "RequestRankingContext": + paths_for_rws = self.transfer_path_builder.build_or_return_cached(rws, sources, logger=logger, session=session) + return PathDistance._RankingContext(self, rws, paths_for_rws) + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + path = cast(PathDistance._RankingContext, ctx).paths_for_rws.get(source.rse) + if path is None: + return SKIP_SOURCE + return path[0].src.distance + + +class PreferSingleHop(PathDistance): + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + path = cast(PathDistance._RankingContext, ctx).paths_for_rws.get(source.rse) + if path is None: + return SKIP_SOURCE + return int(len(path) > 1) + + +class SkipSchemeMissmatch(PathDistance): + filter_only = True + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + path = cast(PathDistance._RankingContext, ctx).paths_for_rws.get(source.rse) + if path is not None and not path: + return SKIP_SOURCE + + +class SkipIntermediateTape(PathDistance): + filter_only = True + + def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[int | _SkipSource]": + # Discard multihop transfers which contain a tape source as an intermediate hop + path = cast(PathDistance._RankingContext, ctx).paths_for_rws.get(source.rse) + if path is None or any(transfer.src.rse.is_tape_or_staging_required() for transfer in path[1:]): + return SKIP_SOURCE @transactional_session @@ -794,11 +1022,27 @@ def build_transfer_paths( Each path is a list of hops. Each hop is a transfer definition. """ - if schemes is None: - schemes = [] - - if failover_schemes is None: - failover_schemes = [] + transfer_path_builder = TransferPathBuilder( + topology=topology, + schemes=schemes, + failover_schemes=failover_schemes, + protocol_factory=protocol_factory, + max_sources=max_sources, + preparer_mode=preparer_mode, + ) + strategies = [ + EnforceSourceRSEExpression(), + SkipBlocklistedRSEs(topology=topology), + SkipRestrictedRSEs(admin_accounts=admin_accounts), + EnforceStagingBuffer(), + RestrictTapeSources(), + SkipSchemeMissmatch(transfer_path_builder=transfer_path_builder), + SkipIntermediateTape(transfer_path_builder=transfer_path_builder), + HighestAdjustedRankingFirst(), + PreferDiskOverTape(), + PathDistance(transfer_path_builder=transfer_path_builder), + PreferSingleHop(transfer_path_builder=transfer_path_builder), + ] if admin_accounts is None: admin_accounts = set() @@ -817,10 +1061,6 @@ def build_transfer_paths( for source in all_sources: source.rse.ensure_loaded(load_name=True, load_info=True, load_attributes=True, load_columns=True, session=session) - transfer_schemes = schemes - if rws.previous_attempt_id and failover_schemes: - transfer_schemes = failover_schemes - # Assume request doesn't have any sources. Will be removed later if sources are found. reqs_no_source.add(rws.request_id) if not all_sources: @@ -849,122 +1089,53 @@ def build_transfer_paths( reqs_no_source.remove(rws.request_id) continue - # parse source expression - source_replica_expression = rws.attributes.get('source_replica_expression', None) - allowed_source_rses = None - if source_replica_expression: - try: - parsed_rses = parse_expression(source_replica_expression, session=session) - except InvalidRSEExpression as error: - logger(logging.ERROR, "%s: Invalid RSE exception %s: %s", rws.request_id, source_replica_expression, str(error)) - continue - else: - allowed_source_rses = [x['id'] for x in parsed_rses] - - filtered_sources = all_sources - # Only keep allowed sources - if allowed_source_rses is not None: - filtered_sources = filter(lambda s: s.rse.id in allowed_source_rses, filtered_sources) - filtered_sources = filter(lambda s: s.rse.name is not None, filtered_sources) - if rws.account not in admin_accounts: - filtered_sources = filter(lambda s: not s.rse.attributes.get('restricted_read'), filtered_sources) - # Ignore blocklisted RSEs - if not topology.ignore_availability: - filtered_sources = filter(lambda s: s.rse.columns['availability_read'], filtered_sources) - # For staging requests, the staging_buffer attribute must be correctly set - if rws.request_type == RequestType.STAGEIN: - filtered_sources = filter(lambda s: s.rse.attributes.get('staging_buffer') == rws.dest_rse.name, filtered_sources) - # Ignore tape sources if they are not desired - filtered_sources = list(filtered_sources) - had_tape_sources = len(filtered_sources) > 0 - if not rws.attributes.get("allow_tape_source", True): - filtered_sources = filter(lambda s: not s.rse.is_tape_or_staging_required(), filtered_sources) - - filtered_sources = list(filtered_sources) - filtered_rses_log = '' - if len(all_sources) != len(filtered_sources): - filtered_rses = list(set(s.rse.name for s in all_sources).difference(s.rse.name for s in filtered_sources)) - filtered_rses_log = '; %d dropped by filter: ' % (len(all_sources) - len(filtered_sources)) - filtered_rses_log += ','.join(filtered_rses[:num_sources_in_logs]) - if len(filtered_rses) > num_sources_in_logs: - filtered_rses_log += '... and %d others' % (len(filtered_rses) - num_sources_in_logs) - candidate_paths = [] - - candidate_sources = filtered_sources - if requested_source_only and rws.requested_source: - candidate_sources = [rws.requested_source] if rws.requested_source in filtered_sources else [] - - if rws.request_type == RequestType.STAGEIN: - paths = __create_stagein_definitions(rws=rws, - sources=candidate_sources, - limit_dest_schemes=transfer_schemes, - operation_src='read', - operation_dest='write', - protocol_factory=protocol_factory) - else: - paths = __create_transfer_definitions(topology=topology, - rws=rws, - sources=candidate_sources, - max_sources=max_sources, - multi_source_sources=[] if preparer_mode else filtered_sources, - limit_dest_schemes=[], - operation_src='third_party_copy_read', - operation_dest='third_party_copy_write', - domain='wan', - protocol_factory=protocol_factory, - session=session) - - sources_without_path = [] - any_source_had_scheme_mismatch = False - for source in candidate_sources: - transfer_path = paths.get(source.rse.id) - if transfer_path is None: - logger(logging.WARNING, "%s: no path from %s to %s", rws.request_id, source.rse, rws.dest_rse) - sources_without_path.append(source.rse.name) - continue - if not transfer_path: - any_source_had_scheme_mismatch = True - logger(logging.WARNING, "%s: no matching protocol between %s and %s", rws.request_id, source.rse, rws.dest_rse) - sources_without_path.append(source.rse.name) - continue - - if len(transfer_path) > 1: - logger(logging.DEBUG, '%s: From %s to %s requires multihop: %s', rws.request_id, source.rse, rws.dest_rse, transfer_path_str(transfer_path)) - - candidate_paths.append(transfer_path) - - if len(candidate_sources) != len(candidate_paths): - logger(logging.DEBUG, '%s: Sources after path computation: %s', rws.request_id, [str(path[0].src.rse) for path in candidate_paths]) - - sources_without_path_log = '' - if sources_without_path: - sources_without_path_log = '; %d dropped due to missing path: ' % len(sources_without_path) - sources_without_path_log += ','.join(sources_without_path[:num_sources_in_logs]) - if len(sources_without_path) > num_sources_in_logs: - sources_without_path_log += '... and %d others' % (len(sources_without_path) - num_sources_in_logs) - - candidate_paths = __filter_multihops_with_intermediate_tape(candidate_paths) + # For each strategy name, gives the sources which were rejected by it + rejected_sources = defaultdict(list) + # Cost of each accepted source (lists of ordered costs: one for each ranking strategy) + cost_vectors = {s: [] for s in rws.sources} + for strategy in strategies: + sources = list(cost_vectors) + if not sources: + # All sources where filtered by previous strategies. It's worthless to continue. + break + rws_strategy = strategy.for_request(rws, sources, logger=logger, session=session) + for source in sources: + verdict = rws_strategy.apply(source) + if verdict is SKIP_SOURCE: + rejected_sources[strategy.__class__.__name__].append(source) + cost_vectors.pop(source) + elif not strategy.filter_only: + cost_vectors[source].append(verdict) + + transfers_by_rse = transfer_path_builder.build_or_return_cached(rws, cost_vectors, logger=logger, session=session) + candidate_paths = (transfers_by_rse[s.rse] for s, _ in sorted(cost_vectors.items(), key=operator.itemgetter(1))) if not preparer_mode: candidate_paths = __compress_multihops(candidate_paths, all_sources) - candidate_paths = list(__sort_paths(candidate_paths)) + candidate_paths = list(candidate_paths) - ordered_sources_log = ','.join(('multihop: ' if len(path) > 1 else '') + '{}:{}:{}'.format(path[0].src.rse, path[0].src.ranking, path[0].src.distance) - for path in candidate_paths[:num_sources_in_logs]) + ordered_sources_log = ','.join( + ('multihop: ' if len(path) > 1 else '') + '{}:{}:{}'.format(path[0].src.rse, path[0].src.ranking, path[0].src.distance) + for path in candidate_paths[:num_sources_in_logs] + ) if len(candidate_paths) > num_sources_in_logs: ordered_sources_log += '... and %d others' % (len(candidate_paths) - num_sources_in_logs) - - logger(logging.INFO, '%s: %d ordered sources: %s%s%s', rws, len(candidate_paths), - ordered_sources_log, filtered_rses_log, sources_without_path_log) + filtered_rses_log = '' + for strategy_name, sources in rejected_sources.items(): + filtered_rses_log += f'; {len(sources)} dropped by strategy "{strategy_name}": ' + filtered_rses_log += ','.join(str(s.rse) for s in sources[:num_sources_in_logs]) + if len(sources) > num_sources_in_logs: + filtered_rses_log += '... and %d others' % (len(sources) - num_sources_in_logs) + logger(logging.INFO, '%s: %d ordered sources: %s%s', rws, len(candidate_paths), ordered_sources_log, filtered_rses_log) if not candidate_paths: # It can happen that some sources are skipped because they are TAPE, and others because # of scheme mismatch. However, we can only have one state in the database. I picked to # prioritize setting only_tape_source without any particular reason. - if had_tape_sources and not filtered_sources: + if RestrictTapeSources.__name__ in rejected_sources: logger(logging.DEBUG, '%s: Only tape sources found' % rws.request_id) reqs_only_tape_source.add(rws.request_id) reqs_no_source.remove(rws.request_id) - elif any_source_had_scheme_mismatch: + elif SkipSchemeMissmatch.__name__ in rejected_sources: logger(logging.DEBUG, '%s: Scheme mismatch detected' % rws.request_id) reqs_scheme_mismatch.add(rws.request_id) reqs_no_source.remove(rws.request_id)