Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Oct 3, 2023
1 parent 2e92006 commit a9c68d0
Showing 1 changed file with 47 additions and 0 deletions.
47 changes: 47 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Service/SandboxStoreHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def initializeHandler(cls, serviceInfoDict):
def initializeRequest(self):
self.__backend = self.getCSOption("Backend", "local")
self.__localSEName = self.getCSOption("LocalSE", "SandboxSE")
self._useDiracXBackend = self.getCSOption("UseDiracXBackend", False)
self._maxUploadBytes = self.getCSOption("MaxSandboxSizeMiB", 10) * 1048576
if self.__backend.lower() == "local" or self.__backend == self.__localSEName:
self.__useLocalStorage = True
Expand Down Expand Up @@ -108,6 +109,52 @@ def _getFromClient(self, fileId, token, fileSize, fileHelper=None, data=""):
gLogger.info("Upload requested", f"for {aHash} [{extension}]")

credDict = self.getRemoteCredentials()

self._useDiracXBackend = True
if self._useDiracXBackend:
gLogger.info("Forwarding to DiracX")
with tempfile.TemporaryFile(mode="w+b") as tar_fh:
result = fileHelper.networkToDataSink(tar_fh, maxFileSize=self._maxUploadBytes)
if not result["OK"]:
return result
tar_fh.seek(0)

import hashlib
from diracx.client.models import SandboxInfo

hasher = hashlib.sha256()
while data := tar_fh.read(512 * 1024):
hasher.update(data)
checksum = hasher.hexdigest()
tar_fh.seek(0)
gLogger.debug("Sandbox checksum is", checksum)

sandbox_info = SandboxInfo(
checksum_algorithm="sha256",
checksum=checksum,
size=os.stat(tar_fh.fileno()).st_size,
format=extension,
)

with TheImpersonator(credDict) as client:
res = client.jobs.initiate_sandbox_upload(sandbox_info)

if res.url:
gLogger.debug("Uploading sandbox for", res.pfn)
files = {"file": ("file", tar_fh)}

response = requests.post(res.url, data=res.fields, files=files, timeout=300)

gLogger.debug("Sandbox uploaded", f"for {res.pfn} with status code {response.status_code}")
# TODO: Handle this error better
try:
response.raise_for_status()
except Exception as e:
return S_ERROR("Error uploading sandbox", repr(e))
else:
gLogger.debug("Sandbox already exists in storage backend", res.pfn)
return S_OK(res.pfn)

sbPath = self.__getSandboxPath(f"{aHash}.{extension}")
# Generate the location
result = self.__generateLocation(sbPath)
Expand Down

0 comments on commit a9c68d0

Please sign in to comment.