From a21f366221c3b8b88344b8046fa39f98cbb8129a Mon Sep 17 00:00:00 2001
From: Christophe Haen
Date: Tue, 6 Feb 2024 13:15:14 +0100
Subject: [PATCH] cont

---
 .../DataManagementSystem/Agent/FTS3Agent.py  | 40 ++++++++++++++
 .../DataManagementSystem/Client/FTS3File.py  |  1 +
 .../DataManagementSystem/Client/FTS3Job.py   | 54 +++++++++++--------
 3 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
index 4151a3fffb8..de325f5c572 100644
--- a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
+++ b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
@@ -38,6 +38,7 @@
 from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
 from DIRAC.FrameworkSystem.Client.Logger import gLogger
 from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
+from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager
 from DIRAC.DataManagementSystem.private import FTS3Utilities
 from DIRAC.DataManagementSystem.DB.FTS3DB import FTS3DB
 from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
@@ -106,6 +107,8 @@ def __readConf(self):
         # lifetime of the proxy we download to delegate to FTS
         self.proxyLifetime = self.am_getOption("ProxyLifetime", PROXY_LIFETIME)

+        self.useTokens = self.am_getOption("UseTokens", False)
+
         return S_OK()

     def initialize(self):
@@ -138,6 +141,40 @@ def beginExecution(self):
         self.dataOpSender = DataOperationSender()
         return self.__readConf()

+    def getFTS3TokenContext(self, username, group, ftsServer, threadID):
+        log = gLogger.getSubLogger("getFTS3TokenContext")
+
+        contextes = self._globalContextCache.setdefault(threadID, DictCache())
+
+        idTuple = (ftsServer,)
+        log.debug(f"Getting context for {idTuple}")
+
+        # We need a context (and so a token) valid for at least 10mn
+        # such that FTS has time to refresh
+        if not contextes.exists(idTuple, 10 * 60):
+            res = gTokenManager.getToken(
+                userGroup="lhcb_data",
+                requiredTimeLeft=3600,
+                scope=["fts"],
+            )
+            if not res["OK"]:
+                return res
+            expires_in = res["Value"]["expires_in"]
+            access_token = res["Value"]["access_token"]
+
+            # We generate the context
+
+            res = FTS3Job.generateContext(ftsServer, None, fts_access_token=access_token)
+
+            if not res["OK"]:
+                return res
+            context = res["Value"]
+
+            # we add it to the cache for this thread for as long as the token is valid
+            contextes.add(idTuple, expires_in, context)
+
+        return S_OK(contextes.get(idTuple))
+
     def getFTS3Context(self, username, group, ftsServer, threadID):
         """Returns an fts3 context for a given user, group and fts server

@@ -159,6 +196,9 @@ def getFTS3Context(self, username, group, ftsServer, threadID):

         """

+        if self.useTokens:
+            return self.getFTS3TokenContext(username, group, ftsServer, threadID)
+
         log = gLogger.getSubLogger("getFTS3Context")

         contextes = self._globalContextCache.setdefault(threadID, DictCache())
diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3File.py b/src/DIRAC/DataManagementSystem/Client/FTS3File.py
index 0b687300af3..bcd4e34a55f 100644
--- a/src/DIRAC/DataManagementSystem/Client/FTS3File.py
+++ b/src/DIRAC/DataManagementSystem/Client/FTS3File.py
@@ -21,6 +21,7 @@ class FTS3File(JSerializable):
         "Started",  # From FTS: File transfer has started
         "Not_used",  # From FTS: Transfer not being considered yet, waiting for another one (multihop)
         "Archiving",  # From FTS: file not yet migrated to tape
+        "Token_prep",  # From FTS: When using token, used before Submitted until FTS fetched a refresh token
     ]

     # These are the states that we consider final.
diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
index 41e4dfa7126..18cf97727d3 100644
--- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
+++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py
@@ -30,7 +30,7 @@ class FTS3Job(JSerializable):
     to an FTS3Operation
     """

-    # alter table Jobs CHANGE `status` `status` enum('Submitted','Ready','Active','Finished','Canceled','Failed','Finisheddirty','Staging','Archiving', 'Token_prep') DEFAULT 'Submitted',
+    # `status` enum('Submitted','Ready','Active','Finished','Canceled','Failed','Finisheddirty','Staging','Archiving', 'Token_prep') DEFAULT 'Submitted',

     # START states
     # States from FTS doc https://fts3-docs.web.cern.ch/fts3-docs/docs/state_machine.html
@@ -44,7 +44,6 @@ class FTS3Job(JSerializable):
         "Finisheddirty",  # Some files Failed
         "Staging",  # One of the files within a job went to Staging state
         "Archiving",  # From FTS: one of the files within a job went to Archiving state
-        "Token_prep",  # From FTS: When using token, used before Submitted until FTS fetched a refresh token
     ]

     FINAL_STATES = ["Canceled", "Failed", "Finished", "Finisheddirty"]
@@ -425,7 +424,8 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
                 log.debug(f"Not preparing transfer for file {ftsFile.lfn}")
                 continue

-            srcToken, dstToken = None
+            srcToken = None
+            dstToken = None
             sourceSURL, targetSURL = allSrcDstSURLs[ftsFile.lfn]
             stageURL = allStageURLs.get(ftsFile.lfn)

@@ -484,7 +484,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
                 trans_metadata["activity"] = self.activity

             # Add tokens if both storages support it
-            if self.__seTokenSupport(hopSrcSEName) and self.__seTokenSupport(hopDstSEName):
+            if self.__seTokenSupport(hopSrcSEName, self.vo) and self.__seTokenSupport(hopDstSEName, self.vo):
                 res = srcSE.getWLCGTokenPath(ftsFile.lfn)
                 if not res["OK"]:
                     return res
@@ -496,7 +496,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
                 )
                 if not res["OK"]:
                     return res
-                srcToken = res["Value"]
+                srcToken = res["Value"]["access_token"]

                 res = dstSE.getWLCGTokenPath(ftsFile.lfn)
                 if not res["OK"]:
@@ -505,11 +505,11 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
                 res = gTokenManager.getToken(
                     userGroup="lhcb_data",
                     requiredTimeLeft=3600,
-                    scope=[f"storage.create:/{dstTokenPath}", "offline_access"],
+                    scope=[f"storage.modify:/{dstTokenPath}", "offline_access"],
                 )
                 if not res["OK"]:
                     return res
-                dstToken = res["Value"]
+                dstToken = res["Value"]["access_token"]

             # because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433)
             # the checksum needs to be lowercase. It does not impact the other
@@ -551,7 +551,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
             transfers=transfers,
             overwrite=True,
             source_spacetoken=source_spacetoken,
-            spacetoken=target_spacetoken,
+            destination_spacetoken=target_spacetoken,
             bring_online=bring_online,
             copy_pin_lifetime=copy_pin_lifetime,
             retry=3,
@@ -791,7 +791,7 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc
         return S_OK(fileIDsInTheJob)

     @staticmethod
-    def generateContext(ftsServer, ucert, lifetime=25200):
+    def generateContext(ftsServer, ucert, fts_access_token=None, lifetime=25200):
         """This method generates an fts3 context

         :param ftsServer: address of the fts3 server
@@ -801,21 +801,31 @@

         :returns: an fts3 context
         """
+        if fts_access_token and ucert:
+            return S_ERROR("fts_access_token and ucert cannot be both set")
+
         try:
-            context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False)
+            context = fts3.Context(
+                endpoint=ftsServer,
+                ucert=ucert,
+                request_class=ftsSSLRequest,
+                verify=False,
+                fts_access_token=fts_access_token,
+            )

-            # Explicitely delegate to be sure we have the lifetime we want
-            # Note: the delegation will re-happen only when the FTS server
-            # decides that there is not enough timeleft.
-            # At the moment, this is 1 hour, which effectively means that if you do
-            # not submit a job for more than 1h, you have no valid proxy in FTS servers
-            # anymore, and all the jobs failed. So we force it when
-            # one third of the lifetime will be left.
-            # Also, the proxy given as parameter might have less than "lifetime" left
-            # since it is cached, but it does not matter, because in the FTS3Agent
-            # we make sure that we renew it often enough
-            td_lifetime = datetime.timedelta(seconds=lifetime)
-            fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)
+            if ucert:
+                # Explicitly delegate to be sure we have the lifetime we want
+                # Note: the delegation will re-happen only when the FTS server
+                # decides that there is not enough time left.
+                # At the moment, this is 1 hour, which effectively means that if you do
+                # not submit a job for more than 1h, you have no valid proxy in FTS servers
+                # anymore, and all the jobs fail. So we force it when
+                # one third of the lifetime is left.
+                # Also, the proxy given as parameter might have less than "lifetime" left
+                # since it is cached, but it does not matter, because in the FTS3Agent
+                # we make sure that we renew it often enough
+                td_lifetime = datetime.timedelta(seconds=lifetime)
+                fts3.delegate(context, lifetime=td_lifetime, delegate_when_lifetime_lt=td_lifetime // 3)

             return S_OK(context)
         except FTS3ClientException as e:
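
Illustration of the caching pattern behind the new getFTS3TokenContext (not part of the patch): one fts3 context is kept per FTS server and per thread, and a fresh token is only fetched when the cached entry has less than ten minutes of validity left, so FTS still has time to pick up a refresh token before the access token expires. A minimal sketch of that pattern using DIRAC's DictCache; the endpoint string and the hard-coded expires_in are placeholders:

    # Sketch only: mirrors the cache logic of getFTS3TokenContext with placeholder values.
    from DIRAC.Core.Utilities.DictCache import DictCache

    contextCache = DictCache()
    ftsServer = "https://fts3-test.example.org:8446"  # placeholder endpoint
    cacheKey = (ftsServer,)

    # Reuse the cached context only if it stays valid for at least 10 more minutes
    if not contextCache.exists(cacheKey, 10 * 60):
        expires_in = 3600  # in the agent this comes from gTokenManager.getToken()["Value"]["expires_in"]
        context = object()  # in the agent this comes from FTS3Job.generateContext(ftsServer, None, fts_access_token=...)
        # Cache the context only for as long as the token itself is valid
        contextCache.add(cacheKey, expires_in, context)

    context = contextCache.get(cacheKey)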
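Also for illustration, the per-file destination token request that the _constructTransferJob hunks converge on: the scope is rooted at the WLCG token path of the destination, storage.modify replaces storage.create (presumably so that overwriting an existing destination entry is allowed), and only the bare access_token string is kept for the transfer. The token path below is made up; in the real code it comes from dstSE.getWLCGTokenPath(ftsFile.lfn):

    # Sketch only: placeholder path, the real scope is built from the SE's WLCG token path.
    from DIRAC.FrameworkSystem.Client.TokenManagerClient import gTokenManager

    dstTokenPath = "lhcb/MC/2024/some/dir/file.dst"  # hypothetical path
    res = gTokenManager.getToken(
        userGroup="lhcb_data",
        requiredTimeLeft=3600,
        # write scope on the destination path; offline_access lets FTS obtain a refresh token
        scope=[f"storage.modify:/{dstTokenPath}", "offline_access"],
    )
    if res["OK"]:
        dstToken = res["Value"]["access_token"]  # raw JWT string attached to the transfer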
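Finally, a sketch of the two ways generateContext can be driven after this change: token-based (what getFTS3TokenContext does, with no proxy and no delegation) or proxy-based (unchanged behaviour, with the explicit delegation kept under the if ucert: branch). Endpoint, proxy path and token value are placeholders:

    # Sketch only: placeholder endpoint, token and proxy location.
    from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job

    ftsServer = "https://fts3-test.example.org:8446"  # placeholder FTS3 REST endpoint

    # Token path: no user proxy, the access token is handed straight to the fts3 client
    res = FTS3Job.generateContext(ftsServer, None, fts_access_token="eyJhbGciOi...")  # placeholder JWT
    if not res["OK"]:
        raise RuntimeError(res["Message"])
    tokenContext = res["Value"]

    # Proxy path: delegation to the FTS server still happens inside generateContext
    res = FTS3Job.generateContext(ftsServer, "/tmp/x509up_u12345", lifetime=25200)
    if not res["OK"]:
        raise RuntimeError(res["Message"])
    proxyContext = res["Value"]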