Skip to content

Commit

Permalink
Transfers: get rid of legacy_sources
Browse files Browse the repository at this point in the history
This code was inherited all the way from before the conveyor
refactoring, when we used tuples as a workaround to pass information
through the code. Now we pass directly structured objects which can
be accessed as needed.
  • Loading branch information
Radu Carpa committed Jan 10, 2024
1 parent 436ecc9 commit 2664227
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 48 deletions.
51 changes: 20 additions & 31 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def __init__(self, source: RequestSource, destination: TransferDestination, rws:
self.operation_dest = operation_dest

self._dest_url = None
self._legacy_sources = None
self._source_urls = {}

def __str__(self):
return '{sources}--{request_id}->{destination}'.format(
Expand All @@ -145,21 +145,17 @@ def dest_url(self):
self._dest_url = self._generate_dest_url(self.dst, self.rws, self.protocol_factory, self.operation_dest)
return self._dest_url

@property
def legacy_sources(self):
if not self._legacy_sources:
self._legacy_sources = [
(src.rse.name,
self._generate_source_url(src,
self.dst,
rws=self.rws,
protocol_factory=self.protocol_factory,
operation=self.operation_src),
src.rse.id,
src.ranking)
for src in self.sources
]
return self._legacy_sources
def source_url(self, source: RequestSource):
url = self._source_urls.get(source.rse)
if not url:
self._source_urls = url = self._generate_source_url(
source,
self.dst,
rws=self.rws,
protocol_factory=self.protocol_factory,
operation=self.operation_src
)
return url

@property
def use_ipv4(self):
Expand Down Expand Up @@ -297,16 +293,9 @@ def dest_url(self):
operation=self.operation_dest)
return self._dest_url

@property
def legacy_sources(self):
if not self._legacy_sources:
self._legacy_sources = [(
self.src.rse.name,
self.dest_url, # Source and dest url is the same for stagein requests
self.src.rse.id,
self.src.ranking
)]
return self._legacy_sources
def source_url(self, source: RequestSource):
# Source and dest url is the same for stagein requests
return self.dest_url


def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str:
Expand Down Expand Up @@ -353,7 +342,7 @@ def mark_submitting(
transfer.rws.scope,
transfer.rws.name,
transfer.rws.previous_attempt_id,
transfer.legacy_sources,
[transfer.source_url(s) for s in transfer.sources],
transfer.dest_url,
external_host)
logger(logging.DEBUG, "%s", log_str)
Expand Down Expand Up @@ -394,15 +383,15 @@ def ensure_db_sources(
desired_sources = []
for transfer in transfer_path:

for src_rse, src_url, src_rse_id, rank in transfer.legacy_sources:
for source in transfer.sources:
common_source_attrs = {
"scope": transfer.rws.scope,
"name": transfer.rws.name,
"rse_id": src_rse_id,
"rse_id": source.rse.id,
"dest_rse_id": transfer.dst.rse.id,
"ranking": rank if rank else 0,
"ranking": source.ranking,
"bytes": transfer.rws.byte_count,
"url": src_url,
"url": transfer.source_url(source),
"is_using": True,
}

Expand Down
10 changes: 5 additions & 5 deletions lib/rucio/transfertool/fts3.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def build_job_params(transfer_path, bring_online, default_lifetime, archive_time
if len(transfer_path) > 1:
job_params['multihop'] = True
job_params['job_metadata']['multihop'] = True
elif len(last_hop.legacy_sources) > 1:
elif len(last_hop.sources) > 1:
job_params['job_metadata']['multi_sources'] = True
if strict_copy:
job_params['strict_copy'] = strict_copy
Expand Down Expand Up @@ -894,7 +894,7 @@ def _file_from_transfer(self, transfer, job_params):
rws = transfer.rws
checksum_to_use = _pick_fts_checksum(transfer, path_strategy=job_params['verify_checksum'])
t_file = {
'sources': [s[1] for s in transfer.legacy_sources],
'sources': [transfer.source_url(s) for s in transfer.sources],
'destinations': [transfer.dest_url],
'metadata': {
'request_id': rws.request_id,
Expand All @@ -920,9 +920,9 @@ def _file_from_transfer(self, transfer, job_params):

if self.token:
t_file['source_tokens'] = []
for source in transfer.legacy_sources:
src_audience = config_get('conveyor', 'request_oidc_audience', False) or determine_audience_for_rse(rse_id=source[2])
src_scope = determine_scope_for_rse(rse_id=source[2], scopes=['storage.read'], extra_scopes=['offline_access'])
for source in transfer.sources:
src_audience = config_get('conveyor', 'request_oidc_audience', False) or determine_audience_for_rse(rse_id=source.rse.id)
src_scope = determine_scope_for_rse(rse_id=source.rse.id, scopes=['storage.read'], extra_scopes=['offline_access'])
t_file['source_tokens'].append(request_token(src_audience, src_scope))

dst_audience = config_get('conveyor', 'request_oidc_audience', False) or determine_audience_for_rse(transfer.dst.rse.id)
Expand Down
2 changes: 1 addition & 1 deletion lib/rucio/transfertool/globus.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def submit(self, transfers, job_params, timeout=None):
submitjob = [
{
# Some dict elements are not needed by globus transfertool, but are accessed by further common fts/globus code
'sources': [s[1] for s in transfer.legacy_sources],
'sources': [transfer.source_url(s) for s in transfer.sources],
'destinations': [transfer.dest_url],
'metadata': {
'src_rse': transfer.src.rse.name,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_tpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_tpc(containerized_rses, root_account, test_scope, did_factory, rse_clie
paths, *_ = build_transfer_paths(topology=topology, protocol_factory=ProtocolFactory(), requests_with_sources=requests)
[[_, [transfer_path]]] = paths.items()
assert transfer_path[0].rws.rule_id == rule_id[0]
src_url = transfer_path[0].legacy_sources[0][1]
src_url = transfer_path[0].source_url(transfer_path[0].sources[0])
dest_url = transfer_path[0].dest_url
check_url(src_url, rse1_hostname, test_file_expected_pfn)
check_url(dest_url, rse2_hostname, test_file_expected_pfn)
Expand Down
20 changes: 10 additions & 10 deletions tests/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,16 @@ def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope, file_confi
# On equal priority and distance, disk should be preferred over tape. Both disk sources will be returned
[[_, [transfer]]] = pick_and_prepare_submission_path(topology=topology, protocol_factory=ProtocolFactory(),
requests_with_sources=requests).items()
assert len(transfer[0].legacy_sources) == 2
assert transfer[0].legacy_sources[0][0] in (disk1_rse_name, disk2_rse_name)
assert len(transfer[0].sources) == 2
assert transfer[0].sources[0].rse.name in (disk1_rse_name, disk2_rse_name)

# Change the rating of the disk RSEs. Disk still preferred, because it must fail twice before tape is tried
disk1_source.ranking = -1
disk2_source.ranking = -1
[[_, [transfer]]] = pick_and_prepare_submission_path(topology=topology, protocol_factory=ProtocolFactory(),
requests_with_sources=requests).items()
assert len(transfer[0].legacy_sources) == 2
assert transfer[0].legacy_sources[0][0] in (disk1_rse_name, disk2_rse_name)
assert len(transfer[0].sources) == 2
assert transfer[0].sources[0].rse.name in (disk1_rse_name, disk2_rse_name)

# Change the rating of the disk RSEs again. Tape RSEs must now be preferred.
# Multiple tape sources are not allowed. Only one tape RSE source must be returned.
Expand All @@ -200,21 +200,21 @@ def test_disk_vs_tape_priority(rse_factory, root_account, mock_scope, file_confi
requests_with_sources=requests).items()
assert len(transfers) == 1
transfer = transfers[0]
assert len(transfer[0].legacy_sources) == 1
assert transfer[0].legacy_sources[0][0] in (tape1_rse_name, tape2_rse_name)
assert len(transfer[0].sources) == 1
assert transfer[0].sources[0].rse.name in (tape1_rse_name, tape2_rse_name)

# On equal source ranking, but different distance; the smaller distance is preferred
[[_, [transfer]]] = pick_and_prepare_submission_path(topology=topology, protocol_factory=ProtocolFactory(),
requests_with_sources=requests).items()
assert len(transfer[0].legacy_sources) == 1
assert transfer[0].legacy_sources[0][0] == tape2_rse_name
assert len(transfer[0].sources) == 1
assert transfer[0].sources[0].rse.name == tape2_rse_name

# On different source ranking, the bigger ranking is preferred
tape2_source.ranking = -1
[[_, [transfer]]] = pick_and_prepare_submission_path(topology=topology, protocol_factory=ProtocolFactory(),
requests_with_sources=requests).items()
assert len(transfer[0].legacy_sources) == 1
assert transfer[0].legacy_sources[0][0] == tape1_rse_name
assert len(transfer[0].sources) == 1
assert transfer[0].sources[0].rse.name == tape1_rse_name


@pytest.mark.parametrize("file_config_mock", [
Expand Down

0 comments on commit 2664227

Please sign in to comment.