From 40cfdb861f221fe56c13c447aa3671cb3fcee428 Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Wed, 6 Dec 2023 11:21:07 -0500 Subject: [PATCH] Add framework for building sublattices inside new-style executors `LocalDispatcher.prepare_manifest()` will attempt to copy the task artifacts to `COVALENT_STAGING_URI_PREFIX` if that environment variable is set. --- covalent/_dispatcher_plugins/local.py | 65 +++++++++++++++++-- .../strategies/shutil_strategy.py | 2 + covalent/_workflow/electron.py | 34 +++++----- covalent/executor/utils/wrappers.py | 2 +- .../_core/data_modules/importer.py | 6 +- .../strategies/shutil_strategy_test.py | 1 + .../covalent_tests/workflow/electron_test.py | 48 ++++++++++++++ 7 files changed, 133 insertions(+), 25 deletions(-) diff --git a/covalent/_dispatcher_plugins/local.py b/covalent/_dispatcher_plugins/local.py index 9857342cf..d75166a9b 100644 --- a/covalent/_dispatcher_plugins/local.py +++ b/covalent/_dispatcher_plugins/local.py @@ -23,6 +23,8 @@ from furl import furl +from covalent._file_transfer import File, FileTransfer + from .._api.apiclient import CovalentAPIClient as APIClient from .._results_manager.result import Result from .._results_manager.results_manager import get_result, get_result_manager @@ -36,7 +38,7 @@ from .._shared_files.config import get_config from .._shared_files.schemas.asset import AssetSchema from .._shared_files.schemas.result import ResultSchema -from .._shared_files.utils import copy_file_locally, format_server_url +from .._shared_files.utils import format_server_url from .._workflow.lattice import Lattice from ..triggers import BaseTrigger from .base import BaseDispatcher @@ -521,7 +523,41 @@ def prepare_manifest(lattice, storage_path) -> ResultSchema: """Prepare a built-out lattice for submission""" result_object = Result(lattice) - return serialize_result(result_object, storage_path) + manifest = serialize_result(result_object, storage_path) + LocalDispatcher.transfer_local_assets_to_remote(manifest, storage_path) + return manifest + + @staticmethod + def transfer_local_assets_to_remote(manifest: ResultSchema, storage_path) -> ResultSchema: + """Transfer assets from temporary staging directory to remote storage. + + This will be used when building sublattice graphs in an + executor. The executor will deposit the workflow assets at a + location mutually agreed upon between the orchestrator and the + executor plugin. + + """ + remote_uri_prefix = os.environ.get("COVALENT_STAGING_URI_PREFIX", None) + if not remote_uri_prefix: + return manifest + + local_prefix = "file://" + + assets = extract_assets(manifest) + + for asset in assets: + # Don't upload empty files + if asset.size > 0: + local_object_key = asset.uri[len(local_prefix) :] + asset.remote_uri = f"{remote_uri_prefix}{local_object_key}" + + LocalDispatcher._upload(assets) + + for asset in assets: + asset.uri = asset.remote_uri + asset.remote_uri = "" + + return manifest @staticmethod def register_manifest( @@ -602,17 +638,17 @@ def _upload(assets: List[AssetSchema]): if not asset.remote_uri or not asset.uri: app_log.debug(f"Skipping asset {i + 1} out of {total}") continue - if asset.remote_uri.startswith(local_scheme_prefix): - copy_file_locally(asset.uri, asset.remote_uri) - number_uploaded += 1 else: - _upload_asset(asset.uri, asset.remote_uri) + _transfer_asset(asset.uri, asset.remote_uri) number_uploaded += 1 app_log.debug(f"Uploaded asset {i + 1} out of {total}.") app_log.debug(f"uploaded {number_uploaded} assets.") -def _upload_asset(local_uri, remote_uri): +# Future improvement: we can probably fold this functionality +# into the HTTP file transfer strategy +def _put_asset(local_uri, remote_uri): + """Upload asset to an http PUT endpoint.""" scheme_prefix = "file://" if local_uri.startswith(scheme_prefix): local_path = local_uri[len(scheme_prefix) :] @@ -637,3 +673,18 @@ def _upload_asset(local_uri, remote_uri): r = api_client.put(endpoint, headers={"Content-Length": str(filesize)}, data=data) r.raise_for_status() + + +def _transfer_asset(local_uri, remote_uri): + """Attempt to upload asset using a generalized file transfer.""" + + http_prefix = "http://" + https_prefix = "https://" + if remote_uri.startswith(http_prefix) or remote_uri.startswith(https_prefix): + _put_asset(local_uri, remote_uri) + + else: + local_obj = File(local_uri) + remote_obj = File(remote_uri) + _, transfer_callable = FileTransfer(local_obj, remote_obj).cp() + transfer_callable() diff --git a/covalent/_file_transfer/strategies/shutil_strategy.py b/covalent/_file_transfer/strategies/shutil_strategy.py index 319d47d04..b2585a5ab 100644 --- a/covalent/_file_transfer/strategies/shutil_strategy.py +++ b/covalent/_file_transfer/strategies/shutil_strategy.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import shutil from .. import File @@ -46,6 +47,7 @@ def cp(self, from_file: File, to_file: File = File()) -> None: """ def callable(): + os.makedirs(os.path.dirname(to_file.filepath), exist_ok=True) shutil.copyfile(from_file.filepath, to_file.filepath) return callable diff --git a/covalent/_workflow/electron.py b/covalent/_workflow/electron.py index e6a6e4648..2f492c070 100644 --- a/covalent/_workflow/electron.py +++ b/covalent/_workflow/electron.py @@ -865,25 +865,29 @@ def _build_sublattice_graph(sub: Lattice, json_parent_metadata: str, *args, **kw DISABLE_LEGACY_SUBLATTICES = os.environ.get("COVALENT_DISABLE_LEGACY_SUBLATTICES") == "1" try: - # Attempt multistage sublattice dispatch. For now we require - # the executor to reach the Covalent server - parent_dispatch_id = os.environ["COVALENT_DISPATCH_ID"] - dispatcher_url = os.environ["COVALENT_DISPATCHER_URL"] + # Attempt multistage sublattice dispatch. with tempfile.TemporaryDirectory(prefix="covalent-") as staging_path: + # Try depositing the assets in a location readable by Covalent and + # request Covalent to pull those assets. + staging_uri_prefix = os.environ.get("COVALENT_STAGING_URI_PREFIX", None) manifest = LocalDispatcher.prepare_manifest(sub, staging_path) + recv_manifest = manifest + + # If the executor can reach the Covalent server directly, + # submit the sublattice dispatch to Covalent but don't start it. + if not staging_uri_prefix: + parent_dispatch_id = os.environ["COVALENT_DISPATCH_ID"] + dispatcher_url = os.environ["COVALENT_DISPATCHER_URL"] + recv_manifest = LocalDispatcher.register_manifest( + manifest, + dispatcher_addr=dispatcher_url, + parent_dispatch_id=parent_dispatch_id, + push_assets=True, + ) + LocalDispatcher.upload_assets(recv_manifest) - # Omit these two steps to return the manifest to Covalent and - # request the assets be pulled - recv_manifest = LocalDispatcher.register_manifest( - manifest, - dispatcher_addr=dispatcher_url, - parent_dispatch_id=parent_dispatch_id, - push_assets=True, - ) - LocalDispatcher.upload_assets(recv_manifest) - - return recv_manifest.model_dump_json() + return recv_manifest.model_dump_json() except Exception as ex: # Fall back to legacy sublattice handling diff --git a/covalent/executor/utils/wrappers.py b/covalent/executor/utils/wrappers.py index 8f7fd8256..fb8a5ae0a 100644 --- a/covalent/executor/utils/wrappers.py +++ b/covalent/executor/utils/wrappers.py @@ -365,8 +365,8 @@ def run_task_group_alt( task_ids = task_group_metadata["node_ids"] gid = task_group_metadata["task_group_id"] + os.environ["COVALENT_STAGING_URI_PREFIX"] = f"file://{results_dir}/staging" os.environ["COVALENT_DISPATCH_ID"] = dispatch_id - os.environ["COVALENT_DISPATCHER_URL"] = server_url for i, task in enumerate(task_specs): result_uri, stdout_uri, stderr_uri, qelectron_db_uri = output_uris[i] diff --git a/covalent_dispatcher/_core/data_modules/importer.py b/covalent_dispatcher/_core/data_modules/importer.py index 4630b9544..196afd517 100644 --- a/covalent_dispatcher/_core/data_modules/importer.py +++ b/covalent_dispatcher/_core/data_modules/importer.py @@ -82,16 +82,18 @@ def _get_all_assets(dispatch_id: str): def _pull_assets(manifest: ResultSchema) -> None: dispatch_id = manifest.metadata.dispatch_id assets = _get_all_assets(dispatch_id) - futs = [] + download_count = 0 for asset in assets["lattice"]: if asset.remote_uri: + download_count += 1 asset.download(asset.remote_uri) for asset in assets["nodes"]: if asset.remote_uri: + download_count += 1 asset.download(asset.remote_uri) - app_log.debug(f"imported {len(futs)} assets for dispatch {dispatch_id}") + app_log.debug(f"imported {download_count} assets for dispatch {dispatch_id}") async def import_manifest( diff --git a/tests/covalent_tests/file_transfer/strategies/shutil_strategy_test.py b/tests/covalent_tests/file_transfer/strategies/shutil_strategy_test.py index 654a2bc11..5dab32488 100644 --- a/tests/covalent_tests/file_transfer/strategies/shutil_strategy_test.py +++ b/tests/covalent_tests/file_transfer/strategies/shutil_strategy_test.py @@ -26,6 +26,7 @@ class TestShutilStrategy: MOCK_TO_FILEPATH = "/home/user/data.csv.bak" def test_cp(self, mocker): + mocker.patch("os.makedirs") mock_copyfile = mocker.patch("shutil.copyfile") from_file = File(TestShutilStrategy.MOCK_FROM_FILEPATH) to_file = File(TestShutilStrategy.MOCK_TO_FILEPATH) diff --git a/tests/covalent_tests/workflow/electron_test.py b/tests/covalent_tests/workflow/electron_test.py index 327673b6f..e3e1db1a8 100644 --- a/tests/covalent_tests/workflow/electron_test.py +++ b/tests/covalent_tests/workflow/electron_test.py @@ -17,6 +17,7 @@ """Unit tests for electron""" import json +import tempfile from unittest.mock import ANY, MagicMock import flake8 @@ -142,6 +143,53 @@ def mock_register(manifest, *args, **kwargs): assert lat.metadata.workflow_executor_data == parent_metadata["workflow_executor_data"] +def test_build_sublattice_graph_staging_uri(mocker): + """Test that building a sublattice graph with staging uri.""" + + dispatch_id = "test_build_sublattice_graph_staging_uri" + + @ct.electron + def task(x): + return x + + @ct.lattice + def workflow(x): + return task(x) + + parent_metadata = { + "executor": "parent_executor", + "executor_data": {}, + "workflow_executor": "my_postprocessor", + "workflow_executor_data": {}, + "hooks": { + "deps": {"bash": None, "pip": None}, + "call_before": [], + "call_after": [], + }, + "triggers": "mock-trigger", + "qelectron_data_exists": False, + "results_dir": None, + } + + with tempfile.TemporaryDirectory() as tmp_dir: + mock_environ = { + "COVALENT_DISPATCH_ID": dispatch_id, + "COVALENT_STAGING_URI_PREFIX": f"file://{tmp_dir}", + } + mocker.patch("os.environ", mock_environ) + json_manifest = _build_sublattice_graph(workflow, json.dumps(parent_metadata), 1) + + # Check that asset uris start with the staging prefix + manifest = ResultSchema.model_validate_json(json_manifest) + for key, asset in manifest.assets: + if asset.size > 0: + assert asset.uri.startswith(mock_environ["COVALENT_STAGING_URI_PREFIX"]) + + for key, asset in manifest.lattice.assets: + if asset.size > 0: + assert asset.uri.startswith(mock_environ["COVALENT_STAGING_URI_PREFIX"]) + + def test_build_sublattice_graph_fallback(mocker): """ Test falling back to monolithic sublattice dispatch