Skip to content

Commit

Permalink
Transfers: allow configuring source ranking strategies. Closes rucio#…
Browse files Browse the repository at this point in the history
…6342

Keep it quite simple for now: don't implement any policy-package-like
behavior. It shouldn't be complicated to allow custom strategy
injection later, though. New strategies can be added at will to the
available_strategies dictionary. As long as they are not added to the
default_strategies list, they will not be used unless they are explicitly
activated via the configuration.

Use the class name as the strategy name, but leave the door open to
overriding it with a custom name by setting `external_name`.
  • Loading branch information
Radu Carpa committed Nov 10, 2023
1 parent 427f007 commit 0e6ea51
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 21 deletions.
66 changes: 49 additions & 17 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from sqlalchemy.exc import IntegrityError

from rucio.common import constants
from rucio.common.config import config_get
from rucio.common.config import config_get, config_get_list
from rucio.common.constants import SUPPORTED_PROTOCOLS
from rucio.common.exception import (InvalidRSEExpression,
RequestNotFound, RSEProtocolNotSupported,
Expand Down Expand Up @@ -77,7 +77,6 @@
MockTransfertool.external_name: MockTransfertool,
}


class TransferDestination:
def __init__(self, rse: RseData, scheme):
self.rse = rse
Expand Down Expand Up @@ -878,6 +877,17 @@ def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[
"""
pass

class _ClassNameDescriptor(object):
"""
Automatically set the external_name of the strategy to the class name.
"""
def __get__(self, obj, objtype=None):
if objtype is not None:
return objtype.__name__
return type(obj).__name__

external_name = _ClassNameDescriptor()


class SourceFilterStrategy(SourceRankingStrategy):
filter_only = True
Expand Down Expand Up @@ -1058,21 +1068,43 @@ def build_transfer_paths(
preparer_mode=preparer_mode,
requested_source_only=requested_source_only,
)
strategies = [
EnforceSourceRSEExpression(),
SkipBlocklistedRSEs(topology=topology),
SkipRestrictedRSEs(admin_accounts=admin_accounts),
EnforceStagingBuffer(),
RestrictTapeSources(),

available_strategies = {
EnforceSourceRSEExpression.external_name: lambda: EnforceSourceRSEExpression(),
SkipBlocklistedRSEs.external_name: lambda: SkipBlocklistedRSEs(topology=topology),
SkipRestrictedRSEs.external_name: lambda: SkipRestrictedRSEs(admin_accounts=admin_accounts),
EnforceStagingBuffer.external_name: lambda: EnforceStagingBuffer(),
RestrictTapeSources.external_name: lambda: RestrictTapeSources(),
SkipSchemeMissmatch.external_name: lambda: SkipSchemeMissmatch(transfer_path_builder=transfer_path_builder),
SkipIntermediateTape.external_name: lambda: SkipIntermediateTape(transfer_path_builder=transfer_path_builder),
HighestAdjustedRankingFirst.external_name: lambda: HighestAdjustedRankingFirst(),
PreferDiskOverTape.external_name: lambda: PreferDiskOverTape(),
PathDistance.external_name: lambda: PathDistance(transfer_path_builder=transfer_path_builder),
PreferSingleHop.external_name: lambda: PreferSingleHop(transfer_path_builder=transfer_path_builder),
}

default_strategies = [
EnforceSourceRSEExpression.external_name,
SkipBlocklistedRSEs.external_name,
SkipRestrictedRSEs.external_name,
EnforceStagingBuffer.external_name,
RestrictTapeSources.external_name,
# Without the SkipSchemeMissmatch strategy, requests will never be transitioned to the
# RequestState.MISMATCH_SCHEME state. It _MUST_ be placed before the other Path-based strategies.
SkipSchemeMissmatch(transfer_path_builder=transfer_path_builder),
SkipIntermediateTape(transfer_path_builder=transfer_path_builder),
HighestAdjustedRankingFirst(),
PreferDiskOverTape(),
PathDistance(transfer_path_builder=transfer_path_builder),
PreferSingleHop(transfer_path_builder=transfer_path_builder),
SkipSchemeMissmatch.external_name,
SkipIntermediateTape.external_name,
HighestAdjustedRankingFirst.external_name,
PreferDiskOverTape.external_name,
PathDistance.external_name,
PreferSingleHop.external_name,
]
strategy_names = config_get_list('transfers', 'source_ranking_strategies', default=default_strategies)

try:
strategies = list(available_strategies[name]() for name in strategy_names)
except KeyError:
logger(logging.ERROR, "One of the configured source_ranking_strategies doesn't exist %s", strategy_names, exc_info=True)
raise

if admin_accounts is None:
admin_accounts = set()
Expand Down Expand Up @@ -1132,7 +1164,7 @@ def build_transfer_paths(
for source in sources:
verdict = rws_strategy.apply(source)
if verdict is SKIP_SOURCE:
rejected_sources[strategy.__class__.__name__].append(source)
rejected_sources[strategy.external_name].append(source)
cost_vectors.pop(source)
elif not strategy.filter_only:
cost_vectors[source].append(verdict)
Expand Down Expand Up @@ -1161,11 +1193,11 @@ def build_transfer_paths(
# It can happen that some sources are skipped because they are TAPE, and others because
# of scheme mismatch. However, we can only have one state in the database. I picked to
# prioritize setting only_tape_source without any particular reason.
if RestrictTapeSources.__name__ in rejected_sources:
if RestrictTapeSources.external_name in rejected_sources:
logger(logging.DEBUG, '%s: Only tape sources found' % rws.request_id)
reqs_only_tape_source.add(rws.request_id)
reqs_no_source.remove(rws.request_id)
elif SkipSchemeMissmatch.__name__ in rejected_sources:
elif SkipSchemeMissmatch.external_name in rejected_sources:
logger(logging.DEBUG, '%s: Scheme mismatch detected' % rws.request_id)
reqs_scheme_mismatch.add(rws.request_id)
reqs_no_source.remove(rws.request_id)
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ def file_config_mock(request):
if not config_has_section(section):
config_add_section(section)
config_set(section, option, value)
yield
yield parser


@pytest.fixture
Expand Down
11 changes: 9 additions & 2 deletions tests/test_conveyor_submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ def __setup_test():


@pytest.mark.noparallel(reason="multiple submitters cannot be run in parallel due to partial job assignment by hash")
def test_scheme_missmatch(rse_factory, did_factory, root_account):
@pytest.mark.parametrize("file_config_mock", [
{"overrides": [('transfers', 'source_ranking_strategies', 'SkipSchemeMissmatch,PathDistance')]},
{"overrides": [('transfers', 'source_ranking_strategies', 'PathDistance')]}
], indirect=True)
def test_scheme_missmatch(rse_factory, did_factory, root_account, file_config_mock):
"""
Ensure that the requests are marked MISSMATCH_SCHEME when there is a path, but with wrong schemes.
"""
Expand All @@ -298,7 +302,10 @@ def test_scheme_missmatch(rse_factory, did_factory, root_account):
submitter(once=True, rses=[{'id': rse_id} for rse_id in (src_rse_id, dst_rse_id)], partition_wait_time=None, transfertools=['mock'], transfertype='single')

request = request_core.get_request_by_did(rse_id=dst_rse_id, **did)
assert request['state'] == RequestState.MISMATCH_SCHEME
if 'SkipSchemeMissmatch' in file_config_mock.get('transfers', 'source_ranking_strategies'):
assert request['state'] == RequestState.MISMATCH_SCHEME
else:
assert request['state'] == RequestState.NO_SOURCES


@pytest.mark.noparallel(groups=[NoParallelGroups.SUBMITTER])
Expand Down
34 changes: 33 additions & 1 deletion tests/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_get_hops(rse_factory):
assert hop4['dest_rse'].id == rse6_id


def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope):
def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope, file_config_mock):
tape1_rse_name, tape1_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.TAPE)
tape2_rse_name, tape2_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.TAPE)
disk1_rse_name, disk1_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.DISK)
Expand Down Expand Up @@ -215,6 +215,38 @@ def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope):
assert transfer[0].legacy_sources[0][0] == tape1_rse_name


@pytest.mark.parametrize("file_config_mock", [
    {"overrides": [('transfers', 'source_ranking_strategies', 'PathDistance')]},
    {"overrides": [('transfers', 'source_ranking_strategies', 'PreferDiskOverTape,PathDistance')]}
], indirect=True)
def test_disk_vs_tape_with_custom_strategy(rse_factory, root_account, mock_scope, file_config_mock):
    """
    The disk source is only picked over the tape source when PreferDiskOverTape is configured.

    The tape RSE is given the shorter distance to the destination, so with the
    PathDistance strategy alone the tape replica is expected to be selected.
    """
    disk_rse_name, disk_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.DISK)
    tape_rse_name, tape_rse_id = rse_factory.make_posix_rse(rse_type=RSEType.TAPE)
    dst_rse_name, dst_rse_id = rse_factory.make_posix_rse()

    # Disk is "farther" from the destination than tape.
    add_distance(disk_rse_id, dst_rse_id, distance=20)
    add_distance(tape_rse_id, dst_rse_id, distance=10)

    replica = {'scope': mock_scope, 'name': 'lfn.' + generate_uuid(), 'type': 'FILE', 'bytes': 1, 'adler32': 'beefdead'}
    did = {key: replica[key] for key in ('scope', 'name')}
    for source_rse_id in (tape_rse_id, disk_rse_id):
        add_replicas(rse_id=source_rse_id, files=[replica], account=root_account)

    rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse_name, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None)

    topology = Topology().configure_multihop()
    requests = list_and_mark_transfer_requests_and_source_replicas(
        rse_collection=topology,
        rses=[tape_rse_id, disk_rse_id, dst_rse_id],
    )
    submission_paths = pick_and_prepare_submission_path(
        topology=topology,
        protocol_factory=ProtocolFactory(),
        requests_with_sources=requests,
    )

    # Exactly one entry holding exactly one path is expected; the unpacking fails otherwise.
    ((_, (path,)),) = submission_paths.items()

    configured_strategies = file_config_mock.get('transfers', 'source_ranking_strategies')
    expected_source = disk_rse_name if 'PreferDiskOverTape' in configured_strategies else tape_rse_name
    assert path[0].src.rse.name == expected_source


@pytest.mark.parametrize("caches_mock", [{"caches_to_mock": [
'rucio.core.rse_expression_parser.REGION', # The list of multihop RSEs is retrieved by an expression
]}], indirect=True)
Expand Down

0 comments on commit 0e6ea51

Please sign in to comment.