Skip to content

Commit

Permalink
feat: Enable downloading and uploading sandbox via DiracX
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen authored and chrisburr committed Oct 6, 2023
1 parent 1c5bdac commit c43382b
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 4 deletions.
6 changes: 6 additions & 0 deletions integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ def install_server():

# This runs a continuous loop that exports the config in yaml
# for the diracx container to use
# It needs to be started and running before the DIRAC server installation
# because after installing the databases, the install server script
# calls dirac-login.
# At this point we need the new CS to have been updated
# already else the token exchange fails.

typer.secho("Starting configuration export loop for diracx", fg=c.GREEN)
base_cmd = _build_docker_cmd("server", tty=False, daemon=True, use_root=True)
subprocess.run(
Expand Down
77 changes: 77 additions & 0 deletions src/DIRAC/FrameworkSystem/Utilities/diracx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# pylint: disable=import-error

import requests

from cachetools import TTLCache, cached
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from DIRAC import gConfig
from DIRAC.ConfigurationSystem.Client.Helpers import Registry


from diracx.core.preferences import DiracxPreferences

from diracx.core.utils import write_credentials

from diracx.core.models import TokenResponse
from diracx.client import DiracClient

# How long tokens are kept
DEFAULT_TOKEN_CACHE_TTL = 5 * 60

# Add a cache not to query the token all the time
_token_cache = TTLCache(maxsize=100, ttl=DEFAULT_TOKEN_CACHE_TTL)


@cached(_token_cache, key=lambda x, y: repr(x))
def _get_token(credDict, diracxUrl, /) -> Path:
"""
Write token to a temporary file and return the path to that file
"""

apiKey = gConfig.getValue("/DiracX/LegacyExchangeApiKey")
if not apiKey:
raise ValueError("Missing mandatory /DiracX/LegacyExchangeApiKey configuration")

vo = Registry.getVOForGroup(credDict["group"])
dirac_properties = list(set(credDict.get("groupProperties", [])) | set(credDict.get("properties", [])))
group = credDict["group"]

scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties]

r = requests.get(
f"{diracxUrl}/auth/legacy-exchange",
params={
"preferred_username": credDict["username"],
"scope": " ".join(scopes),
},
headers={"Authorization": f"Bearer {apiKey}"},
timeout=10,
)

r.raise_for_status()

token_location = Path(NamedTemporaryFile().name)

write_credentials(TokenResponse(**r.json()), location=token_location)

return token_location


def TheImpersonator(credDict: dict[str, Any]) -> DiracClient:
"""
Client to be used by DIRAC server needing to impersonate
a user for diracx.
It queries a token, places it in a file, and returns the `DiracClient`
class
Use as a context manager
"""

diracxUrl = gConfig.getValue("/DiracX/URL")
token_location = _get_token(credDict, diracxUrl)
pref = DiracxPreferences(url=diracxUrl, credentials_path=token_location)

return DiracClient(diracx_preferences=pref)
4 changes: 1 addition & 3 deletions src/DIRAC/Resources/Storage/StorageBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,7 @@ def constructURLFromLFN(self, lfn, withWSUrl=False):
# 2. VO name must not appear as any subdirectory or file name
lfnSplitList = lfn.split("/")
voLFN = lfnSplitList[1]
# TODO comparison to Sandbox below is for backward compatibility, should
# be removed in the next release
if voLFN != self.se.vo and voLFN != "SandBox" and voLFN != "Sandbox":
if voLFN != self.se.vo and voLFN != "SandBox" and voLFN != "S3":
return S_ERROR(f"LFN ({lfn}) path must start with VO name ({self.se.vo})")

urlDict = dict(self.protocolParameters)
Expand Down
2 changes: 2 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ Services
SandboxPrefix = Sandbox
BasePath = /opt/dirac/storage/sandboxes
DelayedExternalDeletion = True
# If true, uploads the sandbox via diracx on an S3 storage
UseDiracXBackend = False
Authorization
{
Default = authenticated
Expand Down
68 changes: 68 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"""
import hashlib
import os
import requests
import tempfile
import threading
import time
Expand Down Expand Up @@ -49,6 +50,7 @@ def initializeHandler(cls, serviceInfoDict):
def initializeRequest(self):
self.__backend = self.getCSOption("Backend", "local")
self.__localSEName = self.getCSOption("LocalSE", "SandboxSE")
self._useDiracXBackend = self.getCSOption("UseDiracXBackend", False)
self._maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576
if self.__backend.lower() == "local" or self.__backend == self.__localSEName:
self.__useLocalStorage = True
Expand Down Expand Up @@ -106,6 +108,51 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
gLogger.info("Upload requested", f"for {aHash} [{extension}]")

credDict = self.getRemoteCredentials()

if self._useDiracXBackend:
from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator
from diracx.client.models import SandboxInfo # pylint: disable=import-error

gLogger.info("Forwarding to DiracX")
with tempfile.TemporaryFile(mode="w+b") as tar_fh:
result = fileHelper.networkToDataSink(tar_fh, maxFileSize=self._maxUploadBytes)
if not result["OK"]:
return result
tar_fh.seek(0)

hasher = hashlib.sha256()
while data := tar_fh.read(512 * 1024):
hasher.update(data)
checksum = hasher.hexdigest()
tar_fh.seek(0)
gLogger.debug("Sandbox checksum is", checksum)

sandbox_info = SandboxInfo(
checksum_algorithm="sha256",
checksum=checksum,
size=os.stat(tar_fh.fileno()).st_size,
format=extension,
)

with TheImpersonator(credDict) as client:
res = client.jobs.initiate_sandbox_upload(sandbox_info)

if res.url:
gLogger.debug("Uploading sandbox for", res.pfn)
files = {"file": ("file", tar_fh)}

response = requests.post(res.url, data=res.fields, files=files, timeout=300)

gLogger.debug("Sandbox uploaded", f"for {res.pfn} with status code {response.status_code}")
# TODO: Handle this error better
try:
response.raise_for_status()
except Exception as e:
return S_ERROR("Error uploading sandbox", repr(e))
else:
gLogger.debug("Sandbox already exists in storage backend", res.pfn)
return S_OK(res.pfn)

sbPath = self.__getSandboxPath(f"{aHash}.{extension}")
# Generate the location
result = self.__generateLocation(sbPath)
Expand Down Expand Up @@ -431,6 +478,27 @@ def _sendToClient(self, fileID, token, fileHelper=None, raw=False):
credDict = self.getRemoteCredentials()
serviceURL = self.serviceInfoDict["URL"]
filePath = fileID.replace(serviceURL, "")

# If the PFN starts with S3, we know it has been uploaded to the
# S3 sandbox store, so download it from there before sending it
if filePath.startswith("/S3"):
from DIRAC.FrameworkSystem.Utilities.diracx import TheImpersonator

with TheImpersonator(credDict) as client:
res = client.jobs.get_sandbox_file(pfn=filePath)
r = requests.get(res.url)
r.raise_for_status()
sbData = r.content
if fileHelper:
from io import BytesIO

result = fileHelper.DataSourceToNetwork(BytesIO(sbData))
# fileHelper.oFile.close()
return result
if raw:
return sbData
return S_OK(sbData)

result = self.sandboxDB.getSandboxId(self.__localSEName, filePath, credDict["username"], credDict["group"])
if not result["OK"]:
return result
Expand Down
2 changes: 1 addition & 1 deletion tests/Jenkins/utilities.sh
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ diracProxies() {

# Make sure DiracX is running
# And make sure it was synced
if [[ -n $DIRACX_URL ]]; then
if [[ -n $TEST_DIRACX ]]; then
echo "Waiting for for DiracX to be available" >&2
for i in {1..100}; do
if dirac-login -C "${SERVERINSTALLDIR}/user/client.pem" -K "${SERVERINSTALLDIR}/user/client.key" -T 72 "${DEBUG}"; then
Expand Down

0 comments on commit c43382b

Please sign in to comment.