From 477ceb8652da23458ff9b6a0372ffe5b45d2fedc Mon Sep 17 00:00:00 2001 From: Radu Carpa Date: Fri, 21 Jul 2023 15:12:07 +0200 Subject: [PATCH] Transfers: bittorrent transfertool Additional metadata is added to each file did: the merkle root and piece layers used by the bittorrent v2 format. This way, we can reconstruct the .torrent files from this data, allowing us to transfer files directly between RSEs using bittorrent clients running on each of the RSEs. This, initial implementation, relies on the qBittorrent client, but other clients which support bittorrent v2 could be added later. A new RSE protocol was needed for the task. The protocol must be configured with the hostname+port of the bittorrent's data channel and the 'magnet' scheme. We use custom extensions to the magnet format to store the name/scope/path of a replica, so the link is not currently importable into existing torrent clients, but this leaves the door open for the future. It would be possible to generate magnet links which actually work with such clients directly in list-replicas. --- etc/docker/dev/docker-compose.ports.yml | 2 + etc/docker/dev/docker-compose.yml | 16 ++ etc/docker/dev/iam/indigoiam_db.sql | 3 +- etc/docker/dev/iam/keycloak_db.sql | 64 ++++- etc/docker/dev/xrd/configure_qbittorrent.py | 135 ++++++++++ etc/docker/dev/xrd/entrypoint.sh | 9 +- .../test/extra/rucio_autotests_common.cfg | 2 +- etc/docker/test/extra/rucio_default.cfg | 2 +- lib/rucio/client/uploadclient.py | 15 +- lib/rucio/common/constants.py | 2 +- lib/rucio/common/types.py | 5 +- lib/rucio/common/utils.py | 238 +++++++++++++++++- lib/rucio/core/request.py | 4 +- lib/rucio/core/transfer.py | 16 +- lib/rucio/daemons/conveyor/poller.py | 18 +- lib/rucio/rse/protocols/bittorrent.py | 184 ++++++++++++++ lib/rucio/transfertool/bittorrent.py | 198 +++++++++++++++ lib/rucio/transfertool/bittorrent_driver.py | 56 +++++ .../bittorrent_driver_qbittorrent.py | 134 ++++++++++ lib/rucio/transfertool/transfertool.py | 2 +- requirements.txt | 2 + tests/conftest.py | 7 + tests/test_conveyor.py | 57 ++++- tests/test_upload.py | 6 - tests/test_utils.py | 26 +- tools/docker_activate_rses.sh | 6 + 26 files changed, 1171 insertions(+), 38 deletions(-) create mode 100644 etc/docker/dev/xrd/configure_qbittorrent.py create mode 100644 lib/rucio/rse/protocols/bittorrent.py create mode 100644 lib/rucio/transfertool/bittorrent.py create mode 100644 lib/rucio/transfertool/bittorrent_driver.py create mode 100644 lib/rucio/transfertool/bittorrent_driver_qbittorrent.py diff --git a/etc/docker/dev/docker-compose.ports.yml b/etc/docker/dev/docker-compose.ports.yml index e563bceea52..d98daf493b7 100644 --- a/etc/docker/dev/docker-compose.ports.yml +++ b/etc/docker/dev/docker-compose.ports.yml @@ -34,9 +34,11 @@ services: xrd1: ports: - "127.0.0.1:1094:1094" + - "127.0.0.1:8094:8094" xrd2: ports: - "127.0.0.1:1095:1095" + - "127.0.0.1:8095:8095" xrd3: ports: - "127.0.0.1:1096:1096" diff --git a/etc/docker/dev/docker-compose.yml b/etc/docker/dev/docker-compose.yml index 514b56463a7..da6f40fab31 100644 --- a/etc/docker/dev/docker-compose.yml +++ b/etc/docker/dev/docker-compose.yml @@ -155,7 +155,13 @@ services: environment: - XRDHOST=xrd1 - XRDPORT=1094 + - QBITTORRENT_UI_USERNAME=rucio + - QBITTORRENT_UI_PASSWORD=rucio90df + - QBITTORRENT_UI_PORT=8094 + - QBITTORRENT_LISTEN_PORT=10000 volumes: + - ./xrd/entrypoint.sh:/docker-entrypoint.sh:ro + - ./xrd:/configs:ro - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z - ../../certs/hostcert_xrd1.pem:/tmp/xrdcert.pem:Z - ../../certs/hostcert_xrd1.key.pem:/tmp/xrdkey.pem:Z @@ -172,7 +178,13 @@ services: environment: - XRDHOST=xrd2 - XRDPORT=1095 + - QBITTORRENT_UI_USERNAME=rucio + - QBITTORRENT_UI_PASSWORD=rucio90df + - QBITTORRENT_UI_PORT=8095 + - QBITTORRENT_LISTEN_PORT=10000 volumes: + - ./xrd/entrypoint.sh:/docker-entrypoint.sh:ro + - ./xrd:/configs:ro - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z - ../../certs/hostcert_xrd2.pem:/tmp/xrdcert.pem:Z - ../../certs/hostcert_xrd2.key.pem:/tmp/xrdkey.pem:Z @@ -190,6 +202,8 @@ services: - XRDHOST=xrd3 - XRDPORT=1096 volumes: + - ./xrd/entrypoint.sh:/docker-entrypoint.sh:ro + - ./xrd:/configs:ro - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z - ../../certs/hostcert_xrd3.pem:/tmp/xrdcert.pem:Z - ../../certs/hostcert_xrd3.key.pem:/tmp/xrdkey.pem:Z @@ -205,6 +219,8 @@ services: - XRDHOST=xrd4 - XRDPORT=1097 volumes: + - ./xrd/entrypoint.sh:/docker-entrypoint.sh:ro + - ./xrd:/configs:ro - ../../certs/rucio_ca.pem:/etc/grid-security/certificates/5fca1cb1.0:z - ../../certs/hostcert_xrd4.pem:/tmp/xrdcert.pem:Z - ../../certs/hostcert_xrd4.key.pem:/tmp/xrdkey.pem:Z diff --git a/etc/docker/dev/iam/indigoiam_db.sql b/etc/docker/dev/iam/indigoiam_db.sql index 902ecc289ae..33c6ce74921 100644 --- a/etc/docker/dev/iam/indigoiam_db.sql +++ b/etc/docker/dev/iam/indigoiam_db.sql @@ -822,7 +822,8 @@ INSERT INTO `client_scope` VALUES (8,'email'), (8,'offline_access'), (8,'storage.read:/'), -(8,'storage.modify:/'); +(8,'storage.modify:/'), +(2,'qbittorrent_admin'); /*!40000 ALTER TABLE `client_scope` ENABLE KEYS */; UNLOCK TABLES; diff --git a/etc/docker/dev/iam/keycloak_db.sql b/etc/docker/dev/iam/keycloak_db.sql index a82927e5290..209557e3421 100644 --- a/etc/docker/dev/iam/keycloak_db.sql +++ b/etc/docker/dev/iam/keycloak_db.sql @@ -408,15 +408,18 @@ CREATE TABLE `CLIENT` ( LOCK TABLES `CLIENT` WRITE; /*!40000 ALTER TABLE `CLIENT` DISABLE KEYS */; INSERT INTO `CLIENT` VALUES +('23a13a94-aaa0-4e53-8b58-76480755f39e','','','xrd2',0,'\0','JRdszmE54CNXJgMTUjfMLR2JxkRppc0s','','\0','','\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',-1,'','\0','','\0','client-secret','','',NULL,'','\0','','\0'), ('2f7d86a0-e8ba-4b75-9009-2048c5611177','','\0','account-console',0,'',NULL,'/realms/master/account/','\0',NULL,'\0','61c254e2-095d-42b9-b8cc-4546b124e548','openid-connect',0,'\0','\0','${client_account-console}','\0','client-secret','${authBaseUrl}',NULL,NULL,'','\0','\0','\0'), ('34bb26a0-d197-48a9-a0e2-4987dec23d0e','','','xrd4',0,'\0','OBGvnFSI1njsrnLSmckZbVYmKTPRtFa8','','\0','','\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',-1,'','\0','','\0','client-secret','','',NULL,'','\0','','\0'), ('49a49ecd-6045-42e4-9043-edf917f74b18','','','web1',0,'',NULL,'','\0','','\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',-1,'','\0','','\0','client-secret','','',NULL,'','\0','','\0'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','','','xrd3',0,'\0','1mf4hazS2xtZqmhOYSzrPnuyXfLYT2r6','','\0','','\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',-1,'','\0','','\0','client-secret','','',NULL,'','\0','','\0'), ('53ef6db9-271e-46c5-bd72-2f12ea045014','','','rucio',0,'\0','DzmZKUfTsGz9bynGIp1gSwI5xen5ce8b','','\0','','\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',-1,'','\0','','','client-secret','','',NULL,'','\0','','\0'), ('6f1f1e92-a5e0-48e5-bdf2-4948cc03b8e6','','\0','realm-management',0,'\0',NULL,NULL,'',NULL,'\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',0,'\0','\0','${client_realm-management}','\0','client-secret',NULL,NULL,NULL,'','\0','\0','\0'), ('6fcc4ef0-a82c-453e-90ba-0753d2c11c58','','\0','master-realm',0,'\0',NULL,NULL,'',NULL,'\0','61c254e2-095d-42b9-b8cc-4546b124e548',NULL,0,'\0','\0','master Realm','\0','client-secret',NULL,NULL,NULL,'','\0','\0','\0'), ('7429fd2a-6b7c-412c-8042-92288dbcaa58','','\0','admin-cli',0,'',NULL,NULL,'\0',NULL,'\0','61c254e2-095d-42b9-b8cc-4546b124e548','openid-connect',0,'\0','\0','${client_admin-cli}','\0','client-secret',NULL,NULL,NULL,'\0','\0','','\0'), ('79748e7e-06c2-4915-988c-0e30b15d12db','','\0','security-admin-console',0,'',NULL,'/admin/master/console/','\0',NULL,'\0','61c254e2-095d-42b9-b8cc-4546b124e548','openid-connect',0,'\0','\0','${client_security-admin-console}','\0','client-secret','${authAdminUrl}',NULL,NULL,'','\0','\0','\0'), ('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','','\0','security-admin-console',0,'',NULL,'/admin/ruciodev/console/','\0',NULL,'\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',0,'\0','\0','${client_security-admin-console}','\0','client-secret','${authAdminUrl}',NULL,NULL,'','\0','\0','\0'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','','','xrd1',0,'\0','FFsZCsbISqURuZNkzWSEONYorCr313lt','','\0','','\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',-1,'','\0','','\0','client-secret','','',NULL,'','\0','','\0'), ('8d0348ea-8e2b-47bf-a95b-69df3d711ebf','','\0','broker',0,'\0',NULL,NULL,'',NULL,'\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',0,'\0','\0','${client_broker}','\0','client-secret',NULL,NULL,NULL,'','\0','\0','\0'), ('95d18788-1f65-4019-81ba-502f2de23982','','\0','admin-cli',0,'',NULL,NULL,'\0',NULL,'\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',0,'\0','\0','${client_admin-cli}','\0','client-secret',NULL,NULL,NULL,'\0','\0','','\0'), ('9c1f7e9e-8703-4cca-82b0-d944dbf29287','','\0','account-console',0,'',NULL,'/realms/ruciodev/account/','\0',NULL,'\0','139c1488-d000-4061-922b-0c0b518a57db','openid-connect',0,'\0','\0','${client_account-console}','\0','client-secret','${authBaseUrl}',NULL,NULL,'','\0','\0','\0'), @@ -451,6 +454,11 @@ CREATE TABLE `CLIENT_ATTRIBUTES` ( LOCK TABLES `CLIENT_ATTRIBUTES` WRITE; /*!40000 ALTER TABLE `CLIENT_ATTRIBUTES` DISABLE KEYS */; INSERT INTO `CLIENT_ATTRIBUTES` VALUES +('23a13a94-aaa0-4e53-8b58-76480755f39e','backchannel.logout.revoke.offline.tokens','false'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','backchannel.logout.session.required','true'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','client.secret.creation.time','1705578754'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','oauth2.device.authorization.grant.enabled','false'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','oidc.ciba.grant.enabled','false'), ('2f7d86a0-e8ba-4b75-9009-2048c5611177','pkce.code.challenge.method','S256'), ('2f7d86a0-e8ba-4b75-9009-2048c5611177','post.logout.redirect.uris','+'), ('34bb26a0-d197-48a9-a0e2-4987dec23d0e','acr.loa.map','{}'), @@ -470,6 +478,11 @@ INSERT INTO `CLIENT_ATTRIBUTES` VALUES ('49a49ecd-6045-42e4-9043-edf917f74b18','display.on.consent.screen','false'), ('49a49ecd-6045-42e4-9043-edf917f74b18','oauth2.device.authorization.grant.enabled','false'), ('49a49ecd-6045-42e4-9043-edf917f74b18','oidc.ciba.grant.enabled','false'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','backchannel.logout.revoke.offline.tokens','false'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','backchannel.logout.session.required','true'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','client.secret.creation.time','1705578741'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','oauth2.device.authorization.grant.enabled','false'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','oidc.ciba.grant.enabled','false'), ('53ef6db9-271e-46c5-bd72-2f12ea045014','backchannel.logout.revoke.offline.tokens','false'), ('53ef6db9-271e-46c5-bd72-2f12ea045014','backchannel.logout.session.required','true'), ('53ef6db9-271e-46c5-bd72-2f12ea045014','client.secret.creation.time','1702649298'), @@ -479,6 +492,11 @@ INSERT INTO `CLIENT_ATTRIBUTES` VALUES ('79748e7e-06c2-4915-988c-0e30b15d12db','post.logout.redirect.uris','+'), ('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','pkce.code.challenge.method','S256'), ('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','post.logout.redirect.uris','+'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','backchannel.logout.revoke.offline.tokens','false'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','backchannel.logout.session.required','true'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','client.secret.creation.time','1705578767'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','oauth2.device.authorization.grant.enabled','false'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','oidc.ciba.grant.enabled','false'), ('9c1f7e9e-8703-4cca-82b0-d944dbf29287','pkce.code.challenge.method','S256'), ('9c1f7e9e-8703-4cca-82b0-d944dbf29287','post.logout.redirect.uris','+'), ('a4c24db6-3fe8-4b9c-a183-42a75148d531','post.logout.redirect.uris','+'), @@ -605,6 +623,7 @@ INSERT INTO `CLIENT_SCOPE` VALUES ('7c7de55b-c72a-4006-9b14-db1398fed22f','role_list','139c1488-d000-4061-922b-0c0b518a57db','SAML role list','saml'), ('8920b300-1b2f-4d18-ab8a-e975974fd013','role_list','61c254e2-095d-42b9-b8cc-4546b124e548','SAML role list','saml'), ('afc839cc-2307-4260-9924-338375d22c2b','email','61c254e2-095d-42b9-b8cc-4546b124e548','OpenID Connect built-in scope: email','openid-connect'), +('b94018cc-b7f3-43e2-b5ba-3e93a48f739e','qbittorrent_admin','139c1488-d000-4061-922b-0c0b518a57db','','openid-connect'), ('c945998b-68b7-4894-9561-7863799cc667','microprofile-jwt','61c254e2-095d-42b9-b8cc-4546b124e548','Microprofile - JWT built-in scope','openid-connect'), ('ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','microprofile-jwt','139c1488-d000-4061-922b-0c0b518a57db','Microprofile - JWT built-in scope','openid-connect'), ('d0262425-28ca-4dba-8f8d-12d1146cd725','offline_access','61c254e2-095d-42b9-b8cc-4546b124e548','OpenID Connect built-in scope: offline_access','openid-connect'), @@ -683,6 +702,12 @@ INSERT INTO `CLIENT_SCOPE_ATTRIBUTES` VALUES ('afc839cc-2307-4260-9924-338375d22c2b','${emailScopeConsentText}','consent.screen.text'), ('afc839cc-2307-4260-9924-338375d22c2b','true','display.on.consent.screen'), ('afc839cc-2307-4260-9924-338375d22c2b','true','include.in.token.scope'), +('b94018cc-b7f3-43e2-b5ba-3e93a48f739e','','consent.screen.text'), +('b94018cc-b7f3-43e2-b5ba-3e93a48f739e','true','display.on.consent.screen'), +('b94018cc-b7f3-43e2-b5ba-3e93a48f739e','qbittorrent_admin:*','dynamic.scope.regexp'), +('b94018cc-b7f3-43e2-b5ba-3e93a48f739e','','gui.order'), +('b94018cc-b7f3-43e2-b5ba-3e93a48f739e','true','include.in.token.scope'), +('b94018cc-b7f3-43e2-b5ba-3e93a48f739e','false','is.dynamic.scope'), ('c945998b-68b7-4894-9561-7863799cc667','false','display.on.consent.screen'), ('c945998b-68b7-4894-9561-7863799cc667','true','include.in.token.scope'), ('ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','false','display.on.consent.screen'), @@ -730,6 +755,16 @@ CREATE TABLE `CLIENT_SCOPE_CLIENT` ( LOCK TABLES `CLIENT_SCOPE_CLIENT` WRITE; /*!40000 ALTER TABLE `CLIENT_SCOPE_CLIENT` DISABLE KEYS */; INSERT INTO `CLIENT_SCOPE_CLIENT` VALUES +('23a13a94-aaa0-4e53-8b58-76480755f39e','0c885a01-891a-481f-9087-f6567af22b13',''), +('23a13a94-aaa0-4e53-8b58-76480755f39e','434407ef-1d7f-45e8-b91c-7db10210760a','\0'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','4e882685-31e1-451b-9006-cd4ff0dcf750',''), +('23a13a94-aaa0-4e53-8b58-76480755f39e','5019c5be-c7bd-47b0-a5b3-403a98162efe','\0'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','596a6555-3ee8-4aa9-8168-b8f0de92dbb1','\0'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','781fbb54-8552-44a0-9ea2-fab43dcf0b24',''), +('23a13a94-aaa0-4e53-8b58-76480755f39e','78975493-67a3-4819-a933-47b99c7c7e60',''), +('23a13a94-aaa0-4e53-8b58-76480755f39e','b94018cc-b7f3-43e2-b5ba-3e93a48f739e','\0'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','\0'), +('23a13a94-aaa0-4e53-8b58-76480755f39e','d6da6000-2013-417d-ad33-33f0804b5b80',''), ('2f7d86a0-e8ba-4b75-9009-2048c5611177','0526da56-aab3-455b-9cc8-2d3d8b0457d6',''), ('2f7d86a0-e8ba-4b75-9009-2048c5611177','0b211c52-ca02-4f22-b786-5a0b5085fc78','\0'), ('2f7d86a0-e8ba-4b75-9009-2048c5611177','21ce4324-232a-46b2-b113-9407b67de017',''), @@ -761,6 +796,16 @@ INSERT INTO `CLIENT_SCOPE_CLIENT` VALUES ('49a49ecd-6045-42e4-9043-edf917f74b18','ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','\0'), ('49a49ecd-6045-42e4-9043-edf917f74b18','d6da6000-2013-417d-ad33-33f0804b5b80',''), ('49a49ecd-6045-42e4-9043-edf917f74b18','e58db343-7593-4ffb-8791-bf88b0675191','\0'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','0c885a01-891a-481f-9087-f6567af22b13',''), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','434407ef-1d7f-45e8-b91c-7db10210760a','\0'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','4e882685-31e1-451b-9006-cd4ff0dcf750',''), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','5019c5be-c7bd-47b0-a5b3-403a98162efe','\0'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','596a6555-3ee8-4aa9-8168-b8f0de92dbb1','\0'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','781fbb54-8552-44a0-9ea2-fab43dcf0b24',''), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','78975493-67a3-4819-a933-47b99c7c7e60',''), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','b94018cc-b7f3-43e2-b5ba-3e93a48f739e','\0'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','\0'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','d6da6000-2013-417d-ad33-33f0804b5b80',''), ('53ef6db9-271e-46c5-bd72-2f12ea045014','0c885a01-891a-481f-9087-f6567af22b13',''), ('53ef6db9-271e-46c5-bd72-2f12ea045014','434407ef-1d7f-45e8-b91c-7db10210760a','\0'), ('53ef6db9-271e-46c5-bd72-2f12ea045014','4e882685-31e1-451b-9006-cd4ff0dcf750',''), @@ -817,6 +862,16 @@ INSERT INTO `CLIENT_SCOPE_CLIENT` VALUES ('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','78975493-67a3-4819-a933-47b99c7c7e60',''), ('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','\0'), ('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','d6da6000-2013-417d-ad33-33f0804b5b80',''), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','0c885a01-891a-481f-9087-f6567af22b13',''), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','434407ef-1d7f-45e8-b91c-7db10210760a','\0'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','4e882685-31e1-451b-9006-cd4ff0dcf750',''), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','5019c5be-c7bd-47b0-a5b3-403a98162efe','\0'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','596a6555-3ee8-4aa9-8168-b8f0de92dbb1','\0'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','781fbb54-8552-44a0-9ea2-fab43dcf0b24',''), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','78975493-67a3-4819-a933-47b99c7c7e60',''), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','b94018cc-b7f3-43e2-b5ba-3e93a48f739e','\0'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','\0'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','d6da6000-2013-417d-ad33-33f0804b5b80',''), ('8d0348ea-8e2b-47bf-a95b-69df3d711ebf','0c885a01-891a-481f-9087-f6567af22b13',''), ('8d0348ea-8e2b-47bf-a95b-69df3d711ebf','434407ef-1d7f-45e8-b91c-7db10210760a','\0'), ('8d0348ea-8e2b-47bf-a95b-69df3d711ebf','4e882685-31e1-451b-9006-cd4ff0dcf750',''), @@ -1569,6 +1624,7 @@ INSERT INTO `DEFAULT_CLIENT_SCOPE` VALUES ('139c1488-d000-4061-922b-0c0b518a57db','781fbb54-8552-44a0-9ea2-fab43dcf0b24',''), ('139c1488-d000-4061-922b-0c0b518a57db','78975493-67a3-4819-a933-47b99c7c7e60',''), ('139c1488-d000-4061-922b-0c0b518a57db','7c7de55b-c72a-4006-9b14-db1398fed22f',''), +('139c1488-d000-4061-922b-0c0b518a57db','b94018cc-b7f3-43e2-b5ba-3e93a48f739e','\0'), ('139c1488-d000-4061-922b-0c0b518a57db','ccb9d5ba-bf89-4762-81c1-c9c87da7d1e8','\0'), ('139c1488-d000-4061-922b-0c0b518a57db','d6da6000-2013-417d-ad33-33f0804b5b80',''), ('61c254e2-095d-42b9-b8cc-4546b124e548','0526da56-aab3-455b-9cc8-2d3d8b0457d6',''), @@ -3186,12 +3242,15 @@ CREATE TABLE `REDIRECT_URIS` ( LOCK TABLES `REDIRECT_URIS` WRITE; /*!40000 ALTER TABLE `REDIRECT_URIS` DISABLE KEYS */; INSERT INTO `REDIRECT_URIS` VALUES +('23a13a94-aaa0-4e53-8b58-76480755f39e','/*'), ('2f7d86a0-e8ba-4b75-9009-2048c5611177','/realms/master/account/*'), ('34bb26a0-d197-48a9-a0e2-4987dec23d0e','/*'), ('49a49ecd-6045-42e4-9043-edf917f74b18','/*'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','/*'), ('53ef6db9-271e-46c5-bd72-2f12ea045014','/*'), ('79748e7e-06c2-4915-988c-0e30b15d12db','/admin/master/console/*'), ('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','/admin/ruciodev/console/*'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','/*'), ('9c1f7e9e-8703-4cca-82b0-d944dbf29287','/realms/ruciodev/account/*'), ('a4c24db6-3fe8-4b9c-a183-42a75148d531','/realms/ruciodev/account/*'), ('e32dde36-fa71-4648-aa46-ad822a2b51b6','/realms/master/account/*'); @@ -4136,11 +4195,14 @@ CREATE TABLE `WEB_ORIGINS` ( LOCK TABLES `WEB_ORIGINS` WRITE; /*!40000 ALTER TABLE `WEB_ORIGINS` DISABLE KEYS */; INSERT INTO `WEB_ORIGINS` VALUES +('23a13a94-aaa0-4e53-8b58-76480755f39e','/*'), ('34bb26a0-d197-48a9-a0e2-4987dec23d0e','/*'), ('49a49ecd-6045-42e4-9043-edf917f74b18','/*'), +('4eca1394-63e6-475c-b176-1d1fe1fb2a45','/*'), ('53ef6db9-271e-46c5-bd72-2f12ea045014','/*'), ('79748e7e-06c2-4915-988c-0e30b15d12db','+'), -('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','+'); +('8b2528fe-d14e-4b36-8b3d-4a44b89bd6dc','+'), +('8c068e03-9b32-48ec-9bde-0d5d2ca62274','/*'); /*!40000 ALTER TABLE `WEB_ORIGINS` ENABLE KEYS */; UNLOCK TABLES; /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; diff --git a/etc/docker/dev/xrd/configure_qbittorrent.py b/etc/docker/dev/xrd/configure_qbittorrent.py new file mode 100644 index 00000000000..3e28dcb91a3 --- /dev/null +++ b/etc/docker/dev/xrd/configure_qbittorrent.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re +import os +import sys +import time +import urllib.request +import urllib.parse + + +def wait_for_server(host, port, max_wait): + timer = 0 + while timer < max_wait: + try: + response = urllib.request.urlopen(urllib.request.Request(f'http://{host}:{port}')) + if response.status == 200: + return + except Exception: + if timer > max_wait: + raise + time.sleep(1) + timer += 1 + + +def configure( + host, + port, + username, + password, + new_config, +): + scheme = 'http' + headers = { + 'content-type': 'application/x-www-form-urlencoded' + } + + # Authenticate + req = urllib.request.Request( + f'{scheme}://{host}:{port}/api/v2/auth/login', + headers=headers, + data=urllib.parse.urlencode({ + 'username': username, + 'password': password + }).encode(), + method='POST', + ) + response = urllib.request.urlopen(req) + headers['Cookie'] = response.getheader("Set-Cookie") + + # Update the config + req = urllib.request.Request( + f'{scheme}://{host}:{port}/api/v2/app/setPreferences', + headers=headers, + data=urllib.parse.urlencode({'json': json.dumps(new_config)}).encode(), + method='POST', + ) + urllib.request.urlopen(req) + + scheme = 'https' if new_config.get('use_https') else scheme + port = new_config.get('web_ui_port', port) + + # Logout + req = urllib.request.Request(f'{scheme}://{host}:{port}/api/v2/auth/logout', headers=headers, method='POST') + urllib.request.urlopen(req) + + +if __name__ == "__main__": + initial_config_done = False + extract_password = re.compile(r'.*: ([23456789ABCDEFGHIJKLMNPQRSTUVWXYZabcdefghjkmnpqrstuvwxyz]{9})$') + for line in sys.stdin: + if initial_config_done: + continue + + automatic_password = extract_password.search(line) + if not automatic_password: + continue + automatic_password = automatic_password.group(1) + + port = 8080 + host = 'localhost' + + wait_for_server(host=host, port=port, max_wait=60) + + config = { + 'listen_port': int(os.environ.get('QBITTORRENT_LISTEN_PORT', 10000)), + # 'ssl_enabled': True, + # 'ssl_listen_port': 20000, + 'upnp': False, + 'dht': False, + 'pex': False, + 'lsd': False, + 'encryption': 1, # require encryption + 'bypass_local_auth': False, + 'web_ui_upnp': False, + # 'web_ui_address': '', + # 'enable_embedded_tracker': False, + # 'embedded_tracker_port': 9000, + 'enable_multi_connections_from_same_ip': True, + } + + if os.environ.get('QBITTORRENT_UI_PASSWORD'): + config['web_ui_password'] = os.environ['QBITTORRENT_UI_PASSWORD'] + if os.environ.get('QBITTORRENT_UI_USERNAME'): + config['web_ui_username'] = os.environ['QBITTORRENT_UI_USERNAME'] + if os.environ.get('QBITTORRENT_UI_PORT'): + config['web_ui_port'] = os.environ['QBITTORRENT_UI_PORT'] + + if os.environ.get('QBITTORRENT_UI_CERT') and os.environ.get('QBITTORRENT_UI_KEY'): + config['use_https'] = True + config['web_ui_https_cert_path'] = os.environ['QBITTORRENT_UI_CERT'] + config['web_ui_https_key_path'] = os.environ['QBITTORRENT_UI_KEY'] + + configure( + host=host, + port=port, + username='admin', + password=automatic_password, + new_config=config, + ) + + initial_config_done = True diff --git a/etc/docker/dev/xrd/entrypoint.sh b/etc/docker/dev/xrd/entrypoint.sh index e0453748475..a77ed291698 100755 --- a/etc/docker/dev/xrd/entrypoint.sh +++ b/etc/docker/dev/xrd/entrypoint.sh @@ -24,6 +24,11 @@ cp /tmp/xrdkey.pem /etc/grid-security/xrd/xrdkey.pem chown -R xrootd:xrootd /etc/grid-security/xrd chmod 0400 /etc/grid-security/xrd/xrdkey.pem -xrootd -R xrootd -n rucio -c /etc/xrootd/xrdrucio.cfg +if [ -n "$QBITTORRENT_UI_PORT" ] +then + export QBITTORRENT_UI_CERT=/tmp/xrdcert.pem + export QBITTORRENT_UI_KEY=/tmp/xrdkey.pem + su -s /bin/bash -c qbittorrent-nox xrootd | tee >(python3 /configs/configure_qbittorrent.py) & +fi -exec "$@" +xrootd -R xrootd -n rucio -c /etc/xrootd/xrdrucio.cfg diff --git a/etc/docker/test/extra/rucio_autotests_common.cfg b/etc/docker/test/extra/rucio_autotests_common.cfg index 9d17512d533..c3117192f4d 100644 --- a/etc/docker/test/extra/rucio_autotests_common.cfg +++ b/etc/docker/test/extra/rucio_autotests_common.cfg @@ -41,7 +41,7 @@ carbon_port = 8125 user_scope = travis [conveyor] -scheme = srm,root,davs,gsiftp,http,https,mock,file +scheme = srm,root,davs,gsiftp,http,https,mock,file,magnet transfertool = fts3 ftshosts = https://fts:8446 cacert = /opt/rucio/etc/rucio_ca.pem diff --git a/etc/docker/test/extra/rucio_default.cfg b/etc/docker/test/extra/rucio_default.cfg index fc9cb420f0c..13319509f4a 100644 --- a/etc/docker/test/extra/rucio_default.cfg +++ b/etc/docker/test/extra/rucio_default.cfg @@ -48,7 +48,7 @@ carbon_port = 8125 user_scope = docker [conveyor] -scheme = https,davs,gsiftp,root,srm,mock,file +scheme = https,davs,gsiftp,root,srm,mock,file,magnet #scheme = https #user_transfers = cms #user_activities = ['dummy_user_activity'] diff --git a/lib/rucio/client/uploadclient.py b/lib/rucio/client/uploadclient.py index 0a29d90648b..a361ad49964 100644 --- a/lib/rucio/client/uploadclient.py +++ b/lib/rucio/client/uploadclient.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import copy import json import logging @@ -30,7 +31,7 @@ ResourceTemporaryUnavailable, ServiceUnavailable, InputValidationError, RSEChecksumUnavailable, ScopeNotFound) from rucio.common.utils import (adler32, detect_client_location, execute, generate_uuid, make_valid_did, md5, send_trace, - retry, GLOBALLY_SUPPORTED_CHECKSUMS) + retry, bittorrent_v2_merkle_sha256, GLOBALLY_SUPPORTED_CHECKSUMS) from rucio.rse import rsemanager as rsemgr @@ -336,6 +337,16 @@ def _pick_random_rse(rse_expression): raise NotAllFilesUploaded() return 0 + def _add_bittorrent_meta(self, file, logger): + pieces_root, pieces_layers, piece_length = bittorrent_v2_merkle_sha256(os.path.join(file['dirname'], file['basename'])) + bittorrent_meta = { + 'bittorrent_pieces_root': base64.b64encode(pieces_root).decode(), + 'bittorrent_pieces_layers': base64.b64encode(pieces_layers).decode(), + 'bittorrent_piece_length': piece_length, + } + self.client.set_metadata_bulk(scope=file['did_scope'], name=file['did_name'], meta=bittorrent_meta) + logger(logging.INFO, 'Added Bittorrent did meta') + def _register_file(self, file, registered_dataset_dids, ignore_availability=False, activity=None): """ Registers the given file in Rucio. Creates a dataset if @@ -404,12 +415,14 @@ def _register_file(self, file, registered_dataset_dids, ignore_availability=Fals # add file to rse if it is not registered yet replicastate = list(self.client.list_replicas([file_did], all_states=True)) + self._add_bittorrent_meta(file=file, logger=logger) if rse not in replicastate[0]['rses']: self.client.add_replicas(rse=rse, files=[replica_for_api]) logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse) except DataIdentifierNotFound: logger(logging.DEBUG, 'File DID does not exist') self.client.add_replicas(rse=rse, files=[replica_for_api]) + self._add_bittorrent_meta(file=file, logger=logger) logger(logging.INFO, 'Successfully added replica in Rucio catalogue at %s' % rse) if not dataset_did_str: # only need to add rules for files if no dataset is given diff --git a/lib/rucio/common/constants.py b/lib/rucio/common/constants.py index cdd07310086..aab2ae14d01 100644 --- a/lib/rucio/common/constants.py +++ b/lib/rucio/common/constants.py @@ -48,7 +48,7 @@ SCHEME_MAP['srm'].append('davs') SCHEME_MAP['davs'].append('srm') -SUPPORTED_PROTOCOLS = ['gsiftp', 'srm', 'root', 'davs', 'http', 'https', 'file', 'storm', 'srm+https', 'scp', 'rsync', 'rclone'] +SUPPORTED_PROTOCOLS = ['gsiftp', 'srm', 'root', 'davs', 'http', 'https', 'file', 'storm', 'srm+https', 'scp', 'rsync', 'rclone', 'magnet'] FTS_STATE = namedtuple('FTS_STATE', ['SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY', 'NOT_USED', 'CANCELED'])('SUBMITTED', 'READY', 'ACTIVE', 'FAILED', 'FINISHED', 'FINISHEDDIRTY', diff --git a/lib/rucio/common/types.py b/lib/rucio/common/types.py index 7c38982b010..c3b0628dd8a 100644 --- a/lib/rucio/common/types.py +++ b/lib/rucio/common/types.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Optional, TypedDict, Union +from typing import Any, Callable, Optional, TypedDict, Union class InternalType(object): @@ -104,6 +104,9 @@ def __init__(self, scope, vo='def', fromExternal=True): super(InternalScope, self).__init__(value=scope, vo=vo, fromExternal=fromExternal) +LoggerFunction = Callable[..., Any] + + class RSEDomainLANDict(TypedDict): read: Optional[int] write: Optional[int] diff --git a/lib/rucio/common/utils.py b/lib/rucio/common/utils.py index c2647dab907..fd513b63f5e 100644 --- a/lib/rucio/common/utils.py +++ b/lib/rucio/common/utils.py @@ -15,11 +15,13 @@ import argparse import base64 +import copy import datetime import errno import getpass import hashlib import io +import ipaddress import itertools import json import logging @@ -43,6 +45,7 @@ from uuid import uuid4 as uuid from xml.etree import ElementTree +import math import mmap import requests import zlib @@ -63,7 +66,7 @@ if TYPE_CHECKING: from collections.abc import Callable - from typing import TypeVar + from typing import TypeVar, Optional T = TypeVar('T') @@ -347,6 +350,218 @@ def crc32(file): CHECKSUM_ALGO_DICT['crc32'] = crc32 +def _next_pow2(num): + if not num: + return 0 + return math.ceil(math.log2(num)) + + +def _bittorrent_v2_piece_length_pow2(file_size: int) -> int: + """ + Automatically chooses the `piece size` so that `piece layers` + is kept small(er) than usually. This is a balancing act: + having a big piece_length requires more work on bittorrent client + side to validate hashes, but having it small requires more + place to store the `piece layers` in the database. + + Returns the result as the exponent 'x' for power of 2. + To get the actual length in bytes, the caller should compute 2^x. + """ + + # by the bittorrent v2 specification, the piece size is equal to block size = 16KiB + min_piece_len_pow2 = 14 # 2 ** 14 == 16 KiB + if not file_size: + return min_piece_len_pow2 + # Limit the maximum size of pieces_layers hash chain for bittorrent v2, + # because we'll have to store it in the database + max_pieces_layers_size_pow2 = 20 # 2 ** 20 == 1 MiB + # sha256 requires 2 ** 5 == 32 Bytes == 256 bits + hash_size_pow2 = 5 + + # The closest power of two bigger than the file size + file_size_pow2 = _next_pow2(file_size) + + # Compute the target size for the 'pieces layers' in the torrent + # (as power of two: the closest power-of-two smaller than the number) + # Will cap at max_pieces_layers_size for files larger than 1TB. + target_pieces_layers_size = math.sqrt(file_size) + target_pieces_layers_size_pow2 = min(math.floor(math.log2(target_pieces_layers_size)), max_pieces_layers_size_pow2) + target_piece_num_pow2 = max(target_pieces_layers_size_pow2 - hash_size_pow2, 0) + + piece_length_pow2 = max(file_size_pow2 - target_piece_num_pow2, min_piece_len_pow2) + return piece_length_pow2 + + +def bittorrent_v2_piece_length(file_size: int) -> int: + return 2 ** _bittorrent_v2_piece_length_pow2(file_size) + + +def bittorrent_v2_merkle_sha256(file) -> tuple[bytes, bytes, int]: + """ + Compute the .torrent v2 hash tree for the given file. + (http://www.bittorrent.org/beps/bep_0052.html) + In particular, it will return the root of the merkle hash + tree of the file, the 'piece layers' as described in the + previous BEP, and the chosen `piece size` + + This function will read the file in chunks of 16KiB + (which is the imposed block size by bittorrent v2) and compute + the sha256 hash of each block. When enough blocks are read + to form a `piece`, will compute the merkle hash root of the + piece from the hashes of its blocks. At the end, the hashes + of pieces are combined to create the global pieces_root. + """ + + # by the bittorrent v2 specification, the block size and the + # minimum piece size are both fixed to 16KiB + block_size = 16384 + block_size_pow2 = 14 # 2 ** 14 == 16 KiB + # sha256 requires 2 ** 5 == 32 Bytes == 256 bits + hash_size = 32 + + def _merkle_root(leafs: list[bytes], nb_levels: int, padding: bytes) -> bytes: + """ + Build the root of the merkle hash tree from the (possibly incomplete) leafs layer. + If len(leafs) < 2 ** nb_levels, it will be padded with the padding repeated as many times + as needed to have 2 ** nb_levels leafs in total. + """ + nodes = copy.copy(leafs) + level = nb_levels + + while level > 0: + for i in range(2 ** (level - 1)): + node1 = nodes[2 * i] if 2 * i < len(nodes) else padding + node2 = nodes[2 * i + 1] if 2 * i + 1 < len(nodes) else padding + h = hashlib.sha256(node1) + h.update(node2) + if i < len(nodes): + nodes[i] = h.digest() + else: + nodes.append(h.digest()) + level -= 1 + return nodes[0] if nodes else padding + + file_size = os.stat(file).st_size + piece_length_pow2 = _bittorrent_v2_piece_length_pow2(file_size) + + block_per_piece_pow2 = piece_length_pow2 - block_size_pow2 + piece_length = 2 ** piece_length_pow2 + block_per_piece = 2 ** block_per_piece_pow2 + piece_num = math.ceil(file_size / piece_length) + + remaining = file_size + remaining_in_block = min(file_size, block_size) + block_hashes = [] + piece_hashes = [] + current_hash = hashlib.sha256() + block_padding = bytes(hash_size) + with open(file, 'rb') as f: + while True: + data = f.read(remaining_in_block) + if not data: + break + + current_hash.update(data) + + remaining_in_block -= len(data) + remaining -= len(data) + + if not remaining_in_block: + block_hashes.append(current_hash.digest()) + if len(block_hashes) == block_per_piece or not remaining: + piece_hashes.append(_merkle_root(block_hashes, nb_levels=block_per_piece_pow2, padding=block_padding)) + block_hashes = [] + current_hash = hashlib.sha256() + remaining_in_block = min(block_size, remaining) + + if not remaining: + break + + if remaining or remaining_in_block or len(piece_hashes) != piece_num: + raise RucioException(f'Error while computing merkle sha256 of {file}') + + piece_padding = _merkle_root([], nb_levels=block_per_piece_pow2, padding=block_padding) + pieces_root = _merkle_root(piece_hashes, nb_levels=_next_pow2(piece_num), padding=piece_padding) + pieces_layers = b''.join(piece_hashes) if len(piece_hashes) > 1 else b'' + + return pieces_root, pieces_layers, piece_length + + +def merkle_sha256(file) -> str: + """ + The root of the sha256 merkle hash tree with leaf size of 16 KiB. + """ + pieces_root, _, _ = bittorrent_v2_merkle_sha256(file) + return pieces_root.hex() + + +CHECKSUM_ALGO_DICT['merkle_sha256'] = merkle_sha256 + + +def bencode(obj) -> bytes: + """ + Copied from the reference implementation of v2 bittorrent: + http://bittorrent.org/beps/bep_0052_torrent_creator.py + """ + + if isinstance(obj, int): + return b"i" + str(obj).encode() + b"e" + elif isinstance(obj, bytes): + return str(len(obj)).encode() + b":" + obj + elif isinstance(obj, str): + return bencode(obj.encode("utf-8")) + elif isinstance(obj, list): + return b"l" + b"".join(map(bencode, obj)) + b"e" + elif isinstance(obj, dict): + if all(isinstance(i, bytes) for i in obj.keys()): + items = list(obj.items()) + items.sort() + return b"d" + b"".join(map(bencode, itertools.chain(*items))) + b"e" + else: + raise ValueError("dict keys should be bytes " + str(obj.keys())) + raise ValueError("Allowed types: int, bytes, list, dict; not %s", type(obj)) + + +def construct_torrent( + scope: str, + name: str, + length: int, + piece_length: int, + pieces_root: bytes, + pieces_layers: "Optional[bytes]" = None, + trackers: "Optional[list[str]]" = None, +) -> "tuple[str, bytes]": + + torrent_dict = { + b'creation date': int(time.time()), + b'info': { + b'meta version': 2, + b'private': 1, + b'name': f'{scope}:{name}'.encode(), + b'piece length': piece_length, + b'file tree': { + name.encode(): { + b'': { + b'length': length, + b'pieces root': pieces_root, + } + } + } + }, + b'piece layers': {}, + } + if trackers: + torrent_dict[b'announce'] = trackers[0].encode() + if len(trackers) > 1: + torrent_dict[b'announce-list'] = [t.encode() for t in trackers] + if pieces_layers: + torrent_dict[b'piece layers'][pieces_root] = pieces_layers + + torrent_id = hashlib.sha256(bencode(torrent_dict[b'info'])).hexdigest()[:40] + torrent = bencode(torrent_dict) + return torrent_id, torrent + + def str_to_date(string): """ Converts a RFC-1123 string to the corresponding datetime value. @@ -908,6 +1123,27 @@ class Color: END = '\033[0m' +def resolve_ips(hostname: str) -> list[str]: + try: + ipaddress.ip_address(hostname) + return [hostname] + except ValueError: + pass + try: + addrinfo = socket.getaddrinfo(hostname, 0, socket.AF_INET, 0, socket.IPPROTO_TCP) + return [ai[4][0] for ai in addrinfo] + except socket.gaierror: + pass + return [] + + +def resolve_ip(hostname: str): + ips = resolve_ips(hostname) + if ips: + return ips[0] + return None + + def detect_client_location(): """ Normally client IP will be set on the server side (request.remote_addr) diff --git a/lib/rucio/core/request.py b/lib/rucio/core/request.py index 4343110d302..76cdf872061 100644 --- a/lib/rucio/core/request.py +++ b/lib/rucio/core/request.py @@ -774,8 +774,8 @@ def get_and_mark_next( dst_id = res_dict['dest_rse_id'] src_id = res_dict['source_rse_id'] - res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True) - res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True) if src_id is not None else None + res_dict['dst_rse'] = rse_collection[dst_id].ensure_loaded(load_name=True, load_attributes=True) + res_dict['src_rse'] = rse_collection[src_id].ensure_loaded(load_name=True, load_attributes=True) if src_id is not None else None result.append(res_dict) else: diff --git a/lib/rucio/core/transfer.py b/lib/rucio/core/transfer.py index bad8f486bf2..2810c74c5dd 100644 --- a/lib/rucio/core/transfer.py +++ b/lib/rucio/core/transfer.py @@ -45,14 +45,15 @@ from rucio.db.sqla.constants import DIDType, RequestState, RequestType, TransferLimitDirection from rucio.db.sqla.session import read_session, transactional_session, stream_session from rucio.rse import rsemanager as rsemgr -from rucio.transfertool.transfertool import TransferStatusReport +from rucio.transfertool.transfertool import TransferStatusReport, Transfertool +from rucio.transfertool.bittorrent import BittorrentTransfertool from rucio.transfertool.fts3 import FTS3Transfertool from rucio.transfertool.globus import GlobusTransferTool from rucio.transfertool.mock import MockTransfertool if TYPE_CHECKING: from collections.abc import Callable, Iterator, Iterable, Mapping, Sequence - from typing import Any, Optional + from typing import Any, Optional, Type from sqlalchemy.orm import Session from rucio.common.types import InternalAccount from rucio.core.topology import Topology @@ -72,10 +73,11 @@ DEFAULT_MULTIHOP_TOMBSTONE_DELAY = int(datetime.timedelta(hours=2).total_seconds()) -TRANSFERTOOL_CLASSES_BY_NAME = { +TRANSFERTOOL_CLASSES_BY_NAME: "dict[str, Type[Transfertool]]" = { FTS3Transfertool.external_name: FTS3Transfertool, GlobusTransferTool.external_name: GlobusTransferTool, MockTransfertool.external_name: MockTransfertool, + BittorrentTransfertool.external_name: BittorrentTransfertool, } @@ -157,6 +159,12 @@ def source_url(self, source: RequestSource) -> str: ) return url + def dest_protocol(self): + return self.protocol_factory.protocol(self.dst.rse, self.dst.scheme, self.operation_dest) + + def source_protocol(self, source: RequestSource): + return self.protocol_factory.protocol(source.rse, source.scheme, self.operation_src) + @property def use_ipv4(self): # If any source or destination rse is ipv4 only @@ -1444,7 +1452,7 @@ def prepare_transfers( logger(logging.WARNING, '%s: all available sources were filtered', rws) continue - update_dict: dict[Any, Any] = { + update_dict: "dict[Any, Any]" = { models.Request.state.name: _throttler_request_state( activity=rws.activity, source_rse=selected_source.rse, diff --git a/lib/rucio/daemons/conveyor/poller.py b/lib/rucio/daemons/conveyor/poller.py index 2958ccd7178..234ebabfd8b 100644 --- a/lib/rucio/daemons/conveyor/poller.py +++ b/lib/rucio/daemons/conveyor/poller.py @@ -45,8 +45,6 @@ from rucio.db.sqla.constants import RequestState, RequestType from rucio.transfertool.transfertool import Transfertool from rucio.transfertool.fts3 import FTS3Transfertool -from rucio.transfertool.globus import GlobusTransferTool -from rucio.transfertool.mock import MockTransfertool if TYPE_CHECKING: from rucio.daemons.common import HeartbeatHandler @@ -132,18 +130,22 @@ def _handle_requests( for chunk in dict_chunks(transfers_by_eid, fts_bulk): try: - if transfertool == 'mock': - transfertool_obj = MockTransfertool(external_host=MockTransfertool.external_name) - elif transfertool == 'globus': - transfertool_obj = GlobusTransferTool(external_host=GlobusTransferTool.external_name) - else: + transfertool_cls = transfer_core.TRANSFERTOOL_CLASSES_BY_NAME.get(transfertool, FTS3Transfertool) + + transfertool_kwargs = {} + if transfertool_cls.external_name == FTS3Transfertool.external_name: account = None if oidc_account: if vo: account = InternalAccount(oidc_account, vo=vo) else: account = InternalAccount(oidc_account) - transfertool_obj = FTS3Transfertool(external_host=external_host, vo=vo, oidc_account=account) + transfertool_kwargs.update({ + 'vo': vo, + 'oidc_account': account, + }) + + transfertool_obj = transfertool_cls(external_host=external_host, **transfertool_kwargs) poll_transfers( transfertool_obj=transfertool_obj, transfers_by_eid=chunk, diff --git a/lib/rucio/rse/protocols/bittorrent.py b/lib/rucio/rse/protocols/bittorrent.py new file mode 100644 index 00000000000..9b09d66a5c2 --- /dev/null +++ b/lib/rucio/rse/protocols/bittorrent.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import logging +import os.path +import time +from urllib.parse import urlparse, urlencode, parse_qs + +from rucio.common import exception +from rucio.common.utils import construct_torrent, resolve_ip +from rucio.rse.protocols.protocol import RSEProtocol +from rucio.rse import rsemanager + +from rucio.common.extra import import_extras + +EXTRA_MODULES = import_extras(['libtorrent']) + +lt = None +if EXTRA_MODULES['libtorrent']: + import libtorrent as lt # pylint: disable=E0401 + +if getattr(rsemanager, 'CLIENT_MODE', None): + from rucio.client.didclient import DIDClient + + def _fetch_meta(scope: str, name: str): + return DIDClient().get_metadata(scope=scope, name=name, plugin='all') + +else: + from rucio.common.types import InternalScope + from rucio.core.did import get_metadata + from rucio.core.rse import get_rse_vo + + def _fetch_meta(rse_id: str, scope: str, name: str): + vo = get_rse_vo(rse_id) + scope = InternalScope(scope, vo=vo) + return get_metadata(scope=scope, name=name, plugin='all') + + +class Default(RSEProtocol): + + def __init__(self, protocol_attr, rse_settings, logger=logging.log): + super(Default, self).__init__(protocol_attr, rse_settings, logger=logger) + self.logger = logger + + def lfns2pfns(self, lfns): + pfns = {} + prefix = self.attributes['prefix'] + + if not prefix.startswith('/'): + prefix = ''.join(['/', prefix]) + if not prefix.endswith('/'): + prefix = ''.join([prefix, '/']) + + host_port = '%s:%s' % (self.attributes['hostname'], str(self.attributes['port'])) + + lfns = [lfns] if isinstance(lfns, dict) else lfns + for lfn in lfns: + scope, name = lfn['scope'], lfn['name'] + + if 'path' in lfn and lfn['path'] is not None: + path = lfn['path'] if not lfn['path'].startswith('/') else lfn['path'][1:] + else: + path = self._get_path(scope=scope, name=name) + + scope_name = '%s:%s' % (scope, name) + + query = { + 'x.pe': host_port, + 'x.rucio_scope': scope, + 'x.rucio_name': name, + 'x.rucio_path': ''.join((prefix, path)) + } + pfns[scope_name] = 'magnet:?' + urlencode(query) + + return pfns + + def parse_pfns(self, pfns): + ret = dict() + pfns = [pfns] if isinstance(pfns, str) else pfns + + for pfn in pfns: + parsed = urlparse(pfn) + scheme = parsed.scheme + + query = parse_qs(parsed.query) + host_port = next(iter(query.get('x.pe', [])), ':') + hostname, port = host_port.split(':') + port = int(port) + path = next(iter(query.get('x.rucio_path', [])), '') + scope = next(iter(query.get('x.rucio_scope', [])), '') + name = next(iter(query.get('x.rucio_name', [])), '') + + # Protect against 'lazy' defined prefixes for RSEs in the repository + if not self.attributes['prefix'].startswith('/'): + self.attributes['prefix'] = '/' + self.attributes['prefix'] + if not self.attributes['prefix'].endswith('/'): + self.attributes['prefix'] += '/' + + if self.attributes['hostname'] != hostname: + if self.attributes['hostname'] != 'localhost': # In the database empty hostnames are replaced with localhost but for some URIs (e.g. file) a hostname is not included + raise exception.RSEFileNameNotSupported('Invalid hostname: provided \'%s\', expected \'%s\'' % (hostname, self.attributes['hostname'])) + + if self.attributes['port'] != port: + raise exception.RSEFileNameNotSupported('Invalid port: provided \'%s\', expected \'%s\'' % (port, self.attributes['port'])) + + if not path.startswith(self.attributes['prefix']): + raise exception.RSEFileNameNotSupported('Invalid prefix: provided \'%s\', expected \'%s\'' % ('/'.join(path.split('/')[0:len(self.attributes['prefix'].split('/')) - 1]), + self.attributes['prefix'])) # len(...)-1 due to the leading '/ + + # Spliting parsed.path into prefix, path, filename + prefix = self.attributes['prefix'] + path = path.partition(self.attributes['prefix'])[2] + path = '/'.join(path.split('/')[:-1]) + if not path.startswith('/'): + path = '/' + path + if path != '/' and not path.endswith('/'): + path = path + '/' + ret[pfn] = {'path': path, 'scope': scope, 'name': name, 'scheme': scheme, 'prefix': prefix, 'port': port, 'hostname': hostname, } + + return ret + + def connect(self): + pass + + def close(self): + pass + + def get(self, path, dest, transfer_timeout=None): + if not lt: + raise exception.RucioException('The libtorrent python package is required to perform this operation') + + [lfn] = self.parse_pfns([path]).values() + scope = lfn['scope'] + name = lfn['name'] + hostname = lfn['hostname'] + port = lfn['port'] + + meta = _fetch_meta(rse_id=self.rse['id'], scope=scope, name=name) + pieces_root = base64.b64decode(meta.get('bittorrent_pieces_root', '')) + if not pieces_root: + raise exception.RucioException('Torrent metadata missing. Cannot download file.') + + length = meta.get('bytes') + piece_length = meta.get('bittorrent_piece_length', 0) + pieces_layers = base64.b64decode(meta.get('bittorrent_pieces_layers', '')) + + _, torrent = construct_torrent( + scope=scope, + name=name, + length=length, + piece_length=piece_length, + pieces_root=pieces_root, + pieces_layers=pieces_layers, + ) + + ses = lt.session() + params = { + 'ti': lt.torrent_info(torrent), + 'save_path': os.path.dirname(dest), + 'name': os.path.basename(dest), + 'renamed_files': {0: os.path.basename(dest)}, + } + + handle = ses.add_torrent(params) + try: + handle.resume() + handle.connect_peer((resolve_ip(hostname), port)) + while handle.status().progress != 1.0: + time.sleep(0.25) + finally: + ses.remove_torrent(handle) diff --git a/lib/rucio/transfertool/bittorrent.py b/lib/rucio/transfertool/bittorrent.py new file mode 100644 index 00000000000..8c1c0aeb699 --- /dev/null +++ b/lib/rucio/transfertool/bittorrent.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import base64 +import logging +from os import path +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Type + +from rucio.common import types +from rucio.common.config import config_get +from rucio.common.extra import import_extras +from rucio.common.utils import construct_torrent +from rucio.core.did_meta_plugins import get_metadata +from rucio.transfertool.transfertool import Transfertool, TransferToolBuilder, TransferStatusReport +from .bittorrent_driver import BittorrentDriver + +if TYPE_CHECKING: + from rucio.core.request import DirectTransfer + from rucio.core.rse import RseData + +DRIVER_NAME_RSE_ATTRIBUTE = 'bittorrent_driver' +DRIVER_CLASSES_BY_NAME: dict[str, Type[BittorrentDriver]] = {} + +EXTRA_MODULES = import_extras(['qbittorrentapi']) + +if EXTRA_MODULES['qbittorrentapi']: + from .bittorrent_driver_qbittorrent import QBittorrentDriver + DRIVER_CLASSES_BY_NAME[QBittorrentDriver.external_name] = QBittorrentDriver + + +class BittorrentTransfertool(Transfertool): + """ + Use bittorrent to perform the peer-to-peer transfer. + """ + external_name = 'bittorrent' + supported_schemes = {'magnet'} + + required_rse_attrs = (DRIVER_NAME_RSE_ATTRIBUTE, ) + + def __init__(self, external_host: str, logger: types.LoggerFunction = logging.log) -> None: + super().__init__(external_host=external_host, logger=logger) + + self._drivers_by_rse_id = {} + self.ca_cert, self.ca_key = None, None + + self.tracker = config_get('transfers', 'bittorrent-tracker-addr', raise_exception=False, default=None) + + @classmethod + def _pick_management_api_driver_cls(cls: "Type[BittorrentTransfertool]", rse: "RseData") -> Optional[Type[BittorrentDriver]]: + driver_cls = DRIVER_CLASSES_BY_NAME.get(rse.attributes.get(DRIVER_NAME_RSE_ATTRIBUTE, '')) + if driver_cls is None: + return None + if not all(rse.attributes.get(attribute) is not None for attribute in driver_cls.required_rse_attrs): + return None + return driver_cls + + def _driver_for_rse(self, rse: "RseData") -> Optional[BittorrentDriver]: + driver = self._drivers_by_rse_id.get(rse.id) + if driver: + return driver + + driver_cls = self._pick_management_api_driver_cls(rse) + if not driver_cls: + return None + + driver = driver_cls.make_driver(rse) + self._drivers_by_rse_id[rse.id] = driver + return driver + + @staticmethod + def _get_torrent_meta(scope: "types.InternalScope", name: str) -> tuple[bytes, bytes, int]: + meta = get_metadata(scope=scope, name=name, plugin='all') + pieces_root = base64.b64decode(meta.get('bittorrent_pieces_root', '')) + pieces_layers = base64.b64decode(meta.get('bittorrent_pieces_layers', '')) + piece_length = meta.get('bittorrent_piece_length', 0) + return pieces_root, pieces_layers, piece_length + + @classmethod + def submission_builder_for_path( + cls: "Type[BittorrentTransfertool]", + transfer_path: "list[DirectTransfer]", + logger: types.LoggerFunction = logging.log + ) -> "tuple[list[DirectTransfer], Optional[TransferToolBuilder]]": + hop = transfer_path[0] + if hop.rws.byte_count == 0: + logger(logging.INFO, f"Bittorrent cannot transfer fully empty torrents. Skipping {hop}") + return [], None + + if not cls.can_perform_transfer(hop.src.rse, hop.dst.rse): + logger(logging.INFO, f"The required RSE attributes are not set. Skipping {hop}") + return [], None + + for rse in [hop.src.rse, hop.dst.rse]: + driver_cls = cls._pick_management_api_driver_cls(rse) + if not driver_cls: + logger(logging.INFO, f"The rse '{rse}' is not configured correctly for bittorrent") + return [], None + + pieces_root, _pieces_layers, piece_length = cls._get_torrent_meta(hop.rws.scope, hop.rws.name) + if not pieces_root or not piece_length: + logger(logging.INFO, "The required bittorrent metadata not set on the DID") + return [], None + + return [hop], TransferToolBuilder(cls, external_host='Bittorrent Transfertool') + + def group_into_submit_jobs(self, transfer_paths: "Sequence[list[DirectTransfer]]") -> list[dict[str, Any]]: + return [{'transfers': transfer_path, 'job_params': {}} for transfer_path in transfer_paths] + + @staticmethod + def _connect_without_tracker(torrent_id: str, peers_drivers: Sequence[BittorrentDriver]) -> None: + peer_addr = [] + for i, driver in enumerate(peers_drivers): + peer_addr.append(driver.listen_addr()) + + for driver in peers_drivers: + driver.add_peers(torrent_id=torrent_id, peers=peer_addr) + + def submit(self, transfers: "Sequence[DirectTransfer]", job_params: dict[str, str], timeout: Optional[int] = None) -> str: + [transfer] = transfers + rws = transfer.rws + + tracker = transfer.dst.rse.attributes.get('bittorrent-tracker-addr', self.tracker) + + src_drivers = {transfer.src: self._driver_for_rse(transfer.src.rse)} + for source in transfer.sources: + driver = self._driver_for_rse(source.rse) + if driver: + src_drivers[source] = driver + + dst_driver = self._driver_for_rse(transfer.dst.rse) + + if not dst_driver or not src_drivers: + raise Exception('Cannot initialize bittorrent drivers to submit transfers') + + pieces_root, pieces_layers, piece_length = self._get_torrent_meta(rws.scope, rws.name) + torrent_id, torrent = construct_torrent( + scope=str(rws.scope), + name=rws.name, + length=rws.byte_count, + piece_length=piece_length, + pieces_root=pieces_root, + pieces_layers=pieces_layers, + ) + + for source, driver in src_drivers.items(): + source_protocol = transfer.source_protocol(source) + [lfn] = source_protocol.parse_pfns([transfer.source_url(source)]).values() + driver.add_torrent( + file_name=rws.name, + file_content=torrent, + download_location=lfn['prefix'] + path.dirname(lfn['path']), + seed_mode=True, + ) + + dest_protocol = transfer.dest_protocol() + [lfn] = dest_protocol.parse_pfns([transfer.dest_url]).values() + dst_driver.add_torrent( + file_name=rws.name, + file_content=torrent, + download_location=lfn['prefix'] + lfn['path'], + ) + + if not tracker: + self._connect_without_tracker(torrent_id, [dst_driver] + list(src_drivers.values())) + + return torrent_id + + def bulk_query(self, requests_by_eid, timeout: Optional[int] = None) -> Mapping[str, Mapping[str, TransferStatusReport]]: + response = {} + for transfer_id, requests in requests_by_eid.items(): + for request_id, request in requests.items(): + driver = self._driver_for_rse(request['dst_rse']) + if not driver: + self.logger(f'Cannot instantiate BitTorrent driver for {request["dest_rse"]}') + continue + response.setdefault(transfer_id, {})[request_id] = driver.get_status(request_id=request_id, torrent_id=transfer_id) + return response + + def query(self, transfer_ids: Sequence[str], details: bool = False, timeout: Optional[int] = None) -> None: + pass + + def cancel(self, transfer_ids: Sequence[str], timeout: Optional[int] = None) -> None: + pass + + def update_priority(self, transfer_id: str, priority: int, timeout: Optional[int] = None) -> None: + pass diff --git a/lib/rucio/transfertool/bittorrent_driver.py b/lib/rucio/transfertool/bittorrent_driver.py new file mode 100644 index 00000000000..bd642db35fc --- /dev/null +++ b/lib/rucio/transfertool/bittorrent_driver.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from abc import ABCMeta, abstractmethod +from typing import TYPE_CHECKING, Sequence + +from rucio.common import types + +if TYPE_CHECKING: + from typing import Optional, Type + + from rucio.core.rse import RseData + from rucio.transfertool.transfertool import TransferStatusReport + + +class BittorrentDriver(metaclass=ABCMeta): + external_name = '' + required_rse_attrs = tuple() + + @classmethod + @abstractmethod + def make_driver(cls: "Type[BittorrentDriver]", rse: "RseData", logger: types.LoggerFunction = logging.log) -> "Optional[BittorrentDriver]": + pass + + @abstractmethod + def listen_addr(self) -> tuple[str, int]: + pass + + @abstractmethod + def management_addr(self) -> tuple[str, int]: + pass + + @abstractmethod + def add_torrent(self, file_name: str, file_content: bytes, download_location: str, seed_mode: bool = False) -> None: + pass + + @abstractmethod + def add_peers(self, torrent_id: str, peers: Sequence[tuple[str, int]]) -> None: + pass + + @abstractmethod + def get_status(self, request_id: str, torrent_id: str) -> "TransferStatusReport": + pass diff --git a/lib/rucio/transfertool/bittorrent_driver_qbittorrent.py b/lib/rucio/transfertool/bittorrent_driver_qbittorrent.py new file mode 100644 index 00000000000..3a8380f52e1 --- /dev/null +++ b/lib/rucio/transfertool/bittorrent_driver_qbittorrent.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +# Copyright European Organization for Nuclear Research (CERN) since 2012 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, Optional, Sequence +from urllib.parse import urlparse + +import qbittorrentapi + +from rucio.common import types +from rucio.common.config import get_rse_credentials +from rucio.common.utils import resolve_ip +from rucio.core.oidc import request_token +from rucio.db.sqla.constants import RequestState +from rucio.transfertool.transfertool import TransferStatusReport +from .bittorrent_driver import BittorrentDriver + +if TYPE_CHECKING: + from typing import Type + from sqlalchemy.orm import Session + from rucio.core.rse import RseData + + +class QBittorrentTransferStatusReport(TransferStatusReport): + + supported_db_fields = [ + 'state', + 'external_id', + ] + + def __init__(self, request_id: str, external_id: str, qbittorrent_response: Optional[qbittorrentapi.TorrentDictionary]) -> None: + super().__init__(request_id) + + if qbittorrent_response and qbittorrent_response.state_enum.is_complete == 1: + new_state = RequestState.DONE + else: + new_state = RequestState.SUBMITTED + + self.state = new_state + self.external_id = None + if new_state in [RequestState.FAILED, RequestState.DONE]: + self.external_id = external_id + + def initialize(self, session: "Session", logger: types.LoggerFunction = logging.log) -> None: + pass + + def get_monitor_msg_fields(self, session: "Session", logger: types.LoggerFunction = logging.log) -> dict[str: str]: + return {'protocol': 'qbittorrent'} + + +class QBittorrentDriver(BittorrentDriver): + + external_name = 'qbittorrent' + required_rse_attrs = ('qbittorrent_management_address', ) + + @classmethod + def make_driver(cls: "Type[QBittorrentDriver]", rse: "RseData", logger: types.LoggerFunction = logging.log) -> "Optional[BittorrentDriver]": + + address = rse.attributes.get('qbittorrent_management_address') + if not address: + return None + + url = urlparse(address) + token = None + if url.scheme.lower() == 'https': + token = request_token(audience=url.hostname, scope='qbittorrent_admin') + else: + logging.debug(f'{cls.external_name} will not try token authentication. Requires HTTPS.') + + rse_cred = get_rse_credentials().get(rse.id, {}) + username = rse_cred.get('qbittorrent_username') + password = rse_cred.get('qbittorrent_password') + + if not (token or (username and password)): + return None + + return cls( + address=address, + username=username, + password=password, + token=token, + logger=logger, + ) + + def __init__(self, address: str, username: str, password: str, token: Optional[str] = None, logger: types.LoggerFunction = logging.log) -> None: + extra_headers = None + if token: + extra_headers = {'Authorization': 'Bearer ' + token} + + self.client = qbittorrentapi.Client( + host=address, + username=username, + password=password, + EXTRA_HEADERS=extra_headers, + FORCE_SCHEME_FROM_HOST=True, + ) + self.logger = logger + + def listen_addr(self) -> tuple[str, int]: + preferences = self.client.app_preferences() + port = preferences['listen_port'] + ip = resolve_ip(urlparse(self.client.host).hostname) + return ip, port + + def management_addr(self) -> tuple[str, int]: + return self.client.host, self.client.port + + def add_torrent(self, file_name: str, file_content: bytes, download_location: str, seed_mode: bool = False) -> None: + self.client.torrents_add( + rename=file_name, + torrent_files=file_content, + save_path=download_location, + is_skip_checking=seed_mode, + is_sequential_download=True, + ) + + def add_peers(self, torrent_id: str, peers: Sequence[tuple[str, int]]) -> None: + self.client.torrents_add_peers(torrent_hashes=[torrent_id], peers=[f'{ip}:{port}' for ip, port in peers]) + + def get_status(self, request_id: str, torrent_id: str) -> TransferStatusReport: + info = self.client.torrents_info(torrent_hashes=[torrent_id]) + return QBittorrentTransferStatusReport(request_id, external_id=torrent_id, qbittorrent_response=info[0] if info else None) diff --git a/lib/rucio/transfertool/transfertool.py b/lib/rucio/transfertool/transfertool.py index 9956405845b..6805288def5 100644 --- a/lib/rucio/transfertool/transfertool.py +++ b/lib/rucio/transfertool/transfertool.py @@ -118,7 +118,7 @@ class Transfertool(object, metaclass=ABCMeta): external_name = '' required_rse_attrs = () - supported_schemes = set(SUPPORTED_PROTOCOLS) + supported_schemes = set(SUPPORTED_PROTOCOLS).difference(('magnet', )) def __init__(self, external_host, logger=logging.log): """ diff --git a/requirements.txt b/requirements.txt index 28532a404f8..3ff2e95e9c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,6 +37,8 @@ PyYAML==6.0.1 # globus_extras and globus-sdk==3.32.0 # globus_extras python3-saml==1.16.0 # saml_extras pymongo==4.6.0 # pymongo (metadata plugin) +libtorrent==2.0.9 # Support for the bittorrent transfertool +qbittorrent-api==2023.11.57 # qBittorrent plugin for the bittorrent tranfsertool # All dependencies needed to develop/test rucio should be defined here pytest==7.4.3 diff --git a/tests/conftest.py b/tests/conftest.py index 884e7cab492..851357e565b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -163,6 +163,13 @@ def dirac_client(): return DiracClient() +@pytest.fixture +def download_client(): + from rucio.client.downloadclient import DownloadClient + + return DownloadClient() + + @pytest.fixture def rest_client(): from rucio.tests.common import print_response diff --git a/tests/test_conveyor.py b/tests/test_conveyor.py index 4a21a2d93cd..743c5d35f52 100644 --- a/tests/test_conveyor.py +++ b/tests/test_conveyor.py @@ -16,6 +16,7 @@ import threading import time from datetime import datetime, timedelta +from tempfile import TemporaryDirectory from unittest.mock import patch from urllib.parse import urlencode, urlparse, parse_qsl, urlunparse from sqlalchemy import update @@ -24,7 +25,7 @@ import rucio.daemons.reaper.reaper from rucio.common.types import InternalAccount -from rucio.common.utils import generate_uuid +from rucio.common.utils import generate_uuid, adler32 from rucio.common.exception import ReplicaNotFound, RequestNotFound from rucio.core import config as core_config from rucio.core import did as did_core @@ -56,6 +57,11 @@ TEST_FTS_HOST = 'https://fts:8446' +@transactional_session +def __update_request(request_id, *, session=None, **kwargs): + session.query(models.Request).filter_by(id=request_id).update(kwargs, synchronize_session=False) + + def __wait_for_replica_transfer(dst_rse_id, scope, name, max_wait_seconds=MAX_POLL_WAIT_SECONDS, transfertool=None): """ Wait for the replica to become AVAILABLE on the given RSE as a result of a pending transfer @@ -223,6 +229,7 @@ def __fake_source_ranking(*, session=None): assert __get_source(request_id=request['id'], src_rse_id=src_rse2_id, **did).ranking == 0 # Only group_bulk=1 part of the path was submitted. # run submitter again to copy from jump rse to destination rse + __update_request(request_core.get_request_by_did(rse_id=dst_rse_id, **did)['id'], last_processed_by=None) submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], partition_wait_time=0, transfertype='single', filter_transfertool=None) # Wait for the destination replica to become ready @@ -1007,10 +1014,6 @@ def test_lost_transfers(rse_factory, did_factory, root_account): rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) - @transactional_session - def __update_request(request_id, *, session=None, **kwargs): - session.query(models.Request).filter_by(id=request_id).update(kwargs, synchronize_session=False) - # Fake that the transfer is submitted and lost submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=2, partition_wait_time=0, transfertype='single', filter_transfertool=None) request = request_core.get_request_by_did(rse_id=dst_rse_id, **did) @@ -1378,6 +1381,7 @@ def __init_test_for_vo(vo, scope): certs_used_by_poller = [] class _FTSWrapper(FTS3Transfertool): + supported_schemes = FTS3Transfertool.supported_schemes.union(('mock', )) # Override fts3 transfertool. Don't actually perform any interaction with fts; and record the certificates used def submit(self, transfers, job_params, timeout=None): certs_used_by_submitter.append(self.cert[0]) @@ -1391,7 +1395,6 @@ def bulk_query(self, requests_by_eid, timeout=None): submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=2, partition_wait_time=0, transfertype='single', filter_transfertool=None) assert sorted(certs_used_by_submitter) == ['DEFAULT_DUMMY_CERT', 'NEW_VO_DUMMY_CERT'] - with patch('rucio.daemons.conveyor.poller.FTS3Transfertool', _FTSWrapper): poller(once=True, older_than=0, partition_wait_time=0) assert sorted(certs_used_by_poller) == ['DEFAULT_DUMMY_CERT', 'NEW_VO_DUMMY_CERT'] @@ -1503,6 +1506,7 @@ def on_submit(file): replica_core.get_replica(rse_id=rse_id, **did) # Final hop + __update_request(request_core.get_request_by_did(rse_id=rse_id_queued, **did)['id'], last_processed_by=None) submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=10, partition_wait_time=0, transfertype='single', filter_transfertool=None) replica = __wait_for_replica_transfer(dst_rse_id=rse_id_queued, **did) assert replica['state'] == ReplicaState.AVAILABLE @@ -1657,3 +1661,44 @@ def __setup_test(): preparer(once=True, sleep_time=1, bulk=100, partition_wait_time=0, ignore_availability=True) request = request_core.get_request_by_did(rse_id=dst_rse_id, **did) assert request['state'] == RequestState.QUEUED + + +@skip_rse_tests_with_accounts +@pytest.mark.noparallel(groups=[NoParallelGroups.XRD, NoParallelGroups.SUBMITTER, NoParallelGroups.POLLER, NoParallelGroups.FINISHER]) +def test_bittorrent_submission(did_factory, root_account, vo, download_client, file_config_mock): + src_rse = 'XRD1' + src_rse_id = rse_core.get_rse_id(rse=src_rse, vo=vo) + dst_rse = 'XRD2' + dst_rse_id = rse_core.get_rse_id(rse=dst_rse, vo=vo) + all_rses = [src_rse_id, dst_rse_id] + + did = did_factory.upload_test_file(src_rse) + + rule_core.add_rule(dids=[did], account=root_account, copies=1, rse_expression=dst_rse, grouping='ALL', weight=None, lifetime=None, locked=False, subscription_id=None) + + mocked_credentials = { + src_rse_id: { + "qbittorrent_username": "rucio", + "qbittorrent_password": "rucio90df" + }, + dst_rse_id: { + "qbittorrent_username": "rucio", + "qbittorrent_password": "rucio90df" + } + } + with patch('rucio.transfertool.bittorrent_driver_qbittorrent.get_rse_credentials', return_value=mocked_credentials): + submitter(once=True, rses=[{'id': rse_id} for rse_id in all_rses], group_bulk=2, partition_wait_time=0, transfertools=['bittorrent'], filter_transfertool=None) + request = request_core.get_request_by_did(rse_id=dst_rse_id, **did) + assert request['state'] == RequestState.SUBMITTED + + replica = __wait_for_replica_transfer(dst_rse_id=dst_rse_id, max_wait_seconds=10, transfertool='bittorrent', **did) + assert replica['state'] == ReplicaState.AVAILABLE + + with TemporaryDirectory() as tmp_dir: + download_client.download_dids([{ + 'did': '{scope}:{name}'.format(**did), + 'base_dir': tmp_dir, + 'rse': dst_rse, + 'no_subdir': True, + }]) + assert adler32(f'{tmp_dir}/{did["name"]}') == did_core.get_did(**did)['adler32'] diff --git a/tests/test_upload.py b/tests/test_upload.py index b245adfc746..7c62f8bf995 100644 --- a/tests/test_upload.py +++ b/tests/test_upload.py @@ -21,7 +21,6 @@ from tempfile import TemporaryDirectory from unittest.mock import patch -from rucio.client.downloadclient import DownloadClient from rucio.client.uploadclient import UploadClient from rucio.common.config import config_add_section, config_set from rucio.common.exception import InputValidationError, NoFilesUploaded, NotAllFilesUploaded @@ -37,11 +36,6 @@ def upload_client(): return UploadClient(logger=logger) -@pytest.fixture -def download_client(): - return DownloadClient() - - @pytest.fixture def rse(containerized_rses, rse_factory): if len(containerized_rses) > 0: diff --git a/tests/test_utils.py b/tests/test_utils.py index a12f89b25a4..2c61d44b9b4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -15,12 +15,13 @@ import datetime import logging +import os from re import match import pytest from rucio.common.exception import InvalidType -from rucio.common.utils import md5, adler32, parse_did_filter_from_string, Availability, retrying +from rucio.common.utils import md5, adler32, parse_did_filter_from_string, Availability, retrying, bittorrent_v2_merkle_sha256 from rucio.common.logging import formatted_logger @@ -181,3 +182,26 @@ def retry_on_attribute_error(): with pytest.raises(ValueError): retry_on_attribute_error() assert len(attempts) == 1 + + +def test_bittorrent_sa256_merkle(file_factory): + + def _sha256_merkle_via_libtorrent(file, piece_size=0): + import libtorrent as lt + file = str(file) + fs = lt.file_storage() + lt.add_files(fs, file) + t = lt.create_torrent(fs, flags=lt.create_torrent.v2_only, piece_size=piece_size) + lt.set_piece_hashes(t, os.path.dirname(file)) + + torrent = t.generate() + pieces_root = next(iter(next(iter(torrent[b'info'][b'file tree'].values())).values()))[b'pieces root'] + pieces_layers = torrent.get(b'piece layers', {}).get(pieces_root, b'') + piece_size = t.piece_length() + + return pieces_root, pieces_layers, piece_size + + for size in (1, 333, 1024, 16384, 16390, 32768, 32769, 49152, 65530, 65536, 81920, 2**20 - 2**17, 2**20, 2**20 + 2): + file = file_factory.file_generator(size=size) + root, layers, piece_size = bittorrent_v2_merkle_sha256(file) + assert (root, layers, piece_size) == _sha256_merkle_via_libtorrent(file, piece_size=piece_size) diff --git a/tools/docker_activate_rses.sh b/tools/docker_activate_rses.sh index fe278058ec4..b31d11c0813 100755 --- a/tools/docker_activate_rses.sh +++ b/tools/docker_activate_rses.sh @@ -45,7 +45,9 @@ rucio-admin rse add WEB1 # Add the protocol definitions for the storage servers rucio-admin rse add-protocol --hostname xrd1 --scheme root --prefix //rucio --port 1094 --impl rucio.rse.protocols.xrootd.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' XRD1 +rucio-admin rse add-protocol --hostname xrd1 --scheme magnet --prefix //rucio --port 10000 --impl rucio.rse.protocols.bittorrent.Default --domain-json '{"wan": {"read": 2, "write": 0, "delete": 0, "third_party_copy_read": 2, "third_party_copy_write": 2}, "lan": {"read": 2, "write": 0, "delete": 0}}' XRD1 rucio-admin rse add-protocol --hostname xrd2 --scheme root --prefix //rucio --port 1095 --impl rucio.rse.protocols.xrootd.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' XRD2 +rucio-admin rse add-protocol --hostname xrd2 --scheme magnet --prefix //rucio --port 10000 --impl rucio.rse.protocols.bittorrent.Default --domain-json '{"wan": {"read": 2, "write": 0, "delete": 0, "third_party_copy_read": 2, "third_party_copy_write": 2}, "lan": {"read": 2, "write": 0, "delete": 0}}' XRD2 rucio-admin rse add-protocol --hostname xrd3 --scheme root --prefix //rucio --port 1096 --impl rucio.rse.protocols.xrootd.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' XRD3 rucio-admin rse add-protocol --hostname xrd4 --scheme root --prefix //rucio --port 1097 --impl rucio.rse.protocols.xrootd.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' XRD4 rucio-admin rse add-protocol --hostname xrd5 --scheme root --prefix //rucio --port 1098 --impl rucio.rse.protocols.xrootd.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 1}, "lan": {"read": 1, "write": 1, "delete": 1}}' XRD5 @@ -56,6 +58,10 @@ rucio-admin rse add-protocol --hostname ssh1 --scheme rclone --prefix /rucio --p rucio-admin rse add-protocol --hostname web1 --scheme davs --prefix /rucio --port 443 --impl rucio.rse.protocols.gfal.Default --domain-json '{"wan": {"read": 1, "write": 1, "delete": 1, "third_party_copy_read": 1, "third_party_copy_write": 2}, "lan": {"read": 1, "write": 1, "delete": 1}}' WEB1 # Set test_container_xrd attribute for xrd containers +rucio-admin rse set-attribute --rse XRD1 --key bittorrent_driver --value qbittorrent +rucio-admin rse set-attribute --rse XRD2 --key bittorrent_driver --value qbittorrent +rucio-admin rse set-attribute --rse XRD1 --key qbittorrent_management_address --value https://xrd1:8094/ +rucio-admin rse set-attribute --rse XRD2 --key qbittorrent_management_address --value https://xrd2:8095/ rucio-admin rse set-attribute --rse XRD1 --key test_container_xrd --value True rucio-admin rse set-attribute --rse XRD2 --key test_container_xrd --value True rucio-admin rse set-attribute --rse XRD3 --key test_container_xrd --value True