Skip to content

Commit

Permalink
Protocols: implement token support for xrootd protocol
Browse files Browse the repository at this point in the history
Pass the token as an ENV variable which is automatically used
by the xrdfs binary which we call via a sub-process invocation.
  • Loading branch information
Radu Carpa committed Jan 8, 2024
1 parent 436ecc9 commit 2690291
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions lib/rucio/rse/protocols/xrootd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@
class Default(protocol.RSEProtocol):
""" Implementing access to RSEs using the XRootD protocol using GSI authentication."""

@property
def _auth_env(self):
if self.auth_token:
return f"XrdSecPROTOCOL=ztn BEARER_TOKEN='{self.auth_token}'"
else:
return 'XrdSecPROTOCOL=gsi'

def __init__(self, protocol_attr, rse_settings, logger=logging.log):
""" Initializes the object with information about the referred RSE.
Expand Down Expand Up @@ -66,7 +73,7 @@ def exists(self, pfn):
self.logger(logging.DEBUG, 'xrootd.exists: pfn: {}'.format(pfn))
try:
path = self.pfn2path(pfn)
cmd = 'XrdSecPROTOCOL=gsi xrdfs %s:%s stat %s' % (self.hostname, self.port, path)
cmd = f'{self._auth_env} xrdfs {self.hostname}:{self.port} stat {path}'
self.logger(logging.DEBUG, 'xrootd.exists: cmd: {}'.format(cmd))
status, out, err = execute(cmd)
if status != 0:
Expand All @@ -86,15 +93,15 @@ def stat(self, path):
:returns: a dict with two keys, filesize and an element of GLOBALLY_SUPPORTED_CHECKSUMS.
"""
self.logger(logging.DEBUG, 'xrootd.stat: path: {}'.format(path))
self.logger(logging.DEBUG, f'xrootd.stat: path: {path}')
ret = {}
chsum = None
if path.startswith('root:'):
path = self.pfn2path(path)

try:
# xrdfs stat for getting filesize
cmd = 'XrdSecPROTOCOL=gsi xrdfs %s:%s stat %s' % (self.hostname, self.port, path)
cmd = f'{self._auth_env} xrdfs {self.hostname}:{self.port} stat {path}'
self.logger(logging.DEBUG, 'xrootd.stat: filesize cmd: {}'.format(cmd))
status_stat, out, err = execute(cmd)
if status_stat == 0:
Expand All @@ -106,7 +113,7 @@ def stat(self, path):
break

# xrdfs query checksum for getting checksum
cmd = 'XrdSecPROTOCOL=gsi xrdfs %s:%s query checksum %s' % (self.hostname, self.port, path)
cmd = f'{self._auth_env} xrdfs {self.hostname}:{self.port} query checksum {path}'
self.logger(logging.DEBUG, 'xrootd.stat: checksum cmd: {}'.format(cmd))
status_query, out, err = execute(cmd)
if status_query == 0:
Expand Down Expand Up @@ -183,7 +190,7 @@ def connect(self):
try:
# The query stats call is not implemented on some xroot doors.
# Workaround: fail, if server does not reply within 10 seconds for static config query
cmd = 'XrdSecPROTOCOL=gsi XRD_REQUESTTIMEOUT=10 xrdfs %s:%s query config %s:%s' % (self.hostname, self.port, self.hostname, self.port)
cmd = f'{self._auth_env} XRD_REQUESTTIMEOUT=10 xrdfs {self.hostname}:{self.port} query config {self.hostname}:{self.port}'
self.logger(logging.DEBUG, 'xrootd.connect: cmd: {}'.format(cmd))
status, out, err = execute(cmd)
if status != 0:
Expand All @@ -206,7 +213,7 @@ def get(self, pfn, dest, transfer_timeout=None):
"""
self.logger(logging.DEBUG, 'xrootd.get: pfn: {}'.format(pfn))
try:
cmd = 'XrdSecPROTOCOL=gsi xrdcp -f %s %s' % (pfn, dest)
cmd = f'{self._auth_env} xrdcp -f {pfn} {dest}'
self.logger(logging.DEBUG, 'xrootd.get: cmd: {}'.format(cmd))
status, out, err = execute(cmd)
if status == 54:
Expand Down Expand Up @@ -237,7 +244,7 @@ def put(self, filename, target, source_dir, transfer_timeout=None):
if not os.path.exists(source_url):
raise exception.SourceNotFound()
try:
cmd = 'XrdSecPROTOCOL=gsi xrdcp -f %s %s' % (source_url, path)
cmd = f'{self._auth_env} xrdcp -f {source_url} {path}'
self.logger(logging.DEBUG, 'xrootd.put: cmd: {}'.format(cmd))
status, out, err = execute(cmd)
if status != 0:
Expand All @@ -259,7 +266,7 @@ def delete(self, pfn):
raise exception.SourceNotFound()
try:
path = self.pfn2path(pfn)
cmd = 'XrdSecPROTOCOL=gsi xrdfs %s:%s rm %s' % (self.hostname, self.port, path)
cmd = f'{self._auth_env} xrdfs {self.hostname}:{self.port} rm {path}'
self.logger(logging.DEBUG, 'xrootd.delete: cmd: {}'.format(cmd))
status, out, err = execute(cmd)
if status != 0:
Expand All @@ -283,10 +290,10 @@ def rename(self, pfn, new_pfn):
path = self.pfn2path(pfn)
new_path = self.pfn2path(new_pfn)
new_dir = new_path[:new_path.rindex('/') + 1]
cmd = 'XrdSecPROTOCOL=gsi xrdfs %s:%s mkdir -p %s' % (self.hostname, self.port, new_dir)
cmd = f'{self._auth_env} xrdfs {self.hostname}:{self.port} mkdir -p {new_dir}'
self.logger(logging.DEBUG, 'xrootd.stat: mkdir cmd: {}'.format(cmd))
status, out, err = execute(cmd)
cmd = 'XrdSecPROTOCOL=gsi xrdfs %s:%s mv %s %s' % (self.hostname, self.port, path, new_path)
cmd = f'{self._auth_env} xrdfs {self.hostname}:{self.port} mv {path} {new_path}'
self.logger(logging.DEBUG, 'xrootd.stat: rename cmd: {}'.format(cmd))
status, out, err = execute(cmd)
if status != 0:
Expand Down

0 comments on commit 2690291

Please sign in to comment.