Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add DOWNLOAD staging action #3282

Merged
merged 8 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions examples/05_task_input_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,16 @@
# Here we don't use dict initialization.
td = rp.TaskDescription()
td.executable = '/usr/bin/wc'
td.arguments = ['-c', 'input.dat']
td.arguments = ['-c', 'input_1.dat', 'input_2.dat']
# td.input_staging = ['input.dat']

# this is a shortcut for:
td.input_staging = {'source': 'client:///input.dat',
'target': 'task:///input.dat',
'action': rp.TRANSFER}
td.input_staging = [{'source': 'client:///input.dat',
'target': 'task:///input_1.dat',
'action': rp.TRANSFER},
{'source': 'https://1.1.1.1/',
andre-merzky marked this conversation as resolved.
Show resolved Hide resolved
'target': 'task:///input_2.dat',
'action': rp.DOWNLOAD}]
tds.append(td)
report.progress()
report.ok('>>ok\n')
Expand Down
68 changes: 16 additions & 52 deletions src/radical/pilot/agent/staging_input/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def __init__(self, cfg, session):
def initialize(self):

self._pwd = os.getcwd()
self._stager = rpu.StagingHelper(log=self._log)

self.register_input(rps.AGENT_STAGING_INPUT_PENDING,
rpc.AGENT_STAGING_INPUT_QUEUE, self.work)
Expand All @@ -67,7 +68,8 @@ def _work(self, tasks):
actionables = list()
for sd in task['description'].get('input_staging', []):

if sd['action'] in [rpc.LINK, rpc.COPY, rpc.MOVE, rpc.TARBALL]:
if sd['action'] in [rpc.LINK, rpc.COPY, rpc.MOVE,
rpc.TARBALL, rpc.DOWNLOAD]:
actionables.append(sd)

if actionables:
Expand Down Expand Up @@ -147,11 +149,12 @@ def _handle_task_staging(self, task, actionables):
did = sd['uid']
src = sd['source']
tgt = sd['target']
flags = sd.get('flags', 0)

self._prof.prof('staging_in_start', uid=uid, msg=did)

# agent stager only handles local actions
if action not in [rpc.COPY, rpc.LINK, rpc.MOVE]:
if action not in [rpc.COPY, rpc.LINK, rpc.MOVE, rpc.DOWNLOAD]:
self._prof.prof('staging_in_skip', uid=uid, msg=did)
continue

Expand All @@ -169,60 +172,13 @@ def _handle_task_staging(self, task, actionables):
elif os.path.exists(tgt.strip()) and os.path.isdir(tgt.strip()):
tgt = os.path.join(tgt, os.path.basename(src))


src = complete_url(src, src_context, self._log)
tgt = complete_url(tgt, tgt_context, self._log)

# Currently, we use the same schema for files and folders.
assert tgt.schema == 'file', 'staging tgt must be file://'

if action in [rpc.COPY, rpc.LINK, rpc.MOVE]:
assert src.schema == 'file', 'staging src expected as file://'

# implicitly create target dir if needed - but only for local ops
if action != rpc.TRANSFER:
tgtdir = os.path.dirname(tgt.path)
if tgtdir != task_sandbox.path:
self._log.debug("mkdir %s", tgtdir)
ru.rec_makedir(tgtdir)

if action == rpc.COPY:
try:
shutil.copytree(src.path, tgt.path)
except OSError as exc:
if exc.errno == errno.ENOTDIR:
shutil.copy(src.path, tgt.path)
else:
raise

elif action == rpc.LINK:

# Fix issue/1513 if link source is file and target is folder.
# should support POSIX standard where link is created
# with the same name as the source
if os.path.isfile(src.path) and os.path.isdir(tgt.path):
os.symlink(src.path,
'%s/%s' % (tgt.path, os.path.basename(src.path)))

else:
os.symlink(src.path, tgt.path)

elif action == rpc.MOVE:
shutil.move(src.path, tgt.path)

elif action == rpc.TRANSFER:

# NOTE: TRANSFER directives don't arrive here right now.
# FIXME: we only handle srm staging right now, and only for
# a specific target proxy. Other TRANSFER directives are
# left to tmgr input staging. We should use SAGA to
# attempt all staging ops which do not involve the client
# machine.
self._log.error('no transfer for %s -> %s', src, tgt)
self._prof.prof('staging_in_fail', uid=uid, msg=did)
raise NotImplementedError('unsupported transfer %s' % src)

elif action == rpc.TARBALL:
assert tgt.schema == 'file', 'staging tgt expected as file://'

if action == rpc.TARBALL:

# If something was staged via the tarball method, the tarball is
# extracted and then removed from the task folder. The target
Expand All @@ -238,6 +194,14 @@ def _handle_task_staging(self, task, actionables):
# FIXME: make tarball removal dependent on debug settings
# os.remove(os.path.dirname(tgt.path) + '/' + uid + '.tar')

else:

self._stager.handle_staging_directive({'source': src,
'target': tgt,
'action': action,
'flags' : flags})


self._prof.prof('staging_in_stop', uid=uid, msg=did)

# all staging is done -- pass on to the scheduler
Expand Down
36 changes: 6 additions & 30 deletions src/radical/pilot/agent/staging_output/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, cfg, session):
def initialize(self):

self._pwd = os.getcwd()
self._stager = rpu.StagingHelper(log=self._log)

self.register_input(rps.AGENT_STAGING_OUTPUT_PENDING,
rpc.AGENT_STAGING_OUTPUT_QUEUE, self.work)
Expand Down Expand Up @@ -311,37 +312,12 @@ def _handle_task_staging(self, task, actionables):
if action in [rpc.COPY, rpc.LINK, rpc.MOVE]:
assert tgt.schema == 'file', 'staging tgt expected as file://'

# implicitly create target dir if needed - but only for local ops
if action != rpc.TRANSFER:
tgtdir = os.path.dirname(tgt.path)
if tgtdir != task_sandbox.path:
self._log.debug("mkdir %s", tgtdir)
ru.rec_makedir(tgtdir)
self._stager.handle_staging_directive({'source': src,
'target': tgt,
'action': action,
'flags' : flags})


if action == rpc.COPY:
try:
shutil.copytree(src.path, tgt.path)
except OSError as exc:
if exc.errno == errno.ENOTDIR:
shutil.copy(src.path, tgt.path)
else:
raise

elif action == rpc.LINK:
# Fix issue/1513 if link source is file and target is folder
# should support POSIX standard where link is created
# with the same name as the source
if os.path.isfile(src.path) and os.path.isdir(tgt.path):
os.symlink(src.path,
os.path.join(tgt.path,
os.path.basename(src.path)))
else: # default behavior
os.symlink(src.path, tgt.path)
elif action == rpc.MOVE: shutil.move(src.path, tgt.path)
elif action == rpc.TRANSFER: pass
# This is currently never executed. Commenting it out.
# Uncomment and implement when uploads directly to remote URLs
# from tasks are supported.
self._prof.prof('staging_out_stop', uid=uid, msg=did)

# all agent staging is done -- pass on to tmgr output staging
Expand Down
1 change: 1 addition & 0 deletions src/radical/pilot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
COPY = 'Copy' # local cp
LINK = 'Link' # local ln -s
MOVE = 'Move' # local mv
DOWNLOAD = 'Download' # remote download by agent
TRANSFER = 'Transfer' # remote transfer from / to client
TARBALL = 'Tarball' # remote staging will be executed using a tarball.

Expand Down
3 changes: 2 additions & 1 deletion src/radical/pilot/task_description.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,11 @@ class TaskDescription(ru.TypedDict):

*Action operators*

- rp.TRANSFER : remote file transfer from `source` URL to `target` URL
- rp.TRANSFER : remote file transfer from `source` to `target` URL (client)
- rp.COPY : local file copy, i.e., not crossing host boundaries
- rp.MOVE : local file move
- rp.LINK : local file symlink
- rp.DOWNLOAD : fetch remote file from `source` URL to `target` URL (agent)

*Flags*

Expand Down
7 changes: 0 additions & 7 deletions src/radical/pilot/tmgr/staging_input/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,17 +318,10 @@ def _handle_task(self, task, actionables):
new_actionables.append(sd)

else:

action = sd['action']
did = sd['uid']
src = sd['source']
tgt = sd['target']

# client stager only handles remote actions
if action not in [rpc.TRANSFER, rpc.TARBALL]:
self._prof.prof('staging_in_skip', uid=uid, msg=did)
continue

src = complete_url(src, src_context, self._log)
tgt = complete_url(tgt, tgt_context, self._log)

Expand Down
22 changes: 19 additions & 3 deletions src/radical/pilot/utils/staging_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import radical.utils as ru

from ..constants import COPY, LINK, MOVE, TRANSFER
from ..constants import COPY, LINK, MOVE, TRANSFER, DOWNLOAD
from ..constants import TARBALL # , CREATE_PARENTS, RECURSIVE


Expand Down Expand Up @@ -42,6 +42,10 @@ def link(self, src, tgt, flags=None):
self._log.debug('link %s %s', src, tgt)
self._backend.link(src, tgt, flags)

def download(self, src, tgt, flags=None):
    """Log the requested download and delegate it to the backend.

    Args:
        src: source of the download, passed through unchanged.
        tgt: target of the download, passed through unchanged.
        flags: staging flags, passed through unchanged (default: None).
    """
    # emit the debug record first, then hand all arguments to the backend
    self._log.debug('download %s %s', src, tgt)
    self._backend.download(src, tgt, flags)

def delete(self, tgt, flags=None):
self._log.debug('rm %s', tgt)
self._backend.delete(tgt, flags)
Expand All @@ -55,10 +59,9 @@ def handle_staging_directive(self, sd):
action = sd['action']
src = sd['source']
tgt = sd['target']
uid = sd.get('uid', '')
flags = sd.get('flags', 0)

assert action in [COPY, LINK, MOVE, TRANSFER]
assert action in [COPY, LINK, MOVE, TRANSFER, DOWNLOAD]

self._log.info('%-10s %s', action, src)
self._log.info('%-10s %s', '', tgt)
Expand All @@ -72,6 +75,9 @@ def handle_staging_directive(self, sd):
elif action == MOVE:
self.move(src, tgt, flags)

elif action in [DOWNLOAD]:
self.download(src, tgt, flags)


# ------------------------------------------------------------------------------
#
Expand Down Expand Up @@ -107,6 +113,11 @@ def link(self, src, tgt, flags):
self.mkdir(os.path.dirname(tgt), flags)
os.link(src, tgt)

def download(self, src, tgt, flags):
    """Fetch the remote resource `src` and store it as the local file `tgt`.

    The transfer uses Python's standard library instead of an external
    `wget` process: it does not depend on tools installed on the host, no
    command line is assembled from the (user supplied) source URL, and a
    failed download raises instead of passing unnoticed.

    Args:
        src: URL of the remote resource; converted with `str()` before use.
        tgt: target location; only its `path` component (as extracted by
            `ru.Url`) is used.
        flags: staging flags, forwarded to `self.mkdir` when creating the
            parent directory of the target.

    Raises:
        urllib.error.URLError: the resource could not be retrieved (this
            includes `HTTPError` for HTTP error responses).
        OSError: the target file could not be written.
    """
    # function-scope imports: only this method needs these modules
    import shutil
    import urllib.request

    tgt = ru.Url(tgt).path
    self.mkdir(os.path.dirname(tgt), flags)

    # Stream into a temporary sibling file and rename it on success, so
    # that an interrupted transfer never leaves a truncated target behind.
    part = '%s.part' % tgt
    try:
        with urllib.request.urlopen(str(src)) as response, \
                open(part, 'wb') as fout:
            shutil.copyfileobj(response, fout)
        os.replace(part, tgt)
    except Exception:
        # best-effort cleanup of the partial file, then report the failure
        try:
            os.unlink(part)
        except OSError:
            pass
        raise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct me if I am mistaken, but some Linux flavors might not ship the `wget` command. To avoid depending on it, I think a pure-Python approach is the most likely to work everywhere, since a Python interpreter must exist anyway. I would suggest the following:

import requests 

def download(self, src: str, tgt: str, flags: str) -> None:
    """Download the remote file `src` to the local target path `tgt`.

    This is a best-effort operation: if the request cannot be made or the
    server answers with an HTTP error status, an error message is printed,
    staging is skipped and nothing is written.

    Args:
        src: URL of the remote file.
        tgt: target location; only its `path` component (as extracted by
            `ru.Url`) is used.
        flags: staging flags, forwarded to `self.mkdir`.
    """
    tgt = ru.Url(tgt).path

    try:
        # the request itself may fail (DNS, refused connection, ...), so it
        # is guarded the same way as the HTTP status check below
        response = requests.get(src, stream=True)
    except requests.RequestException as e:
        print(f'ERROR: skipping staging, download failed due to: {e}')
        return

    # `with` releases the streamed connection on every exit path
    with response:
        try:
            response.raise_for_status()  # Check if the download was successful
        except requests.RequestException as e:
            print(f'ERROR: skipping staging, download failed due to: {e}')
            return

        self.mkdir(os.path.dirname(tgt), flags)

        # Save the file content
        with open(tgt, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, we can use python for that, will change!


def delete(self, tgt, flags):
tgt = ru.Url(tgt).path
try : os.unlink(tgt)
Expand Down Expand Up @@ -170,6 +181,11 @@ def move(self, src, tgt, flags):
def link(self, src, tgt, flags):
    """Check that SAGA is available; no link operation is performed here.

    Args:
        src: link source (unused by this method).
        tgt: link target (unused by this method).
        flags: staging flags (unused by this method).
    """
    assert self._has_saga

def download(self, src, tgt, flags):
    """Perform a download by delegating to this backend's `copy`.

    Args:
        src: source of the download, forwarded to `copy`.
        tgt: target of the download, forwarded to `copy`.
        flags: staging flags, forwarded to `copy`.
    """
    # this backend is only usable when SAGA is available
    assert self._has_saga

    # a download is handled exactly like a copy by this backend
    self.copy(src, tgt, flags)


def delete(self, tgt, flags):
assert self._has_saga
Expand Down
Loading