From 1e9aca1d8f0956ab172e9ad70f4e689844a5c6e2 Mon Sep 17 00:00:00 2001 From: Christophe Haen Date: Tue, 6 Feb 2024 22:26:03 +0100 Subject: [PATCH] cont --- .../DataManagementSystem/Agent/FTS3Agent.py | 27 ++++++++++--------- .../DataManagementSystem/Client/FTS3Job.py | 24 ++++++++++++----- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py index de325f5c572..42238d92bc5 100644 --- a/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py +++ b/src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py @@ -152,16 +152,6 @@ def getFTS3TokenContext(self, username, group, ftsServer, threadID): # We need a context (and so a token) valid for at least 10mn # such that FTS has time to refresh if not contextes.exists(idTuple, 10 * 60): - res = gTokenManager.getToken( - userGroup="lhcb_data", - requiredTimeLeft=3600, - scope=[f"fts"], - ) - if not res["OK"]: - return res - expires_in = res["Value"]["expires_in"] - access_token = res["Value"]["access_token"] - # We generate the context res = FTS3Job.generateContext(ftsServer, None, fts_access_token=access_token) @@ -196,8 +186,9 @@ def getFTS3Context(self, username, group, ftsServer, threadID): """ + # If we use tokens, basically no caching is possible if self.useTokens: - return self.getFTS3TokenContext(username, group, ftsServer, threadID) + return S_OK(None) log = gLogger.getSubLogger("getFTS3Context") @@ -535,7 +526,19 @@ def _treatOperation(self, operation): log.error("Could not select TPC list", repr(e)) continue - res = ftsJob.submit(context=context, protocols=tpcProtocols) + fts_access_token = None + if self.useTokens: + res = gTokenManager.getToken( + userGroup="lhcb_data", + requiredTimeLeft=3600, + scope=[f"fts"], + ) + if not res["OK"]: + return res + + fts_access_token = res["Value"]["access_token"] + + res = ftsJob.submit(context=context, protocols=tpcProtocols, fts_access_token=fts_access_token) if not res["OK"]: log.error("Could not submit FTS3Job", f"FTS3Operation {operation.operationID} : {res}") diff --git a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py index 18cf97727d3..3afd0abc5f0 100644 --- a/src/DIRAC/DataManagementSystem/Client/FTS3Job.py +++ b/src/DIRAC/DataManagementSystem/Client/FTS3Job.py @@ -324,6 +324,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N log = gLogger.getSubLogger(f"constructTransferJob/{self.operationID}/{self.sourceSE}_{self.targetSE}") isMultiHop = False + useToken = False # Check if it is a multiHop transfer if self.multiHopSE: @@ -510,6 +511,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N if not res["OK"]: return res dstToken = res["Value"]["access_token"] + useToken = True # because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433) # the checksum needs to be lowercase. It does not impact the other @@ -542,6 +544,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N "rmsReqID": self.rmsReqID, "sourceSE": self.sourceSE, "targetSE": self.targetSE, + "useToken": useToken, } if self.activity: @@ -701,7 +704,7 @@ def _constructStagingJob(self, pinTime, allLFNs, target_spacetoken): return S_OK((job, fileIDsInTheJob)) - def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None): + def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None, fts_access_token=None): """submit the job to the FTS server Some attributes are expected to be defined for the submission to work: @@ -731,11 +734,6 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc log = gLogger.getLocalSubLogger(f"submit/{self.operationID}/{self.sourceSE}_{self.targetSE}") - if not context: - if not ftsServer: - ftsServer = self.ftsServer - context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False) - # Construct the target SURL res = self.__fetchSpaceToken(self.targetSE, self.vo) if not res["OK"]: @@ -756,6 +754,20 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc job, fileIDsInTheJob = res["Value"] + # If we need a token, don't use the context given in parameter + if job["params"].get("job_metadata", {}).get("useTokens"): + if not fts_access_token: + return S_ERROR("Job needs token support but no FTS token was supplied") + context = None + + if not context: + if not ftsServer: + ftsServer = self.ftsServer + res = self.generateContext(ftsServer, ucert, fts_access_token) + if not res["OK"]: + return res + context = res["Value"] + try: self.ftsGUID = fts3.submit(context, job) log.info(f"Got GUID {self.ftsGUID}")