Skip to content

Commit

Permalink
Transfers: bittorrent transfertool
Browse files Browse the repository at this point in the history
Additional metadata is added to each file DID: the merkle root and
piece layers used by the bittorrent v2 format. This way, we can
reconstruct the .torrent files from this data, allowing us to
transfer files directly between RSEs using bittorrent clients
running on each of the RSEs.

This initial implementation relies on the qBittorrent client, but
other clients which support bittorrent v2 could be added later.

A new RSE protocol was needed for the task. The protocol must be
configured with the hostname+port of the bittorrent's data channel
and the 'magnet' scheme. We use custom extensions to the magnet
format to store the name/scope/path of a replica, so the link is not
currently importable into existing torrent clients, but this leaves
the door open for the future. It would be possible to generate magnet
links which actually work with such clients directly in list-replicas.
  • Loading branch information
Radu Carpa committed Jan 11, 2024
1 parent a547060 commit 7ec7dd6
Show file tree
Hide file tree
Showing 17 changed files with 903 additions and 30 deletions.
5 changes: 5 additions & 0 deletions etc/rse-accounts.cfg.template
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,10 @@
"cert": ["/tmp/x509up","/tmp/x509up"],
"auth_type": "cert",
"timeout":300
},
"": {
"qbittorrent_management_address": "",
"qbittorrent_username": "",
"qbittorrent_password": ""
}
}
15 changes: 14 additions & 1 deletion lib/rucio/client/uploadclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import copy
import json
import logging
Expand All @@ -30,7 +31,7 @@
ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError, RSEChecksumUnavailable,
ScopeNotFound)
from rucio.common.utils import (adler32, detect_client_location, execute, generate_uuid, make_valid_did, md5, send_trace,
retry, GLOBALLY_SUPPORTED_CHECKSUMS)
retry, bittorrent_v2_merkle_sha256, GLOBALLY_SUPPORTED_CHECKSUMS)
from rucio.rse import rsemanager as rsemgr


Expand Down Expand Up @@ -336,6 +337,16 @@ def _pick_random_rse(rse_expression):
raise NotAllFilesUploaded()
return 0

def _add_bittorrent_meta(self, file, logger):
    """
    Compute the bittorrent v2 hashes of a local file and store them as DID metadata.

    :param file: dictionary describing the file; the keys 'dirname', 'basename',
                 'did_scope' and 'did_name' are read.
    :param logger: callable invoked as logger(level, message).
    """
    local_path = os.path.join(file['dirname'], file['basename'])
    root_hash, layers, piece_len = bittorrent_v2_merkle_sha256(local_path)

    def _as_b64_text(raw):
        # the binary hashes are stored as base64 text in the metadata
        return base64.b64encode(raw).decode()

    did_meta = {}
    did_meta['bittorrent_pieces_root'] = _as_b64_text(root_hash)
    did_meta['bittorrent_pieces_layers'] = _as_b64_text(layers)
    did_meta['bittorrent_piece_length'] = piece_len

    self.client.set_metadata_bulk(
        scope=file['did_scope'],
        name=file['did_name'],
        meta=did_meta,
    )
    logger(logging.INFO, 'Added Bittorrent did meta')

def _register_file(self, file, registered_dataset_dids, ignore_availability=False, activity=None):
"""
Registers the given file in Rucio. Creates a dataset if
Expand Down Expand Up @@ -404,12 +415,14 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals

# add file to rse if it is not registered yet
replicastate = list(self.client.list_replicas([file_did], all_states=True))
self._add_bittorrent_meta(file=file, logger=logger)
if rse not in replicastate[0]['rses']:
self.client.add_replicas(rse=rse, files=[replica_for_api])
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
except DataIdentifierNotFound:
logger(logging.DEBUG, 'File DID does not exist')
self.client.add_replicas(rse=rse, files=[replica_for_api])
self._add_bittorrent_meta(file=file, logger=logger)
logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse)
if not dataset_did_str:
# only need to add rules for files if no dataset is given
Expand Down
238 changes: 237 additions & 1 deletion lib/rucio/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

import argparse
import base64
import copy
import datetime
import errno
import getpass
import hashlib
import io
import ipaddress
import itertools
import json
import logging
Expand All @@ -43,6 +45,7 @@
from uuid import uuid4 as uuid
from xml.etree import ElementTree

import math
import mmap
import requests
import zlib
Expand All @@ -63,7 +66,7 @@

if TYPE_CHECKING:
from collections.abc import Callable
from typing import TypeVar
from typing import TypeVar, Optional

T = TypeVar('T')

Expand Down Expand Up @@ -347,6 +350,218 @@ def crc32(file):
CHECKSUM_ALGO_DICT['crc32'] = crc32


def _next_pow2(num):
if not num:
return 0
return math.ceil(math.log2(num))


def _bittorrent_v2_piece_length_pow2(file_size: int) -> int:
    """
    Pick the bittorrent v2 `piece size` for a file of the given size.

    The choice is a trade-off: big pieces mean more hash validation work
    for the bittorrent client, while small pieces mean bigger `piece layers`,
    which have to be stored in the database.

    The result is the exponent 'x' of a power of two; the piece length
    in bytes is 2 ** x.
    """
    # The piece size cannot go below the 16 KiB block size: 2 ** 14
    smallest_piece_exp = 14
    # Upper bound for the size of the 'piece layers' hash chain: 2 ** 20 == 1 MiB
    layers_cap_exp = 20
    # One sha256 digest takes 2 ** 5 == 32 bytes
    digest_exp = 5

    if not file_size:
        return smallest_piece_exp

    # Exponent of the closest power of two which is not smaller than the file size
    size_exp = _next_pow2(file_size)

    # Aim for 'piece layers' of about sqrt(file_size) bytes, rounded down to
    # a power of two. The cap is reached for files of 1 TiB and more.
    layers_exp = math.floor(math.log2(math.sqrt(file_size)))
    if layers_exp > layers_cap_exp:
        layers_exp = layers_cap_exp

    # Number of pieces (as a power of two) which fit into the targeted layers size
    piece_count_exp = layers_exp - digest_exp
    if piece_count_exp < 0:
        piece_count_exp = 0

    return max(size_exp - piece_count_exp, smallest_piece_exp)


def bittorrent_v2_piece_length(file_size: int) -> int:
    """
    Return the bittorrent v2 piece length, in bytes, chosen for a file of the given size.
    """
    exponent = _bittorrent_v2_piece_length_pow2(file_size)
    return 1 << exponent


def bittorrent_v2_merkle_sha256(file) -> tuple[bytes, bytes, int]:
    """
    Compute the .torrent v2 hash tree for the given file.
    (http://www.bittorrent.org/beps/bep_0052.html)
    In particular, it will return the root of the merkle hash
    tree of the file, the 'piece layers' as described in the
    previous BEP, and the chosen `piece size`
    This function will read the file in chunks of 16KiB
    (which is the imposed block size by bittorrent v2) and compute
    the sha256 hash of each block. When enough blocks are read
    to form a `piece`, will compute the merkle hash root of the
    piece from the hashes of its blocks. At the end, the hashes
    of pieces are combined to create the global pieces_root.

    :param file: path of the file to hash; it is passed to os.stat() and open().
    :returns: a tuple (pieces_root, pieces_layers, piece_length). pieces_layers is
              the concatenation of the piece hashes, or b'' if there is at most one piece.
    :raises RucioException: if the quantity of data read, or the number of pieces,
                            doesn't match what was expected from the file size.
    """

    # by the bittorrent v2 specification, the block size and the
    # minimum piece size are both fixed to 16KiB
    block_size = 16384
    block_size_pow2 = 14  # 2 ** 14 == 16 KiB
    # sha256 requires 2 ** 5 == 32 Bytes == 256 bits
    hash_size = 32

    def _merkle_root(leafs: list[bytes], nb_levels: int, padding: bytes) -> bytes:
        """
        Build the root of the merkle hash tree from the (possibly incomplete) leafs layer.
        If len(leafs) < 2 ** nb_levels, it will be padded with the padding repeated as many times
        as needed to have 2 ** nb_levels leafs in total.
        """
        # Shallow copy: the list is overwritten in place below, and the
        # caller's list must not be modified.
        nodes = copy.copy(leafs)
        level = nb_levels

        # Each iteration replaces the current layer by its parent layer, which
        # is half as long and is stored at the beginning of the same list.
        while level > 0:
            for i in range(2 ** (level - 1)):
                # Missing children are replaced by the padding
                node1 = nodes[2 * i] if 2 * i < len(nodes) else padding
                node2 = nodes[2 * i + 1] if 2 * i + 1 < len(nodes) else padding
                h = hashlib.sha256(node1)
                h.update(node2)
                # Writing at index i is safe: the children at 2 * i and 2 * i + 1 were already read
                if i < len(nodes):
                    nodes[i] = h.digest()
                else:
                    nodes.append(h.digest())
            level -= 1
        # With nb_levels == 0 and no leafs, the root is the padding itself
        return nodes[0] if nodes else padding

    file_size = os.stat(file).st_size
    piece_length_pow2 = _bittorrent_v2_piece_length_pow2(file_size)

    block_per_piece_pow2 = piece_length_pow2 - block_size_pow2
    piece_length = 2 ** piece_length_pow2
    block_per_piece = 2 ** block_per_piece_pow2
    piece_num = math.ceil(file_size / piece_length)

    # Bytes which are still expected: in the whole file, and in the current block
    remaining = file_size
    remaining_in_block = min(file_size, block_size)
    block_hashes = []
    piece_hashes = []
    current_hash = hashlib.sha256()
    # Missing blocks of an incomplete piece are replaced by zero-filled hashes
    block_padding = bytes(hash_size)
    with open(file, 'rb') as f:
        while True:
            # read() may return less than requested; the counters below handle this
            data = f.read(remaining_in_block)
            if not data:
                break

            current_hash.update(data)

            remaining_in_block -= len(data)
            remaining -= len(data)

            if not remaining_in_block:
                # The block is complete
                block_hashes.append(current_hash.digest())
                if len(block_hashes) == block_per_piece or not remaining:
                    # The piece is complete, or this was the last block of the file
                    piece_hashes.append(_merkle_root(block_hashes, nb_levels=block_per_piece_pow2, padding=block_padding))
                    block_hashes = []
                current_hash = hashlib.sha256()
                remaining_in_block = min(block_size, remaining)

            if not remaining:
                break

    # Sanity check: everything announced by os.stat() was read and hashed
    if remaining or remaining_in_block or len(piece_hashes) != piece_num:
        raise RucioException(f'Error while computing merkle sha256 of {file}')

    # Hash of a piece made only of padding blocks; used to fill the pieces
    # layer up to a power of two
    piece_padding = _merkle_root([], nb_levels=block_per_piece_pow2, padding=block_padding)
    pieces_root = _merkle_root(piece_hashes, nb_levels=_next_pow2(piece_num), padding=piece_padding)
    # With a single piece, the root is the hash of this piece: no layers are returned
    pieces_layers = b''.join(piece_hashes) if len(piece_hashes) > 1 else b''

    return pieces_root, pieces_layers, piece_length


def merkle_sha256(file) -> str:
    """
    Hex-encoded root of the sha256 merkle hash tree (leaf size of 16 KiB) of the given file.
    """
    return bittorrent_v2_merkle_sha256(file)[0].hex()


# Register merkle_sha256 in CHECKSUM_ALGO_DICT under the 'merkle_sha256' key
CHECKSUM_ALGO_DICT['merkle_sha256'] = merkle_sha256


def bencode(obj):
    """
    Serialize an object into the bencode format used by .torrent files.

    Adapted from the reference implementation of v2 bittorrent:
    http://bittorrent.org/beps/bep_0052_torrent_creator.py

    :param obj: an int, bytes, str (encoded as utf-8), list or dict.
                Dictionary keys must be bytes; lists and dictionaries
                are encoded recursively.
    :returns: the bencoded bytes.
    :raises ValueError: if a dictionary key is not bytes, or if the
                        object (or a nested one) has an unsupported type.
    """

    if isinstance(obj, bool):
        # bool is a subclass of int, but str(True) is not a valid
        # bencoded integer. Encode booleans as 0/1.
        obj = int(obj)

    if isinstance(obj, int):
        return b"i" + str(obj).encode() + b"e"
    if isinstance(obj, bytes):
        return str(len(obj)).encode() + b":" + obj
    if isinstance(obj, str):
        return bencode(obj.encode("utf-8"))
    if isinstance(obj, list):
        return b"l" + b"".join(map(bencode, obj)) + b"e"
    if isinstance(obj, dict):
        if not all(isinstance(key, bytes) for key in obj):
            raise ValueError("dict keys should be bytes " + str(obj.keys()))
        # Keys must appear in sorted order. Keys are unique, so the
        # comparison of the (key, value) pairs never looks at the values.
        encoded_items = (bencode(key) + bencode(value) for key, value in sorted(obj.items()))
        return b"d" + b"".join(encoded_items) + b"e"
    raise ValueError(f"Allowed types: int, bytes, str, list, dict; not {type(obj)}")


def construct_torrent(
        scope: str,
        name: str,
        length: int,
        piece_length: int,
        pieces_root: bytes,
        pieces_layers: "Optional[bytes]" = None,
        trackers: "Optional[list[str]]" = None,
) -> "tuple[str, bytes]":
    """
    Build a private, single-file bittorrent v2 .torrent for the given file.

    :param scope: scope of the file; used, together with the name, as torrent name.
    :param name: name of the file; also used as the only entry of the file tree.
    :param length: size of the file in bytes.
    :param piece_length: piece length, in bytes, used to compute the hashes.
    :param pieces_root: root of the merkle hash tree of the file.
    :param pieces_layers: concatenated piece hashes of the file. If empty or None,
                          the 'piece layers' dictionary is left empty.
    :param trackers: tracker URLs. The first one becomes 'announce'; if there is
                     more than one, they are all listed in 'announce-list'.
    :returns: a tuple (torrent_id, torrent): the first 40 hex characters of the sha256
              of the bencoded 'info' dictionary, and the bencoded torrent.
    """

    info = {
        b'meta version': 2,
        b'private': 1,
        b'name': f'{scope}:{name}'.encode(),
        b'piece length': piece_length,
        b'file tree': {
            name.encode(): {
                b'': {
                    b'length': length,
                    b'pieces root': pieces_root,
                }
            }
        }
    }
    torrent_dict = {
        b'creation date': int(time.time()),
        b'info': info,
        b'piece layers': {},
    }
    if trackers:
        torrent_dict[b'announce'] = trackers[0].encode()
        if len(trackers) > 1:
            # BEP 12: 'announce-list' is a list of tiers, each tier being a list
            # of URLs. A flat list of URLs doesn't follow this format. Each tracker
            # goes into its own tier, to keep the order given by the caller.
            torrent_dict[b'announce-list'] = [[t.encode()] for t in trackers]
    if pieces_layers:
        torrent_dict[b'piece layers'][pieces_root] = pieces_layers

    # Only the 'info' dictionary is hashed: the trackers don't change the id
    torrent_id = hashlib.sha256(bencode(info)).hexdigest()[:40]
    torrent = bencode(torrent_dict)
    return torrent_id, torrent


def str_to_date(string):
""" Converts a RFC-1123 string to the corresponding datetime value.
Expand Down Expand Up @@ -908,6 +1123,27 @@ class Color:
END = '\033[0m'


def resolve_ips(hostname: str) -> list[str]:
    """
    Return the IP addresses behind the given hostname.

    If the hostname is already a valid IP address, it is returned as the
    only element of the list. Otherwise, it is resolved to IPv4 addresses.
    An empty list is returned if the resolution fails.
    """
    try:
        ipaddress.ip_address(hostname)
    except ValueError:
        is_ip_literal = False
    else:
        is_ip_literal = True

    if is_ip_literal:
        return [hostname]

    try:
        records = socket.getaddrinfo(hostname, 0, family=socket.AF_INET, type=0, proto=socket.IPPROTO_TCP)
    except socket.gaierror:
        return []
    # each record is (family, type, proto, canonname, sockaddr); sockaddr starts with the address
    return [sockaddr[0] for *_, sockaddr in records]


def resolve_ip(hostname: str):
    """
    Return the first IP address found for the hostname by resolve_ips, or None if there is none.
    """
    return next(iter(resolve_ips(hostname)), None)


def detect_client_location():
"""
Normally client IP will be set on the server side (request.remote_addr)
Expand Down
4 changes: 2 additions & 2 deletions lib/rucio/core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,8 +774,8 @@ def get_and_mark_next(

dst_id = res_dict['dest_rse_id']
src_id = res_dict['source_rse_id']
res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True)
res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True) if src_id is not None else None
res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True, load_attributes=True)
res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True, load_attributes=True) if src_id is not None else None

result.append(res_dict)
else:
Expand Down
Loading

0 comments on commit 7ec7dd6

Please sign in to comment.