Skip to content

Commit

Permalink
change provider name and protocol handler
Browse files Browse the repository at this point in the history
  • Loading branch information
colinthomas-z80 committed Aug 21, 2024
1 parent ac34607 commit 999ed61
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 14 deletions.
6 changes: 3 additions & 3 deletions parsl/executors/taskvine/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,18 +508,18 @@ def _register_file(self, parsl_file):
if parsl_file.scheme == 'file' or \
(parsl_file.local_path and os.path.exists(parsl_file.local_path)):
to_stage = not os.path.isabs(parsl_file.filepath)
return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, to_stage, to_cache)
return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, stage=to_stage, cache=to_cache, protocol=parsl_file.scheme)
else:
# we must stage url and temp files
ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, True, to_cache)
ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, stage=True, cache=to_cache, protocol=parsl_file.scheme)
return ptv

def _std_output_to_vine(self, fdname, stdfspec):
"""Find the name of the file that will contain stdout or stderr and
return a ParslFileToVine with it. These files are never cached"""
fname, mode = putils.get_std_fname_mode(fdname, stdfspec)
to_stage = not os.path.isabs(fname)
return ParslFileToVine(fname, fname, stage=to_stage, cache=False)
return ParslFileToVine(fname, fname, stage=to_stage, cache=False, protocol="file")

def _prepare_package(self, fn, extra_pkgs):
""" Look at source code of apps to figure out their package depedencies
Expand Down
14 changes: 7 additions & 7 deletions parsl/executors/taskvine/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,13 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file,
t.add_environment(poncho_env_file)


def _handle_file_declaration_protocol(m, filename, cache):
if "taskvinetemp://" in filename:
def _handle_file_declaration_protocol(m, spec):
if "http" in spec.protocol:
return m.declare_url(spec.parsl_name, cache=spec.cache, peer_transfer=True)
elif spec.protocol == "taskvinetemp":
return m.declare_temp()
elif "https://" in filename or "http://" in filename:
return m.declare_url(filename, cache=cache, peer_transfer=True)
else:
return m.declare_file(filename, cache=cache, peer_transfer=True)
return m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True)


@wrap_with_logs
Expand Down Expand Up @@ -373,7 +373,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
if spec.parsl_name in parsl_file_name_to_vine_file:
task_in_file = parsl_file_name_to_vine_file[spec.parsl_name]
else:
task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache)
task_in_file = _handle_file_declaration_protocol(m, spec)
parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file
logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id))
if spec.remote_name == '':
Expand All @@ -386,7 +386,7 @@ def _taskvine_submit_wait(ready_task_queue=None,
if spec.parsl_name in parsl_file_name_to_vine_file:
task_out_file = parsl_file_name_to_vine_file[spec.parsl_name]
else:
task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache)
task_out_file = _handle_file_declaration_protocol(m, spec)
parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file
logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id))
if spec.remote_name == '':
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
known_url_schemes = ["http", "https", "taskvinetemp"]


class StubStaging(Staging, RepresentationMixin):
class TaskVineStaging(Staging, RepresentationMixin):

def can_stage_in(self, file):
logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file)))
Expand Down
4 changes: 3 additions & 1 deletion parsl/executors/taskvine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ def __init__(self,
parsl_name: str, # name of file
remote_name: str, # name of file if url
stage: bool, # whether TaskVine should know about this file
cache: bool # whether TaskVine should cache this file
cache: bool, # whether TaskVine should cache this file
protocol: str, # protocol if url
):
self.parsl_name = parsl_name
self.stage = stage
self.cache = cache
self.remote_name = remote_name
self.protocol = protocol


def run_parsl_function(map_file, function_file, argument_file, result_file):
Expand Down
4 changes: 2 additions & 2 deletions parsl/tests/configs/taskvine_ex.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from parsl.data_provider.http import HTTPInTaskStaging
from parsl.data_provider.zip import ZipFileStaging
from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig
from parsl.executors.taskvine.stub_staging_provider import StubStaging
from parsl.executors.taskvine.taskvine_staging_provider import TaskVineStaging


def fresh_config():
return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000),
worker_launch_method='factory',
storage_access=[FTPInTaskStaging(), StubStaging(), NoOpFileStaging(), ZipFileStaging()])])
storage_access=[FTPInTaskStaging(), TaskVineStaging(), NoOpFileStaging(), ZipFileStaging()])])

0 comments on commit 999ed61

Please sign in to comment.