diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 87388395c6..ca2c9b2f5f 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -508,10 +508,10 @@ def _register_file(self, parsl_file): if parsl_file.scheme == 'file' or \ (parsl_file.local_path and os.path.exists(parsl_file.local_path)): to_stage = not os.path.isabs(parsl_file.filepath) - return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, to_stage, to_cache) + return ParslFileToVine(parsl_file.filepath, parsl_file.filepath, stage=to_stage, cache=to_cache, protocol=parsl_file.scheme) else: # we must stage url and temp files - ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, True, to_cache) + ptv = ParslFileToVine(parsl_file.url, parsl_file.local_path, stage=True, cache=to_cache, protocol=parsl_file.scheme) return ptv def _std_output_to_vine(self, fdname, stdfspec): @@ -519,7 +519,7 @@ def _std_output_to_vine(self, fdname, stdfspec): return a ParslFileToVine with it. These files are never cached""" fname, mode = putils.get_std_fname_mode(fdname, stdfspec) to_stage = not os.path.isabs(fname) - return ParslFileToVine(fname, fname, stage=to_stage, cache=False) + return ParslFileToVine(fname, fname, stage=to_stage, cache=False, protocol="file") def _prepare_package(self, fn, extra_pkgs): """ Look at source code of apps to figure out their package depedencies diff --git a/parsl/executors/taskvine/manager.py b/parsl/executors/taskvine/manager.py index b7c6a1fd8a..5515eb3d2e 100644 --- a/parsl/executors/taskvine/manager.py +++ b/parsl/executors/taskvine/manager.py @@ -128,13 +128,13 @@ def _prepare_environment_regular(m, manager_config, t, task, poncho_env_to_file, t.add_environment(poncho_env_file) -def _handle_file_declaration_protocol(m, filename, cache): - if "taskvinetemp://" in filename: +def _handle_file_declaration_protocol(m, spec): + if "http" in spec.protocol: + return m.declare_url(spec.parsl_name, cache=spec.cache, peer_transfer=True) + elif spec.protocol == "taskvinetemp": return m.declare_temp() - elif "https://" in filename or "http://" in filename: - return m.declare_url(filename, cache=cache, peer_transfer=True) else: - return m.declare_file(filename, cache=cache, peer_transfer=True) + return m.declare_file(spec.parsl_name, cache=spec.cache, peer_transfer=True) @wrap_with_logs @@ -373,7 +373,7 @@ def _taskvine_submit_wait(ready_task_queue=None, if spec.parsl_name in parsl_file_name_to_vine_file: task_in_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_in_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) + task_in_file = _handle_file_declaration_protocol(m, spec) parsl_file_name_to_vine_file[spec.parsl_name] = task_in_file logger.debug("Adding input file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) if spec.remote_name == '': @@ -386,7 +386,7 @@ def _taskvine_submit_wait(ready_task_queue=None, if spec.parsl_name in parsl_file_name_to_vine_file: task_out_file = parsl_file_name_to_vine_file[spec.parsl_name] else: - task_out_file = _handle_file_declaration_protocol(m, spec.parsl_name, spec.cache) + task_out_file = _handle_file_declaration_protocol(m, spec) parsl_file_name_to_vine_file[spec.parsl_name] = task_out_file logger.debug("Adding output file {}, {} to TaskVine".format(spec.parsl_name, task.executor_id)) if spec.remote_name == '': diff --git a/parsl/executors/taskvine/stub_staging_provider.py b/parsl/executors/taskvine/taskvine_staging_provider.py similarity index 96% rename from parsl/executors/taskvine/stub_staging_provider.py rename to parsl/executors/taskvine/taskvine_staging_provider.py index 818d764295..68aef0049f 100644 --- a/parsl/executors/taskvine/stub_staging_provider.py +++ b/parsl/executors/taskvine/taskvine_staging_provider.py @@ -12,7 +12,7 @@ known_url_schemes = ["http", "https", "taskvinetemp"] -class StubStaging(Staging, RepresentationMixin): +class TaskVineStaging(Staging, RepresentationMixin): def can_stage_in(self, file): logger.debug("Task vine staging provider checking passthrough for {}".format(repr(file))) diff --git a/parsl/executors/taskvine/utils.py b/parsl/executors/taskvine/utils.py index 1a6e5643a1..e3f1d31782 100644 --- a/parsl/executors/taskvine/utils.py +++ b/parsl/executors/taskvine/utils.py @@ -71,12 +71,14 @@ def __init__(self, parsl_name: str, # name of file remote_name: str, # name of file if url stage: bool, # whether TaskVine should know about this file - cache: bool # whether TaskVine should cache this file + cache: bool, # whether TaskVine should cache this file + protocol: str, # protocol if url ): self.parsl_name = parsl_name self.stage = stage self.cache = cache self.remote_name = remote_name + self.protocol = protocol def run_parsl_function(map_file, function_file, argument_file, result_file): diff --git a/parsl/tests/configs/taskvine_ex.py b/parsl/tests/configs/taskvine_ex.py index 18937a152d..f42ed9ccbb 100644 --- a/parsl/tests/configs/taskvine_ex.py +++ b/parsl/tests/configs/taskvine_ex.py @@ -4,10 +4,10 @@ from parsl.data_provider.http import HTTPInTaskStaging from parsl.data_provider.zip import ZipFileStaging from parsl.executors.taskvine import TaskVineExecutor, TaskVineManagerConfig -from parsl.executors.taskvine.stub_staging_provider import StubStaging +from parsl.executors.taskvine.taskvine_staging_provider import TaskVineStaging def fresh_config(): return Config(executors=[TaskVineExecutor(manager_config=TaskVineManagerConfig(port=9000), worker_launch_method='factory', - storage_access=[FTPInTaskStaging(), StubStaging(), NoOpFileStaging(), ZipFileStaging()])]) + storage_access=[FTPInTaskStaging(), TaskVineStaging(), NoOpFileStaging(), ZipFileStaging()])])