Skip to content

Commit

Permalink
Transfers: move DirectTransferDefinition to request.py
Browse files Browse the repository at this point in the history
This object is used both by the transfertools and by transfer.py,
so move it to avoid a cyclic dependency:
- transfertools require transfer.py for DirectTransferDefinition
- transfer.py requires transfertools to do the actual work
  • Loading branch information
Radu Carpa committed Jan 19, 2024
1 parent 477ceb8 commit a333221
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 66 deletions.
48 changes: 48 additions & 0 deletions lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import threading
import traceback
import uuid
from abc import ABCMeta, abstractmethod
from collections import namedtuple, defaultdict
from collections.abc import Sequence, Mapping, Iterator
from dataclasses import dataclass
Expand Down Expand Up @@ -51,6 +52,7 @@
if TYPE_CHECKING:

from sqlalchemy.orm import Session
from rucio.rse.protocols.protocol import RSEProtocol

"""
The core request.py is specifically for handling requests.
Expand All @@ -73,6 +75,15 @@ def __str__(self):
return "src_rse={}".format(self.rse)


class TransferDestination:
    """
    The destination side of a transfer: an RSE paired with a scheme.

    Counterpart of the source description defined just above; both values
    are stored exactly as they are received.
    """

    def __init__(self, rse: RseData, scheme) -> None:
        self.rse = rse
        self.scheme = scheme

    def __str__(self) -> str:
        # Only the RSE takes part in the textual form; the scheme is omitted.
        return "dst_rse={}".format(self.rse)


class RequestWithSources:
def __init__(
self,
Expand Down Expand Up @@ -146,6 +157,43 @@ def _parse_db_attributes(db_attributes):
return attr


class DirectTransfer(metaclass=ABCMeta):
    """
    The configuration for a direct (non-multi-hop) transfer. It can be a multi-source transfer.

    This is the abstract interface only: it stores the list of sources and the
    request being served, and declares the accessors which a concrete
    implementation has to provide.
    """

    def __init__(self, sources: list[RequestSource], rws: RequestWithSources) -> None:
        self.sources: list[RequestSource] = sources
        self.rws: RequestWithSources = rws

    @property
    @abstractmethod
    def src(self) -> RequestSource:
        """The source of this transfer, as selected by the concrete implementation."""

    @property
    @abstractmethod
    def dst(self) -> TransferDestination:
        """The destination of this transfer."""

    @property
    @abstractmethod
    def dest_url(self) -> str:
        """The url of the destination."""

    @abstractmethod
    def source_url(self, source: RequestSource) -> str:
        """The url of the given source."""

    @abstractmethod
    def dest_protocol(self) -> "RSEProtocol":
        """The protocol object associated with the destination."""

    @abstractmethod
    def source_protocol(self, source: RequestSource) -> "RSEProtocol":
        """The protocol object associated with the given source."""


def should_retry_request(req, retry_protocol_mismatches):
"""
Whether should retry this request.
Expand Down
72 changes: 22 additions & 50 deletions lib/rucio/core/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from rucio.core import did, message as message_core, request as request_core
from rucio.core.account import list_accounts
from rucio.core.monitor import MetricManager
from rucio.core.request import transition_request_state, RequestWithSources, RequestSource
from rucio.core.request import transition_request_state, RequestWithSources, RequestSource, TransferDestination, DirectTransfer
from rucio.core.rse import RseData
from rucio.core.rse_expression_parser import parse_expression
from rucio.db.sqla import models
Expand All @@ -57,6 +57,7 @@
from sqlalchemy.orm import Session
from rucio.common.types import InternalAccount
from rucio.core.topology import Topology
from rucio.rse.protocols.protocol import RSEProtocol

LoggerFunction = Callable[..., Any]

Expand All @@ -81,15 +82,6 @@
}


class TransferDestination:
def __init__(self, rse: RseData, scheme):
self.rse = rse
self.scheme = scheme

def __str__(self):
return "dst_rse={}".format(self.rse)


class ProtocolFactory:
"""
Creates and caches protocol objects. Allowing to reuse them.
Expand All @@ -106,7 +98,7 @@ def protocol(self, rse: RseData, scheme: "Optional[str]", operation: str):
return protocol


class DirectTransferDefinition:
class DirectTransferImplementation(DirectTransfer):
"""
The configuration for a direct (non-multi-hop) transfer. It can be a multi-source transfer.
Expand All @@ -115,10 +107,9 @@ class DirectTransferDefinition:
"""
def __init__(self, source: RequestSource, destination: TransferDestination, rws: RequestWithSources,
protocol_factory: ProtocolFactory, operation_src: str, operation_dest: str):
self.sources = [source]
super().__init__(sources=[source], rws=rws)
self.destination = destination

self.rws = rws
self.protocol_factory = protocol_factory
self.operation_src = operation_src
self.operation_dest = operation_dest
Expand All @@ -134,11 +125,11 @@ def __str__(self):
)

@property
def src(self):
def src(self) -> RequestSource:
return self.sources[0]

@property
def dst(self):
def dst(self) -> TransferDestination:
return self.destination

@property
Expand All @@ -159,31 +150,12 @@ def source_url(self, source: RequestSource) -> str:
)
return url

def dest_protocol(self):
def dest_protocol(self) -> "RSEProtocol":
return self.protocol_factory.protocol(self.dst.rse, self.dst.scheme, self.operation_dest)

def source_protocol(self, source: RequestSource):
def source_protocol(self, source: RequestSource) -> "RSEProtocol":
return self.protocol_factory.protocol(source.rse, source.scheme, self.operation_src)

@property
def use_ipv4(self):
# If any source or destination rse is ipv4 only
return self.dst.rse.attributes.get('use_ipv4', False) or any(src.rse.attributes.get('use_ipv4', False)
for src in self.sources)

@property
def use_tokens(self) -> bool:
"""Whether a transfer can be performed with tokens.
In order to be so, all the involved RSEs must have it explicitly enabled
and the protocol being used must be WebDAV.
"""
for endpoint in [*self.sources, self.destination]:
if (endpoint.rse.attributes.get('oidc_support') is not True
or endpoint.scheme != 'davs'):
return False
return True

@staticmethod
def __rewrite_source_url(source_url, source_sign_url, dest_sign_url, source_scheme):
"""
Expand Down Expand Up @@ -269,7 +241,7 @@ def _generate_dest_url(cls, dst: TransferDestination, rws: RequestWithSources, p
return dest_url


class StageinTransferDefinition(DirectTransferDefinition):
class StageinTransferImplementation(DirectTransferImplementation):
"""
A definition of a transfer which triggers a stagein operation.
- The source and destination url are identical
Expand Down Expand Up @@ -306,7 +278,7 @@ def source_url(self, source: RequestSource) -> str:
return self.dest_url


def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str:
def transfer_path_str(transfer_path: "list[DirectTransfer]") -> str:
"""
an implementation of __str__ for a transfer path, which is a list of direct transfers, so not really an object
"""
Expand All @@ -333,7 +305,7 @@ def transfer_path_str(transfer_path: "list[DirectTransferDefinition]") -> str:

@transactional_session
def mark_submitting(
transfer: "DirectTransferDefinition",
transfer: "DirectTransfer",
external_host: str,
*,
logger: "Callable",
Expand Down Expand Up @@ -379,7 +351,7 @@ def mark_submitting(

@transactional_session
def ensure_db_sources(
transfer_path: "list[DirectTransferDefinition]",
transfer_path: "list[DirectTransfer]",
*,
logger: "Callable",
session: "Session",
Expand Down Expand Up @@ -640,7 +612,7 @@ def _create_transfer_definitions(
domain: str,
*,
session: "Session",
) -> "dict[RseData, list[DirectTransferDefinition]]":
) -> "dict[RseData, list[DirectTransfer]]":
"""
Find all the paths from sources towards the destination of the given transfer request.
Create the transfer definitions for each point-to-point transfer (multi-source, when possible)
Expand Down Expand Up @@ -668,7 +640,7 @@ def _create_transfer_definitions(
rse=hop_dst_rse,
scheme=hop['dest_scheme'],
)
hop_definition = DirectTransferDefinition(
hop_definition = DirectTransferImplementation(
source=src,
destination=dst,
operation_src=operation_src,
Expand Down Expand Up @@ -764,13 +736,13 @@ def _create_stagein_definitions(
operation_src: str,
operation_dest: str,
protocol_factory: ProtocolFactory,
) -> "dict[RseData, list[StageinTransferDefinition]]":
) -> "dict[RseData, list[DirectTransfer]]":
"""
for each source, create a single-hop transfer path with a single staging definition inside
"""
transfers_by_source = {
source.rse: [
StageinTransferDefinition(
cast(DirectTransfer, StageinTransferImplementation(
source=RequestSource(
rse=source.rse,
file_path=source.file_path,
Expand All @@ -785,7 +757,7 @@ def _create_stagein_definitions(
operation_dest=operation_dest,
rws=rws,
protocol_factory=protocol_factory,
)
))

]
for source in sources
Expand All @@ -804,9 +776,9 @@ def get_dsn(scope, name, dsn):


def __compress_multihops(
paths_by_source: "Iterable[tuple[RequestSource, Sequence[DirectTransferDefinition]]]",
paths_by_source: "Iterable[tuple[RequestSource, Sequence[DirectTransfer]]]",
sources: "Iterable[RequestSource]",
) -> "Iterator[tuple[RequestSource, Sequence[DirectTransferDefinition]]]":
) -> "Iterator[tuple[RequestSource, Sequence[DirectTransfer]]]":
# Compress multihop transfers which contain other sources as part of itself.
# For example: multihop A->B->C and B is a source, compress A->B->C into B->C
source_rses = {s.rse.id for s in sources}
Expand Down Expand Up @@ -853,7 +825,7 @@ def build_or_return_cached(
*,
logger: "LoggerFunction" = logging.log,
session: "Session"
) -> "Mapping[RseData, Sequence[DirectTransferDefinition]]":
) -> "Mapping[RseData, Sequence[DirectTransfer]]":
"""
Warning: The function currently caches the result for the given request and returns it for later calls
with the same request id. As a result: it can return more (or less) sources than what is provided in the
Expand Down Expand Up @@ -1055,7 +1027,7 @@ def apply(self, ctx: RequestRankingContext, source: RequestSource) -> "Optional[
class PathDistance(SourceRankingStrategy):

class _RankingContext(RequestRankingContext):
def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", paths_for_rws: "Mapping[RseData, Sequence[DirectTransferDefinition]]"):
def __init__(self, strategy: "SourceRankingStrategy", rws: "RequestWithSources", paths_for_rws: "Mapping[RseData, Sequence[DirectTransfer]]"):
super().__init__(strategy, rws)
self.paths_for_rws = paths_for_rws

Expand Down Expand Up @@ -1414,7 +1386,7 @@ def cancel_transfer(transfertool_obj, transfer_id):

@transactional_session
def prepare_transfers(
candidate_paths_by_request_id: "dict[str, list[list[DirectTransferDefinition]]]",
candidate_paths_by_request_id: "dict[str, list[list[DirectTransfer]]]",
logger: "LoggerFunction" = logging.log,
transfertools: "Optional[list[str]]" = None,
*,
Expand Down
16 changes: 8 additions & 8 deletions lib/rucio/daemons/conveyor/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from typing import Optional
from rucio.core.transfer import DirectTransferDefinition
from rucio.core.request import DirectTransfer
from rucio.transfertool.transfertool import TransferToolBuilder
from sqlalchemy.orm import Session

Expand Down Expand Up @@ -131,10 +131,10 @@ def pick_and_prepare_submission_path(requests_with_sources, topology, protocol_f


def __assign_to_transfertool(
transfer_path: "list[DirectTransferDefinition]",
transfer_path: "list[DirectTransfer]",
transfertools: "Optional[list[str]]",
logger: "Callable",
) -> "list[tuple[list[DirectTransferDefinition], Optional[TransferToolBuilder]]]":
) -> "list[tuple[list[DirectTransfer], Optional[TransferToolBuilder]]]":
"""
Iterate over a multihop path and assign sub-paths to transfertools in chunks from left to right.
Expand Down Expand Up @@ -174,11 +174,11 @@ def __assign_to_transfertool(


def assign_paths_to_transfertool_and_create_hops(
candidate_paths_by_request_id: "dict[str: list[DirectTransferDefinition]]",
candidate_paths_by_request_id: "dict[str: list[DirectTransfer]]",
default_tombstone_delay: int,
transfertools: "Optional[list[str]]" = None,
logger: "Callable" = logging.log,
) -> "tuple[dict[TransferToolBuilder, list[DirectTransferDefinition]], set[str]]":
) -> "tuple[dict[TransferToolBuilder, list[DirectTransfer]], set[str]]":
"""
for each request, pick the first path which can be submitted by one of the transfertools.
If the chosen path is multihop, create all missing intermediate requests and replicas.
Expand Down Expand Up @@ -214,13 +214,13 @@ def assign_paths_to_transfertool_and_create_hops(
@transactional_session
def __assign_paths_to_transfertool_and_create_hops(
request_id: str,
candidate_paths: "Sequence[list[DirectTransferDefinition]]",
candidate_paths: "Sequence[list[DirectTransfer]]",
default_tombstone_delay: int,
transfertools: "Optional[list[str]]" = None,
*,
logger: "Callable" = logging.log,
session: "Session",
) -> "tuple[Optional[list[DirectTransferDefinition]], Optional[TransferToolBuilder]]":
) -> "tuple[Optional[list[DirectTransfer]], Optional[TransferToolBuilder]]":
"""
Out of a sequence of candidate paths for the given request, pick the first path which can
be submitted by one of the transfertools.
Expand Down Expand Up @@ -290,7 +290,7 @@ def __assign_paths_to_transfertool_and_create_hops(

@transactional_session
def __create_missing_replicas_and_requests(
transfer_path: "list[DirectTransferDefinition]",
transfer_path: "list[DirectTransfer]",
default_tombstone_delay: int,
*,
logger: "Callable",
Expand Down
Loading

0 comments on commit a333221

Please sign in to comment.