Skip to content

Commit

Permalink
cont
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Feb 6, 2024
1 parent a21f366 commit 1e9aca1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 18 deletions.
27 changes: 15 additions & 12 deletions src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,6 @@ def getFTS3TokenContext(self, username, group, ftsServer, threadID):
# We need a context (and so a token) valid for at least 10mn
# such that FTS has time to refresh
if not contextes.exists(idTuple, 10 * 60):
res = gTokenManager.getToken(
userGroup="lhcb_data",
requiredTimeLeft=3600,
scope=[f"fts"],
)
if not res["OK"]:
return res
expires_in = res["Value"]["expires_in"]
access_token = res["Value"]["access_token"]

# We generate the context

res = FTS3Job.generateContext(ftsServer, None, fts_access_token=access_token)
Expand Down Expand Up @@ -196,8 +186,9 @@ def getFTS3Context(self, username, group, ftsServer, threadID):
"""

# If we use tokens, basically no caching is possible
if self.useTokens:
return self.getFTS3TokenContext(username, group, ftsServer, threadID)
return S_OK(None)

log = gLogger.getSubLogger("getFTS3Context")

Expand Down Expand Up @@ -535,7 +526,19 @@ def _treatOperation(self, operation):
log.error("Could not select TPC list", repr(e))
continue

res = ftsJob.submit(context=context, protocols=tpcProtocols)
fts_access_token = None
if self.useTokens:
res = gTokenManager.getToken(
userGroup="lhcb_data",
requiredTimeLeft=3600,
scope=[f"fts"],
)
if not res["OK"]:
return res

fts_access_token = res["Value"]["access_token"]

res = ftsJob.submit(context=context, protocols=tpcProtocols, fts_access_token=fts_access_token)

if not res["OK"]:
log.error("Could not submit FTS3Job", f"FTS3Operation {operation.operationID} : {res}")
Expand Down
24 changes: 18 additions & 6 deletions src/DIRAC/DataManagementSystem/Client/FTS3Job.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
log = gLogger.getSubLogger(f"constructTransferJob/{self.operationID}/{self.sourceSE}_{self.targetSE}")

isMultiHop = False
useToken = False

# Check if it is a multiHop transfer
if self.multiHopSE:
Expand Down Expand Up @@ -510,6 +511,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
if not res["OK"]:
return res
dstToken = res["Value"]["access_token"]
useToken = True

# because of an xroot bug (https://github.com/xrootd/xrootd/issues/1433)
# the checksum needs to be lowercase. It does not impact the other
Expand Down Expand Up @@ -542,6 +544,7 @@ def _constructTransferJob(self, pinTime, allLFNs, target_spacetoken, protocols=N
"rmsReqID": self.rmsReqID,
"sourceSE": self.sourceSE,
"targetSE": self.targetSE,
"useToken": useToken,
}

if self.activity:
Expand Down Expand Up @@ -701,7 +704,7 @@ def _constructStagingJob(self, pinTime, allLFNs, target_spacetoken):

return S_OK((job, fileIDsInTheJob))

def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None):
def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protocols=None, fts_access_token=None):
"""submit the job to the FTS server
Some attributes are expected to be defined for the submission to work:
Expand Down Expand Up @@ -731,11 +734,6 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc

log = gLogger.getLocalSubLogger(f"submit/{self.operationID}/{self.sourceSE}_{self.targetSE}")

if not context:
if not ftsServer:
ftsServer = self.ftsServer
context = fts3.Context(endpoint=ftsServer, ucert=ucert, request_class=ftsSSLRequest, verify=False)

# Construct the target SURL
res = self.__fetchSpaceToken(self.targetSE, self.vo)
if not res["OK"]:
Expand All @@ -756,6 +754,20 @@ def submit(self, context=None, ftsServer=None, ucert=None, pinTime=36000, protoc

job, fileIDsInTheJob = res["Value"]

# If we need a token, don't use the context given in parameter
if job["params"].get("job_metadata", {}).get("useTokens"):
if not fts_access_token:
return S_ERROR("Job needs token support but no FTS token was supplied")
context = None

if not context:
if not ftsServer:
ftsServer = self.ftsServer
res = self.generateContext(ftsServer, ucert, fts_access_token)
if not res["OK"]:
return res
context = res["Value"]

try:
self.ftsGUID = fts3.submit(context, job)
log.info(f"Got GUID {self.ftsGUID}")
Expand Down

0 comments on commit 1e9aca1

Please sign in to comment.