Skip to content

Commit

Permalink
Transfers: introduce source selection strategies. Closes rucio#5776
Browse files Browse the repository at this point in the history
Try to be more generic in the source filtering and selection code.
Introduce a SourceRankingStrategy class, which has an `apply` method
to actually apply the strategy to a source and decide what to do with
a source. Some strategies are filter-only: i.e. they only decide if
a source has to be ignored or not. Others are also ranking strategies:
they can return an integer which defines the cost of the source
according to this strategy. The order of ranking strategies is
important: it defines the order in which the costs will be compared to
sort the sources.

This change will definitely introduce some computational and memory
overhead, but I hope it will be low. Instead of hard-coded `filter()`
calls which are done once per request, now we have multiple object
traversals and function calls for each source of each request.
  • Loading branch information
Radu Carpa committed Nov 9, 2023
1 parent 4380d02 commit f93b264
Show file tree
Hide file tree
Showing 3 changed files with 349 additions and 174 deletions.
22 changes: 13 additions & 9 deletions lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@


class RequestSource:
def __init__(self, rse_data, ranking=None, distance=None, file_path=None, scheme=None, url=None):
self.rse = rse_data
def __init__(self, rse: RseData, ranking=None, distance=None, file_path=None, scheme=None, url=None):
self.rse = rse
self.distance = distance if distance is not None else 9999
self.ranking = ranking if ranking is not None else 0
self.file_path = file_path
Expand All @@ -82,7 +82,7 @@ def __init__(
activity: str,
attributes: Optional[Union[str, dict[str, Any]]],
previous_attempt_id: Optional[str],
dest_rse_data: RseData,
dest_rse: RseData,
account: InternalAccount,
retry_count: int,
priority: int,
Expand All @@ -102,7 +102,7 @@ def __init__(
self._dict_attributes = None
self._db_attributes = attributes
self.previous_attempt_id = previous_attempt_id
self.dest_rse = dest_rse_data
self.dest_rse = dest_rse
self.account = account
self.retry_count = retry_count or 0
self.priority = priority if priority is not None else 3
Expand All @@ -118,11 +118,15 @@ def __str__(self):
@property
def attributes(self):
if self._dict_attributes is None:
self.attributes = self._db_attributes
self._dict_attributes = self._parse_db_attributes(self._db_attributes)
return self._dict_attributes

@attributes.setter
def attributes(self, db_attributes):
self._dict_attributes = self._parse_db_attributes(db_attributes)

@staticmethod
def _parse_db_attributes(db_attributes):
attr = {}
if db_attributes:
if isinstance(db_attributes, dict):
Expand All @@ -134,7 +138,7 @@ def attributes(self, db_attributes):
attr['allow_tape_source'] = attr["allow_tape_source"] if (attr and "allow_tape_source" in attr) else True
attr['dsn'] = attr["ds_name"] if (attr and "ds_name" in attr) else None
attr['lifetime'] = attr.get('lifetime', -1)
self._dict_attributes = attr
return attr


def should_retry_request(req, retry_protocol_mismatches):
Expand Down Expand Up @@ -546,19 +550,19 @@ def list_and_mark_transfer_requests_and_source_replicas(
if not request:
request = RequestWithSources(id_=request_id, request_type=req_type, rule_id=rule_id, scope=scope, name=name,
md5=md5, adler32=adler32, byte_count=byte_count, activity=activity, attributes=attributes,
previous_attempt_id=previous_attempt_id, dest_rse_data=rse_collection[dest_rse_id],
previous_attempt_id=previous_attempt_id, dest_rse=rse_collection[dest_rse_id],
account=account, retry_count=retry_count, priority=priority, transfertool=transfertool,
requested_at=requested_at)
requests_by_id[request_id] = request
# if STAGEIN and destination RSE is QoS make sure the source is included
if request.request_type == RequestType.STAGEIN and get_rse_attribute(rse_id=dest_rse_id, key='staging_required', session=session):
source = RequestSource(rse_data=rse_collection[dest_rse_id])
source = RequestSource(rse=rse_collection[dest_rse_id])
request.sources.append(source)

if replica_rse_id is not None:
replica_rse = rse_collection[replica_rse_id]
replica_rse.name = replica_rse_name
source = RequestSource(rse_data=replica_rse, file_path=file_path,
source = RequestSource(rse=replica_rse, file_path=file_path,
ranking=source_ranking, distance=distance, url=source_url)
request.sources.append(source)
if source_rse_id == replica_rse_id:
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/core/rse.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def attributes(self) -> dict[str, Any]:
return self._attributes

@property
def info(self) -> dict[str, Any]:
def info(self) -> types.RSESettingsDict:
if self._info is None:
raise ValueError(f'info not loaded for rse {self}')
return self._info
Expand Down
Loading

0 comments on commit f93b264

Please sign in to comment.