diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index c4a9abe1b3c..43e878af3f4 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -29,7 +29,7 @@ from sqlalchemy.exc import IntegrityError from rucio.common import constants -from rucio.common.config import config_get +from rucio.common.config import config_get, config_get_list from rucio.common.constants import SUPPORTED_PROTOCOLS from rucio.common.exception import (InvalidRSEExpression, RequestNotFound, RSEProtocolNotSupported, @@ -77,7 +77,6 @@ MockTransfertool.external_name: MockTransfertool, } - class TransferDestination: def __init__(self, rse: RseData, scheme): self.rse = rse @@ -878,6 +877,17 @@ def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[ """ pass + class _ClassNameDescriptor(object): + """ + Automatically set the external_name of the strategy to the class name. + """ + def __get__(self, obj, objtype=None): + if objtype is not None: + return objtype.__name__ + return type(obj).__name__ + + external_name = _ClassNameDescriptor() + class SourceFilterStrategy(SourceRankingStrategy): filter_only = True @@ -1058,21 +1068,43 @@ def build_transfer_paths( preparer_mode=preparer_mode, requested_source_only=requested_source_only, ) - strategies = [ - EnforceSourceRSEExpression(), - SkipBlocklistedRSEs(topology=topology), - SkipRestrictedRSEs(admin_accounts=admin_accounts), - EnforceStagingBuffer(), - RestrictTapeSources(), + + available_strategies = { + EnforceSourceRSEExpression.external_name: lambda: EnforceSourceRSEExpression(), + SkipBlocklistedRSEs.external_name: lambda: SkipBlocklistedRSEs(topology=topology), + SkipRestrictedRSEs.external_name: lambda: SkipRestrictedRSEs(admin_accounts=admin_accounts), + EnforceStagingBuffer.external_name: lambda: EnforceStagingBuffer(), + RestrictTapeSources.external_name: lambda: RestrictTapeSources(), + SkipSchemeMissmatch.external_name: lambda: SkipSchemeMissmatch(transfer_path_builder=transfer_path_builder), + SkipIntermediateTape.external_name: lambda: SkipIntermediateTape(transfer_path_builder=transfer_path_builder), + HighestAdjustedRankingFirst.external_name: lambda: HighestAdjustedRankingFirst(), + PreferDiskOverTape.external_name: lambda: PreferDiskOverTape(), + PathDistance.external_name: lambda: PathDistance(transfer_path_builder=transfer_path_builder), + PreferSingleHop.external_name: lambda: PreferSingleHop(transfer_path_builder=transfer_path_builder), + } + + default_strategies = [ + EnforceSourceRSEExpression.external_name, + SkipBlocklistedRSEs.external_name, + SkipRestrictedRSEs.external_name, + EnforceStagingBuffer.external_name, + RestrictTapeSources.external_name, # Without the SkipSchemeMissmatch strategy, requests will never be transitioned to the # RequestState.MISMATCH_SCHEME state. It _MUST_ be placed before the other Path-based strategies. - SkipSchemeMissmatch(transfer_path_builder=transfer_path_builder), - SkipIntermediateTape(transfer_path_builder=transfer_path_builder), - HighestAdjustedRankingFirst(), - PreferDiskOverTape(), - PathDistance(transfer_path_builder=transfer_path_builder), - PreferSingleHop(transfer_path_builder=transfer_path_builder), + SkipSchemeMissmatch.external_name, + SkipIntermediateTape.external_name, + HighestAdjustedRankingFirst.external_name, + PreferDiskOverTape.external_name, + PathDistance.external_name, + PreferSingleHop.external_name, ] + strategy_names = config_get_list('transfers', 'source_ranking_strategies', default=default_strategies) + + try: + strategies = list(available_strategies[name]() for name in strategy_names) + except KeyError: + logger(logging.ERROR, "One of the configured source_ranking_strategies doesn't exist %s", strategy_names, exc_info=True) + raise if admin_accounts is None: admin_accounts = set() @@ -1132,7 +1164,7 @@ def build_transfer_paths( for source in sources: verdict = rws_strategy.apply(source) if verdict is SKIP_SOURCE: - rejected_sources[strategy.__class__.__name__].append(source) + rejected_sources[strategy.external_name].append(source) cost_vectors.pop(source) elif not strategy.filter_only: cost_vectors[source].append(verdict) @@ -1161,11 +1193,11 @@ def build_transfer_paths( # It can happen that some sources are skipped because they are TAPE, and others because # of scheme mismatch. However, we can only have one state in the database. I picked to # prioritize setting only_tape_source without any particular reason. - if RestrictTapeSources.__name__ in rejected_sources: + if RestrictTapeSources.external_name in rejected_sources: logger(logging.DEBUG, '%s: Only tape sources found' % rws.request_id) reqs_only_tape_source.add(rws.request_id) reqs_no_source.remove(rws.request_id) - elif SkipSchemeMissmatch.__name__ in rejected_sources: + elif SkipSchemeMissmatch.external_name in rejected_sources: logger(logging.DEBUG, '%s: Scheme mismatch detected' % rws.request_id) reqs_scheme_mismatch.add(rws.request_id) reqs_no_source.remove(rws.request_id) diff --git a/tests/conftest.py b/tests/conftest.py index a537068e443..884e7cab492 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -519,7 +519,7 @@ def file_config_mock(request): if not config_has_section(section): config_add_section(section) config_set(section, option, value) - yield + yield parser @pytest.fixture diff --git a/tests/test_conveyor_submitter.py b/tests/test_conveyor_submitter.py index e084df7ae08..03187eccdf0 100644 --- a/tests/test_conveyor_submitter.py +++ b/tests/test_conveyor_submitter.py @@ -283,7 +283,11 @@ def __setup_test(): @pytest.mark.noparallel(reason="multiple submitters cannot be run in parallel due to partial job assignment by hash") -def test_scheme_missmatch(rse_factory, did_factory, root_account): +@pytest.mark.parametrize("file_config_mock", [ + {"overrides": [('transfers', 'source_ranking_strategies', 'SkipSchemeMissmatch,PathDistance')]}, + {"overrides": [('transfers', 'source_ranking_strategies', 'PathDistance')]} +], indirect=True) +def test_scheme_missmatch(rse_factory, did_factory, root_account, file_config_mock): """ Ensure that the requests are marked MISSMATCH_SCHEME when there is a path, but with wrong schemes. """ @@ -298,7 +302,10 @@ def test_scheme_missmatch(rse_factory, did_factory, root_account): submitter(once=True, rses=[{'id': rse_id} for rse_id in (src_rse_id, dst_rse_id)], partition_wait_time=None, transfertools=['mock'], transfertype='single') request = request_core.get_request_by_did(rse_id=dst_rse_id, **did) - assert request['state'] == RequestState.MISMATCH_SCHEME + if 'SkipSchemeMissmatch' in file_config_mock.get('transfers', 'source_ranking_strategies'): + assert request['state'] == RequestState.MISMATCH_SCHEME + else: + assert request['state'] == RequestState.NO_SOURCES @pytest.mark.noparallel(groups=[NoParallelGroups.SUBMITTER]) diff --git a/tests/test_transfer.py b/tests/test_transfer.py index a1c61d89e95..c39590261d4 100644 --- a/tests/test_transfer.py +++ b/tests/test_transfer.py @@ -148,7 +148,7 @@ def test_get_hops(rse_factory): assert hop4['dest_rse'].id == rse6_id -def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope): +def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope, file_config_mock): tape1_rse_name, tape1_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.TAPE) tape2_rse_name, tape2_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.TAPE) disk1_rse_name, disk1_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.DISK) @@ -215,6 +215,38 @@ def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope): assert transfer[0].legacy_sources[0][0] == tape1_rse_name +@pytest.mark.parametrize("file_config_mock", [ + {"overrides": [('transfers', 'source_ranking_strategies', 'PathDistance')]}, + {"overrides": [('transfers', 'source_ranking_strategies', 'PreferDiskOverTape,PathDistance')]} +], indirect=True) +def test_disk_vs_tape_with_custom_strategy(rse_factory, root_account, mock_scope, file_config_mock): + """ + Disk RSEs are preferred over tape only if the PreferDiskOverTape strategy is set. + """ + disk_rse_name, disk_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.DISK) + tape_rse_name, tape_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.TAPE) + dst_rse_name, dst_rse_id = rse_factory.make_posix_rse() + all_rses = [tape_rse_id, disk_rse_id, dst_rse_id] + add_distance(disk_rse_id, dst_rse_id, distance=20) + add_distance(tape_rse_id, dst_rse_id, distance=10) + + file = {'scope': mock_scope, 'name': 'lfn.' + generate_uuid(), 'type': 'FILE', 'bytes': 1, 'adler32': 'beefdead'} + did = {'scope': file['scope'], 'name': file['name']} + for rse_id in [tape_rse_id, disk_rse_id]: + add_replicas(rse_id=rse_id, files=[file], account=root_account) + + rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse_name, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) + topology = Topology().configure_multihop() + requests = list_and_mark_transfer_requests_and_source_replicas(rse_collection=topology, rses=all_rses) + + [[_, [transfer]]] = pick_and_prepare_submission_path(topology=topology, protocol_factory=ProtocolFactory(), + requests_with_sources=requests).items() + if 'PreferDiskOverTape' in file_config_mock.get('transfers', 'source_ranking_strategies'): + assert transfer[0].src.rse.name == disk_rse_name + else: + assert transfer[0].src.rse.name == tape_rse_name + + @pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [ 'rucio.core.rse_expression_parser.REGION', # The list of multihop RSEs is retrieved by an expression ]}], indirect=True)