WIP: Transfers: prototype bittorrent transfertool using deluge
Additional metadata is added to each file DID, containing the
merkle hash tree of the bittorrent v2 format. This way, we can
reconstruct the .torrent files from this data, allowing us to
transfer files directly between RSEs using the 'deluge' bittorrent
client running on each of the RSEs.
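
For illustration, rebuilding a v2 .torrent from the stored metadata
would look roughly like the following sketch (not the code from this
commit; 'bencode' stands in for any bencoding helper, e.g. the one
bundled with deluge):

import base64

def reconstruct_torrent(did_meta, file_name, file_size, tracker_url):
    # Decode the metadata stored on the file DID at upload time
    pieces_root = base64.b64decode(did_meta['bittorrent_pieces_root'])
    pieces_layers = base64.b64decode(did_meta['bittorrent_pieces_layers'])
    piece_length = did_meta['bittorrent_piece_length']

    torrent = {
        'announce': tracker_url,
        'info': {
            'meta version': 2,
            'name': file_name,
            'piece length': piece_length,
            # BEP 52 file tree: a single file keyed by its name
            'file tree': {
                file_name: {'': {'length': file_size, 'pieces root': pieces_root}},
            },
        },
        # piece layers are only present for files spanning more than one piece
        'piece layers': {pieces_root: pieces_layers} if pieces_layers else {},
    }
    return bencode(torrent)  # 'bencode' is an assumed bencoding helper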

Deluge has a client/server architecture and is written in python.
Because of that, it seemed like a very good idea to rely on this
bittorrent client for the integration with rucio. However, it is
very heavily async and relies on twisted (which I don't know at
all) for its internals. Because of that, the current prototype
implementation using deluge is overly complicated and hacky: it
spawns a separate process for each API call to deluge, because the
twisted reactor cannot be restarted once stopped. There should
definitely be better ways to handle this...
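
Conceptually, the workaround looks like this (a simplified sketch of
the approach, not the actual transfertool code; the deluge RPC details
are abbreviated):

import multiprocessing

def _deluge_call(conn, method, args, kwargs):
    # Runs in a throwaway child process: a fresh twisted reactor can be
    # started here, used for exactly one deluge RPC call, then discarded.
    from deluge.ui.client import client
    from twisted.internet import reactor

    def on_connected(_):
        d = getattr(client.core, method)(*args, **kwargs)
        d.addCallback(lambda result: conn.send(result))
        d.addBoth(lambda _: reactor.stop())

    client.connect().addCallback(on_connected)
    reactor.run()  # blocks until stop(); a reactor cannot be run() twice
    conn.close()

def call_deluge(method, *args, **kwargs):
    # One process per API call, because the parent's reactor would not
    # be restartable after the first run()/stop() cycle.
    parent_conn, child_conn = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=_deluge_call, args=(child_conn, method, args, kwargs))
    proc.start()
    result = parent_conn.recv()
    proc.join()
    return result
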
Radu Carpa committed Jan 9, 2024
1 parent d9f6163 commit 0ead260
Showing 17 changed files with 1,355 additions and 29 deletions.
16 changes: 15 additions & 1 deletion lib/rucio/client/uploadclient.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import copy
import json
import logging
@@ -30,7 +31,7 @@
ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError, RSEChecksumUnavailable,
ScopeNotFound)
from rucio.common.utils import (adler32, detect_client_location, execute, generate_uuid, make_valid_did, md5, send_trace,
retry, GLOBALLY_SUPPORTED_CHECKSUMS)
retry, bittorrent_v2_merkle_sha256, GLOBALLY_SUPPORTED_CHECKSUMS)
from rucio.rse import rsemanager as rsemgr


@@ -336,6 +337,17 @@ def _pick_random_rse(rse_expression):
raise NotAllFilesUploaded()
return 0

def _add_bittorrent_meta(self, file, logger):
pieces_root, pieces_layers, piece_length = bittorrent_v2_merkle_sha256(os.path.join(file['dirname'], file['basename']))
bittorrent_meta = {
'bittorrent_pieces_root': base64.b64encode(pieces_root).decode(),
'bittorrent_pieces_layers': base64.b64encode(pieces_layers).decode(),
'bittorrent_piece_length': piece_length,
}
self.client.set_metadata_bulk(scope=file['did_scope'], name=file['did_name'], meta=bittorrent_meta)
logger(logging.INFO, 'Added bittorrent DID metadata')

def _register_file(self, file, registered_dataset_dids, ignore_availability=False, activity=None):
"""
Registers the given file in Rucio. Creates a dataset if
@@ -404,12 +416,14 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals

# add file to rse if it is not registered yet
replicastate = list(self.client.list_replicas([file_did], all_states=True))
self._add_bittorrent_meta(file=file, logger=logger)
if rse not in replicastate[0]['rses']:
self.client.add_replicas(rse=rse, files=[replica_for_api])
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
except DataIdentifierNotFound:
logger(logging.DEBUG, 'File DID does not exist')
self.client.add_replicas(rse=rse, files=[replica_for_api])
self._add_bittorrent_meta(file=file, logger=logger)
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
if not dataset_did_str:
# only need to add rules for files if no dataset is given
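
For reference, the metadata written by _add_bittorrent_meta can be read
back and decoded as follows (a usage sketch; scope and name are
placeholders, and the exact get_metadata plugin argument depends on the
metadata backend in use):

import base64
from rucio.client.didclient import DIDClient

did_client = DIDClient()
meta = did_client.get_metadata(scope='user.jdoe', name='file.dat', plugin='ALL')

pieces_root = base64.b64decode(meta['bittorrent_pieces_root'])
pieces_layers = base64.b64decode(meta['bittorrent_pieces_layers'])
piece_length = meta['bittorrent_piece_length']
print(f'{len(pieces_layers) // 32} piece hashes, piece length {piece_length}, root {pieces_root.hex()}')
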
172 changes: 172 additions & 0 deletions lib/rucio/common/utils.py
@@ -15,11 +15,13 @@

import argparse
import base64
import copy
import datetime
import errno
import getpass
import hashlib
import io
import ipaddress
import itertools
import json
import logging
@@ -43,6 +45,7 @@
from uuid import uuid4 as uuid
from xml.etree import ElementTree

import math
import mmap
import requests
import zlib
@@ -331,6 +334,154 @@ def sha256(file):
CHECKSUM_ALGO_DICT['sha256'] = sha256


def _next_pow2(num):
    # Exponent of the smallest power of two greater than or equal to num (0 for num == 0)
    if not num:
        return 0
    return math.ceil(math.log2(num))


def _bittorrent_v2_piece_length_pow2(file_size: int) -> int:
    """
    Automatically choose the `piece size` so that the `piece layers`
    stay smaller than usual. This is a balancing act: a big
    piece_length requires more work on the bittorrent client side to
    validate hashes, while a small one requires more space to store
    the `piece layers` in the database.
    Returns the result as the exponent 'x' for a power of 2.
    To get the actual length in bytes, the caller should compute 2^x.
    """

    # by the bittorrent v2 specification, the minimum piece size is equal to the block size = 16 KiB
    min_piece_len_pow2 = 14  # 2 ** 14 == 16 KiB
    if not file_size:
        return min_piece_len_pow2
    # Limit the maximum size of the pieces_layers hash chain for bittorrent v2,
    # because we'll have to store it in the database
    max_pieces_layers_size_pow2 = 20  # 2 ** 20 == 1 MiB
    # a sha256 hash requires 2 ** 5 == 32 Bytes == 256 bits
    hash_size_pow2 = 5

    # The closest power of two greater than or equal to the file size
    file_size_pow2 = _next_pow2(file_size)

    # Compute the target size for the 'piece layers' in the torrent
    # (as a power of two: the closest power of two smaller than the number).
    # Caps at max_pieces_layers_size_pow2, which is reached for files larger than 1 TiB.
    target_pieces_layers_size = math.sqrt(file_size)
    target_pieces_layers_size_pow2 = min(math.floor(math.log2(target_pieces_layers_size)), max_pieces_layers_size_pow2)
    target_piece_num_pow2 = max(target_pieces_layers_size_pow2 - hash_size_pow2, 0)

    piece_length_pow2 = max(file_size_pow2 - target_piece_num_pow2, min_piece_len_pow2)
    return piece_length_pow2


def bittorrent_v2_piece_length(file_size: int) -> int:
    """Return the chosen bittorrent v2 piece length, in bytes, for a file of the given size."""
    return 2 ** _bittorrent_v2_piece_length_pow2(file_size)


def bittorrent_v2_merkle_sha256(file) -> tuple[bytes, bytes, int]:
    """
    Compute the .torrent v2 hash tree for the given file
    (http://www.bittorrent.org/beps/bep_0052.html).
    In particular, return the root of the merkle hash tree of the
    file, the 'piece layers' as described in the BEP, and the chosen
    `piece size`.
    The file is read in blocks of 16 KiB (the block size imposed by
    bittorrent v2) and the sha256 hash of each block is computed. When
    enough blocks have been read to form a `piece`, the merkle hash
    root of the piece is computed from the hashes of its blocks. At
    the end, the hashes of the pieces are combined to create the
    global pieces_root.
    """

    # by the bittorrent v2 specification, the block size and the
    # minimum piece size are both fixed to 16 KiB
    block_size = 16384
    block_size_pow2 = 14  # 2 ** 14 == 16 KiB
    # a sha256 hash requires 2 ** 5 == 32 Bytes == 256 bits
    hash_size = 32

    def _merkle_root(leafs: list[bytes], nb_levels: int, padding: bytes) -> bytes:
        """
        Build the root of the merkle hash tree from the (possibly incomplete) leaf layer.
        If len(leafs) < 2 ** nb_levels, the layer is padded with `padding` repeated
        as many times as needed to reach 2 ** nb_levels leafs in total.
        """
        nodes = copy.copy(leafs)
        level = nb_levels

        while level > 0:
            # combine pairs of nodes into their parents, in place
            for i in range(2 ** (level - 1)):
                node1 = nodes[2 * i] if 2 * i < len(nodes) else padding
                node2 = nodes[2 * i + 1] if 2 * i + 1 < len(nodes) else padding
                h = hashlib.sha256(node1)
                h.update(node2)
                if i < len(nodes):
                    nodes[i] = h.digest()
                else:
                    nodes.append(h.digest())
            level -= 1
        return nodes[0] if nodes else padding

    file_size = os.stat(file).st_size
    piece_length_pow2 = _bittorrent_v2_piece_length_pow2(file_size)

    block_per_piece_pow2 = piece_length_pow2 - block_size_pow2
    piece_length = 2 ** piece_length_pow2
    block_per_piece = 2 ** block_per_piece_pow2
    piece_num = math.ceil(file_size / piece_length)

    remaining = file_size
    remaining_in_block = min(file_size, block_size)
    block_hashes = []
    piece_hashes = []
    current_hash = hashlib.sha256()
    block_padding = bytes(hash_size)
    with open(file, 'rb') as f:
        while True:
            data = f.read(remaining_in_block)
            if not data:
                break

            current_hash.update(data)

            remaining_in_block -= len(data)
            remaining -= len(data)

            if not remaining_in_block:
                block_hashes.append(current_hash.digest())
                if len(block_hashes) == block_per_piece or not remaining:
                    piece_hashes.append(_merkle_root(block_hashes, nb_levels=block_per_piece_pow2, padding=block_padding))
                    block_hashes = []
                current_hash = hashlib.sha256()
                remaining_in_block = min(block_size, remaining)

            if not remaining:
                break

    if remaining or remaining_in_block or len(piece_hashes) != piece_num:
        raise RucioException(f'Error while computing merkle sha256 of {file}')

    piece_padding = _merkle_root([], nb_levels=block_per_piece_pow2, padding=block_padding)
    pieces_root = _merkle_root(piece_hashes, nb_levels=_next_pow2(piece_num), padding=piece_padding)
    pieces_layers = b''.join(piece_hashes) if len(piece_hashes) > 1 else b''

    return pieces_root, pieces_layers, piece_length


def merkle_sha256(file) -> str:
    """
    Return the root of the sha256 merkle hash tree of the file, with a leaf size of 16 KiB.
    """
    pieces_root, _, _ = bittorrent_v2_merkle_sha256(file)
    return pieces_root.hex()


CHECKSUM_ALGO_DICT['merkle_sha256'] = merkle_sha256


def crc32(file):
"""
Runs the CRC32 algorithm on the binary content of the file named file and returns the hexadecimal digest
@@ -908,6 +1059,27 @@ class Color:
END = '\033[0m'


def resolve_ips(hostname: str) -> list[str]:
    """
    Resolve a hostname to the list of its IPv4 addresses.
    An input which is already an IP address is returned as-is.
    Returns an empty list if resolution fails.
    """
    try:
        ipaddress.ip_address(hostname)
        return [hostname]
    except ValueError:
        pass
    try:
        addrinfo = socket.getaddrinfo(hostname, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
        return [ai[4][0] for ai in addrinfo]
    except socket.gaierror:
        pass
    return []


def resolve_ip(hostname: str):
    """
    Return the first IP address that the hostname resolves to, or None on failure.
    """
    ips = resolve_ips(hostname)
    if ips:
        return ips[0]
    return None


def detect_client_location():
"""
Normally client IP will be set on the server side (request.remote_addr)
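
As a worked example of the piece-length heuristic above: for a 1 GiB
file, the target piece-layers size is sqrt(2^30) = 2^15 bytes, i.e.
2^10 sha256 hashes, so the piece length becomes 2^(30-10) = 1 MiB (a
sanity-check sketch; the file path is a placeholder):

from rucio.common.utils import bittorrent_v2_merkle_sha256, bittorrent_v2_piece_length

assert bittorrent_v2_piece_length(2 ** 30) == 2 ** 20  # 1 GiB file -> 1 MiB pieces
assert bittorrent_v2_piece_length(0) == 2 ** 14        # empty file -> minimum 16 KiB
assert bittorrent_v2_piece_length(2 ** 50) == 2 ** 35  # huge file: piece layers capped at 1 MiB

pieces_root, pieces_layers, piece_length = bittorrent_v2_merkle_sha256('/path/to/some/file')
assert len(pieces_layers) % 32 == 0  # a concatenation of 32-byte sha256 piece hashes
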
4 changes: 2 additions & 2 deletions lib/rucio/core/request.py
@@ -774,8 +774,8 @@ def get_and_mark_next(

dst_id = res_dict['dest_rse_id']
src_id = res_dict['source_rse_id']
res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True)
res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True) if src_id is not None else None
res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True, load_attributes=True)
res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True, load_attributes=True) if src_id is not None else None

result.append(res_dict)
else:
12 changes: 7 additions & 5 deletions lib/rucio/core/transfer.py
@@ -45,14 +45,15 @@
from rucio.db.sqla.constants import DIDType, RequestState, RequestType, TransferLimitDirection
from rucio.db.sqla.session import read_session, transactional_session, stream_session
from rucio.rse import rsemanager as rsemgr
from rucio.transfertool.transfertool import TransferStatusReport
from rucio.transfertool.transfertool import TransferStatusReport, Transfertool
from rucio.transfertool.bittorrent import BittorrentTransfertool
from rucio.transfertool.fts3 import FTS3Transfertool
from rucio.transfertool.globus import GlobusTransferTool
from rucio.transfertool.mock import MockTransfertool

if TYPE_CHECKING:
from collections.abc import Callable, Iterator, Iterable, Mapping, Sequence
from typing import Any, Optional
from typing import Any, Optional, Type
from sqlalchemy.orm import Session
from rucio.common.types import InternalAccount
from rucio.core.topology import Topology
@@ -72,10 +73,11 @@

DEFAULT_MULTIHOP_TOMBSTONE_DELAY = int(datetime.timedelta(hours=2).total_seconds())

TRANSFERTOOL_CLASSES_BY_NAME = {
TRANSFERTOOL_CLASSES_BY_NAME: "dict[str, Type[Transfertool]]" = {
FTS3Transfertool.external_name: FTS3Transfertool,
GlobusTransferTool.external_name: GlobusTransferTool,
MockTransfertool.external_name: MockTransfertool,
BittorrentTransfertool.external_name: BittorrentTransfertool,
}


@@ -883,7 +885,7 @@ def build_or_return_cached(
sources=candidate_sources,
max_sources=self.max_sources,
multi_source_sources=[] if self.preparer_mode else sources,
limit_dest_schemes=[],
limit_dest_schemes=transfer_schemes,
operation_src='third_party_copy_read',
operation_dest='third_party_copy_write',
domain='wan',
@@ -1447,7 +1449,7 @@ def prepare_transfers(
logger(logging.WARNING, '%s: all available sources were filtered', rws)
continue

update_dict: dict[Any, Any] = {
update_dict: "dict[Any, Any]" = {
models.Request.state.name: _throttler_request_state(
activity=rws.activity,
source_rse=selected_source.rse,
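
With this change, callers can resolve a transfertool class by its
configured name instead of hardcoding imports (illustrative snippet,
assuming the external_name of BittorrentTransfertool is 'bittorrent'
and using a placeholder host):

from rucio.core.transfer import TRANSFERTOOL_CLASSES_BY_NAME
from rucio.transfertool.fts3 import FTS3Transfertool

# Unknown names fall back to FTS3, mirroring the behaviour of the poller below
transfertool_cls = TRANSFERTOOL_CLASSES_BY_NAME.get('bittorrent', FTS3Transfertool)
transfertool_obj = transfertool_cls(external_host='https://transfer.example.com')
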
18 changes: 10 additions & 8 deletions lib/rucio/daemons/conveyor/poller.py
@@ -45,8 +45,6 @@
from rucio.db.sqla.constants import RequestState, RequestType
from rucio.transfertool.transfertool import Transfertool
from rucio.transfertool.fts3 import FTS3Transfertool
from rucio.transfertool.globus import GlobusTransferTool
from rucio.transfertool.mock import MockTransfertool

if TYPE_CHECKING:
from rucio.daemons.common import HeartbeatHandler
@@ -132,18 +130,22 @@ def _handle_requests(

for chunk in dict_chunks(transfers_by_eid, fts_bulk):
try:
if transfertool == 'mock':
transfertool_obj = MockTransfertool(external_host=MockTransfertool.external_name)
elif transfertool == 'globus':
transfertool_obj = GlobusTransferTool(external_host=GlobusTransferTool.external_name)
else:
transfertool_cls = transfer_core.TRANSFERTOOL_CLASSES_BY_NAME.get(transfertool, FTS3Transfertool)

transfertool_kwargs = {}
if transfertool_cls.external_name == FTS3Transfertool.external_name:
account = None
if oidc_account:
if vo:
account = InternalAccount(oidc_account, vo=vo)
else:
account = InternalAccount(oidc_account)
transfertool_obj = FTS3Transfertool(external_host=external_host, vo=vo, oidc_account=account)
transfertool_kwargs.update({
'vo': vo,
'oidc_account': account,
})

transfertool_obj = transfertool_cls(external_host=external_host, **transfertool_kwargs)
poll_transfers(
transfertool_obj=transfertool_obj,
transfers_by_eid=chunk,