From b5bfd786678587ef25295a21e44398d7270f6c05 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Mon, 7 Oct 2024 15:30:42 +0200 Subject: [PATCH] sweep: #7440 Support token transfers in FTS --- dirac.cfg | 1 + .../AdministratorGuide/Resources/storage.rst | 1 + .../Systems/DataManagement/fts3.rst | 24 ++++ .../DataManagementSystem/Agent/FTS3Agent.py | 24 +++- .../DataManagementSystem/Client/FTS3File.py | 1 + .../DataManagementSystem/Client/FTS3Job.py | 129 +++++++++++++++--- .../DataManagementSystem/ConfigTemplate.cfg | 3 + .../Client/TokenManagerClient.py | 66 ++++++++- src/DIRAC/Resources/Storage/StorageBase.py | 15 ++ src/DIRAC/Resources/Storage/StorageElement.py | 22 ++- 10 files changed, 261 insertions(+), 25 deletions(-) diff --git a/dirac.cfg b/dirac.cfg index 1afcb93626c..1a03c0888ea 100644 --- a/dirac.cfg +++ b/dirac.cfg @@ -825,6 +825,7 @@ Resources SpaceReservation = LHCb-EOS # Space reservation name if any. Concept like SpaceToken ArchiveTimeout = 84600 # Timeout for the FTS archiving BringOnlineTimeout = 84600 # Timeout for the bring online operation used by FTS + WLCGTokenBasePath = /eos/lhcb # EXPERIMENTAL Path from which the token should be relative to # Protocol section, see http://dirac.readthedocs.io/en/latest/AdministratorGuide/Resources/Storages/index.html#available-protocol-plugins GFAL2_SRM2 { diff --git a/docs/source/AdministratorGuide/Resources/storage.rst b/docs/source/AdministratorGuide/Resources/storage.rst index 244ab335ce8..f1c3724e465 100644 --- a/docs/source/AdministratorGuide/Resources/storage.rst +++ b/docs/source/AdministratorGuide/Resources/storage.rst @@ -59,6 +59,7 @@ Configuration options are: * ``SpaceReservation``: just a name of a zone of the physical storage which can have some space reserved. Extends the SRM ``SpaceToken`` concept. * ``ArchiveTimeout``: for tape SE only. If set to a value in seconds, enables the `FTS Archive Monitoring feature `_ * ``BringOnlineTimeout``: for tape SE only. 
If set to a value in seconds, specify the BringOnline parameter for FTS transfers. Otherwise, the default is whatever is in the ``FTS3Job`` class. +* ``WLCGTokenBasePath``: EXPERIMENTAL Path from which the token should be relative to VO specific paths ----------------- diff --git a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst index 030504cef11..57d5c4d2a35 100644 --- a/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst +++ b/docs/source/AdministratorGuide/Systems/DataManagement/fts3.rst @@ -198,3 +198,27 @@ More details on how the intermediate SE selection is done and how the matrix is Work in FTS has a `task `_ to try and bring that feature in. A future solution may come from DIRAC. In the meantime, the best solution is to ask the site to either cleanup themselves (some storages like EOS have that built in) or to give you a dump of the namespace, and then do the cleaning yourself. + + +Token support +---------------- + +.. versionadded:: v8.0.51 + +.. warning:: + Very experimental feature + + +The current state is the one in which LHCb ran the DC24 challenge. It only worked for dCache sites, as there is still not a uniform way for storages to understand permissions... +A transfer will happen with tokens if: + + * ``UseTokens`` is true in the FTS3Agent configuration + * ``WLCGTokenBasePath`` is set for both the source and the destination + +The tokens use specific file paths, and not generic wildcard permissions. + +.. warning:: + Token support is as experimental as can be in any layer of the stack (DIRAC, storage, FTS... even the model is experimental) + +.. 
warning:: + The FTS3Agent got occasionally stuck when tokens were used diff --git a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py index 4531178b64f..68ad5a753ca 100644 --- a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py +++ b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py @@ -12,6 +12,7 @@ :caption: FTS3Agent options """ + import datetime import errno import os @@ -39,6 +40,10 @@ from DIRAC.FrameworkSystem.Client.Logger import gLogger from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager from DIRAC.MonitoringSystem.Client.DataOperationSender import DataOperationSender +from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager +from DIRAC.DataManagementSystem.private import FTS3Utilities +from DIRAC.DataManagementSystem.DB.FTS3DB import FTS3DB +from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient # pylint: disable=attribute-defined-outside-init @@ -103,6 +108,8 @@ def __readConf(self): self.maxDelete = self.am_getOption("DeleteLimitPerCycle", 100) # lifetime of the proxy we download to delegate to FTS self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME) + self.jobMonitoringBatchSize = self.am_getOption("JobMonitoringBatchSize", JOB_MONITORING_BATCH_SIZE) + self.useTokens = self.am_getOption("UseTokens", False) self.jobMonitoringBatchSize = self.am_getOption("JobMonitoringBatchSize", JOB_MONITORING_BATCH_SIZE) @@ -495,7 +502,22 @@ def _treatOperation(self, operation): log.error("Could not select TPC list", repr(e)) continue - res = ftsJob.submit(context=context, protocols=tpcProtocols) + # If we use token, get an access token with the + # fts scope in it + # The FTS3Job will decide to use it or not + fts_access_token = None + if self.useTokens: + res = gTokenManager.getToken( + userGroup=ftsJob.userGroup, + requiredTimeLeft=3600, + scope=["fts"], + ) + if not res["OK"]: + 
return res + + fts_access_token = res["Value"]["access_token"] + + res = ftsJob.submit(context=context, protocols=tpcProtocols, fts_access_token=fts_access_token) if not res["OK"]: log.error("Could not submit FTS3Job", f"FTS3Operation {operation.operationID} : {res}") diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3File.py b/src/DIRAC/DataManagementSystem/Client/FTS3File.py index 0b687300af3..bcd4e34a55f 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3File.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3File.py @@ -21,6 +21,7 @@ class FTS3File(JSerializable): "Started", # From FTS: File transfer has started "Not_used", # From FTS: Transfer not being considered yet, waiting for another one (multihop) "Archiving", # From FTS: file not yet migrated to tape + "Token_prep", # From FTS: When using token, used before Submitted until FTS fetched a refresh token ] # These are the states that we consider final. diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py index 83f2724027a..4d17ca1b430 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py @@ -25,6 +25,7 @@ from DIRAC.Resources.Storage.StorageElement import StorageElement from DIRAC.FrameworkSystem.Client.Logger import gLogger +from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR from DIRAC.Core.Utilities.DErrno import cmpError @@ -301,7 +302,18 @@ def __isTapeSE(seName, vo): return isTape - def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=None): + @staticmethod + def __seTokenSupport(seObj): + """Check whether a given SE supports token + + :param seObj: StorageElement object + + :returns: True/False + In case of error, returns False + """ + return seObj.options.get("TokenSupport", "").lower() in ("true", "yes") + + def _constructTransferJob(self, pinTime, allLFNs, 
target_spacetoken, protocols=None, tokensEnabled=False): """Build a job for transfer Some attributes of the job are expected to be set @@ -329,6 +341,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N log = gLogger.getSubLogger(f"constructTransferJob/{self.operationID}/{self.sourceSE}_{self.targetSE}") isMultiHop = False + useTokens = False # Check if it is a multiHop transfer if self.multiHopSE: @@ -429,6 +442,9 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N log.debug(f"Not preparing transfer for file {ftsFile.lfn}") continue + srcToken = None + dstToken = None + sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn] stageURL = allStageURLs.get(ftsFile.lfn) @@ -485,6 +501,44 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N if self.activity: trans_metadata["activity"] = self.activity + # Add tokens if both storages support it and if the requested + if tokensEnabled and self.__seTokenSupport(srcSE) and self.__seTokenSupport(dstSE): + # We get a read token for the source + # offline_access is to allow FTS to refresh it + res = srcSE.getWLCGTokenPath(ftsFile.lfn) + if not res["OK"]: + return res + srcTokenPath = res["Value"] + res = gTokenManager.getToken( + userGroup=self.userGroup, + requiredTimeLeft=3600, + scope=[f"storage.read:/{srcTokenPath}", "offline_access"], + useCache=False, + ) + if not res["OK"]: + return res + srcToken = res["Value"]["access_token"] + + # We get a token with modify and read for the destination + # We need the read to be able to stat + # CAUTION: only works with dcache for now, other storages + # interpret permissions differently + # offline_access is to allow FTS to refresh it + res = dstSE.getWLCGTokenPath(ftsFile.lfn) + if not res["OK"]: + return res + dstTokenPath = res["Value"] + res = gTokenManager.getToken( + userGroup=self.userGroup, + requiredTimeLeft=3600, + scope=[f"storage.modify:/{dstTokenPath}", 
f"storage.read:/{dstTokenPath}", "offline_access"], + useCache=False, + ) + if not res["OK"]: + return res + dstToken = res["Value"]["access_token"] + useTokens = True + # because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433) # the checksum needs to be lowercase. It does not impact the other # protocol, so it's fine to put it here. @@ -497,6 +551,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N filesize=ftsFile.size, metadata=trans_metadata, activity=self.activity, + source_token=srcToken, + destination_token=dstToken, ) transfers.append(trans) @@ -514,6 +570,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N "rmsReqID": self.rmsReqID, "sourceSE": self.sourceSE, "targetSE": self.targetSE, + "useTokens": useTokens, # Store the information here to propagate it to submission } if self.activity: @@ -676,7 +733,7 @@ def _constructStagingJob(self, pinTime, allLFNs, target_spacetoken): return S_OK((job, fileIDsInTheJob)) - def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None): + def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None, fts_access_token=None): """submit the job to the FTS server Some attributes are expected to be defined for the submission to work: @@ -700,17 +757,13 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc :param ucert: path to the user certificate/proxy. 
Might be inferred by the fts cli (see its doc) :param protocols: list of protocols from which we should choose the protocol to use + :param fts_access_token: token to be used to talk to FTS and to be passed when creating a context :returns: S_OK([FTSFiles ids of files submitted]) """ log = gLogger.getLocalSubLogger(f"submit/{self.operationID}/{self.sourceSE}_{self.targetSE}") - if not context: - if not ftsServer: - ftsServer = self.ftsServer - context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False) - # Construct the target SURL res = self.__fetchSpaceToken(self.targetSE, self.vo) if not res["OK"]: @@ -720,7 +773,10 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc allLFNs = [ftsFile.lfn for ftsFile in self.filesToSubmit] if self.type == "Transfer": - res = self._constructTransferJob(pinTime, allLFNs, target_spacetoken, protocols=protocols) + res = self._constructTransferJob( + pinTime, allLFNs, target_spacetoken, protocols=protocols, tokensEnabled=bool(fts_access_token) + ) + elif self.type == "Staging": res = self._constructStagingJob(pinTime, allLFNs, target_spacetoken) # elif self.type == 'Removal': @@ -731,6 +787,21 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc job, fileIDsInTheJob = res["Value"] + # If we need a token, don't use the context given in parameter + # because the one given in parameter is only with X509 creds + if job["params"].get("job_metadata", {}).get("useTokens"): + if not fts_access_token: + return S_ERROR("Job needs token support but no FTS token was supplied") + context = None + + if not context: + if not ftsServer: + ftsServer = self.ftsServer + res = self.generateContext(ftsServer, ucert, fts_access_token) + if not res["OK"]: + return res + context = res["Value"] + try: self.ftsGUID = fts3.submit(context, job) log.info(f"Got GUID {self.ftsGUID}") @@ -766,31 +837,45 @@ def submit(self, context=None, ftsServer=None, ucert=None, 
pinTime=36000, protoc return S_OK(fileIDsInTheJob) @staticmethod - def generateContext(ftsServer, ucert, lifetime=25200): + def generateContext(ftsServer, ucert, fts_access_token=None, lifetime=25200): """This method generates an fts3 context + Only a certificate or an fts token can be given + :param ftsServer: address of the fts3 server :param ucert: the path to the certificate to be used + :param fts_access_token: token to access FTS :param lifetime: duration (in sec) of the delegation to the FTS3 server (default is 7h, like FTS3 default) :returns: an fts3 context """ + if fts_access_token and ucert: + return S_ERROR("fts_access_token and ucert cannot be both set") + try: - context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False) + context = fts3.Context( + endpoint=ftsServer, + ucert=ucert, + request_class=ftsSSLRequest, + verify=False, + fts_access_token=fts_access_token, + ) - # Explicitely delegate to be sure we have the lifetime we want - # Note: the delegation will re-happen only when the FTS server - # decides that there is not enough timeleft. - # At the moment, this is 1 hour, which effectively means that if you do - # not submit a job for more than 1h, you have no valid proxy in FTS servers - # anymore, and all the jobs failed. So we force it when - # one third of the lifetime will be left. - # Also, the proxy given as parameter might have less than "lifetime" left - # since it is cached, but it does not matter, because in the FTS3Agent - # we make sure that we renew it often enough - td_lifetime = datetime.timedelta(seconds=lifetime) - fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3) + # The delegation only makes sense for X509 auth + if ucert: + # Explicitely delegate to be sure we have the lifetime we want + # Note: the delegation will re-happen only when the FTS server + # decides that there is not enough timeleft. 
+ # At the moment, this is 1 hour, which effectively means that if you do + # not submit a job for more than 1h, you have no valid proxy in FTS servers + # anymore, and all the jobs failed. So we force it when + # one third of the lifetime will be left. + # Also, the proxy given as parameter might have less than "lifetime" left + # since it is cached, but it does not matter, because in the FTS3Agent + # we make sure that we renew it often enough + td_lifetime = datetime.timedelta(seconds=lifetime) + fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3) return S_OK(context) except FTS3ClientException as e: diff --git a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg index b36078cef1c..16c103c67cd 100644 --- a/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg +++ b/src/DIRAC/DataManagementSystem/ConfigTemplate.cfg @@ -152,6 +152,9 @@ Agents KickLimitPerCycle = 100 # Lifetime in sec of the Proxy we download to delegate to FTS3 (default 36h) ProxyLifetime = 129600 + # Whether we use tokens to submit jobs to FTS3 + # VERY EXPERIMENTAL + UseTokens = False } ##END FTS3Agent } diff --git a/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py b/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py index 74324bcbddc..8e4a1c9670f 100644 --- a/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py +++ b/src/DIRAC/FrameworkSystem/Client/TokenManagerClient.py @@ -1,6 +1,7 @@ """ The TokenManagerClient is a class representing the client of the DIRAC :py:mod:`TokenManager ` service. 
""" + import time from DIRAC import S_OK, S_ERROR @@ -31,7 +32,6 @@ def __init__(self, **kwargs): self.__tokensCache = DictCache() self.idps = IdProviderFactory() - @gTokensSync def getToken( self, username: str = None, @@ -40,6 +40,70 @@ def getToken( audience: str = None, identityProvider: str = None, requiredTimeLeft: int = 0, + useCache: bool = True, + ): + """Get an access token for a user/group + + :param username: user name + :param userGroup: group name + :param scope: scope + :param audience: audience + :param identityProvider: identity Provider + :param requiredTimeLeft: required time + :param cacheToken: if True (default) save the token in cache. + Otherwise it is not cached but it avoids the lock + + :return: S_OK(dict)/S_ERROR() + """ + meth = self.getTokenWithCache if useCache else self.getTokenWithoutCache + + return meth( + username=username, + userGroup=userGroup, + scope=scope, + audience=audience, + identityProvider=identityProvider, + requiredTimeLeft=requiredTimeLeft, + ) + + def getTokenWithoutCache( + self, + username: str = None, + userGroup: str = None, + scope: list[str] = None, + audience: str = None, + identityProvider: str = None, + requiredTimeLeft: int = 0, + ): + """Get an access token for a user/group without caching it + + :param username: user name + :param userGroup: group name + :param scope: scope + :param audience: audience + :param identityProvider: identity Provider + :param requiredTimeLeft: required time + + :return: S_OK(dict)/S_ERROR() + """ + # Get an IdProvider Client instance + result = getIdProviderClient(userGroup, identityProvider) + if not result["OK"]: + return result + idpObj = result["Value"] + + # No token in cache: get a token from the server + return self.executeRPC(username, userGroup, scope, audience, idpObj.name, requiredTimeLeft, call="getToken") + + @gTokensSync + def getTokenWithCache( + self, + username: str = None, + userGroup: str = None, + scope: list[str] = None, + audience: str = None, + 
identityProvider: str = None, + requiredTimeLeft: int = 0, ): """Get an access token for a user/group keeping the local cache diff --git a/src/DIRAC/Resources/Storage/StorageBase.py b/src/DIRAC/Resources/Storage/StorageBase.py index bc707d3f865..8c793d39e86 100755 --- a/src/DIRAC/Resources/Storage/StorageBase.py +++ b/src/DIRAC/Resources/Storage/StorageBase.py @@ -39,6 +39,8 @@ import shutil import tempfile +from pathlib import Path + from DIRAC import S_OK, S_ERROR from DIRAC.Core.Utilities.Pfn import pfnparse, pfnunparse from DIRAC.Core.Utilities.ReturnValues import returnSingleResult @@ -460,3 +462,16 @@ def getOccupancy(self, **kwargs): finally: # Clean the temporary dir shutil.rmtree(tmpDirName) + + def getWLCGTokenPath(self, lfn: str, wlcgTokenBasePath: str) -> str: + """ + Returns the path expected to be in a WLCG token + It basically consists of ``basepath - tokenBasePath + LFN`` + The tokenBasePath is a configuration on the storage side. + + """ + + allDict = dict.fromkeys(["Protocol", "Host", "Port", "Path", "FileName", "Options"], "") + allDict.update({"Path": self.protocolParameters["Path"], "FileName": lfn.lstrip("/")}) + fullPath = pfnunparse(allDict)["Value"] + return Path(fullPath).relative_to(Path(wlcgTokenBasePath)) diff --git a/src/DIRAC/Resources/Storage/StorageElement.py b/src/DIRAC/Resources/Storage/StorageElement.py index b11e6382f2c..e7e8e1424dc 100755 --- a/src/DIRAC/Resources/Storage/StorageElement.py +++ b/src/DIRAC/Resources/Storage/StorageElement.py @@ -21,7 +21,7 @@ from DIRAC.Core.Utilities import DErrno from DIRAC.Core.Utilities.File import convertSizeUnits from DIRAC.Core.Utilities.List import getIndexInList -from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult +from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, returnSingleResult, convertToReturnValue from DIRAC.Core.Utilities.TimeUtilities import toEpochMilliSeconds from DIRAC.Resources.Storage.StorageFactory import StorageFactory from 
DIRAC.Core.Utilities.Pfn import pfnparse @@ -294,6 +294,7 @@ def __init__(self, name, protocolSections=None, vo=None, hideExceptions=False): "isDirectory", "isFile", "getOccupancy", + "getWLCGTokenPath", ] self.okMethods = [ @@ -964,6 +965,25 @@ def getLFNFromURL(self, urls): # This is the generic wrapper for file operations # + @convertToReturnValue + def getWLCGTokenPath(self, lfn: str): + """ + EXPERIMENTAL + return the path to put in the token, relative to the vo path as configured + in the storage. + + """ + wlcgTokenBasePath = self.options.get("WLCGTokenBasePath") + if not wlcgTokenBasePath: + raise ValueError("WLCGTokenBasePath not configured") + + for storage in self.storages.values(): + try: + return storage.getWLCGTokenPath(lfn, wlcgTokenBasePath) + except Exception: + continue + raise RuntimeError("Could not get WLCGTokenPath") + def getURL(self, lfn, protocol=False, replicaDict=None): """execute 'getTransportURL' operation.