Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve support for remote executors #1885

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,6 @@ node_modules/

# Ignore mock database
**/*.sqlite

# Ignore virtual envs
*.venv
65 changes: 58 additions & 7 deletions covalent/_dispatcher_plugins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

from furl import furl

from covalent._file_transfer import File, FileTransfer

from .._api.apiclient import CovalentAPIClient as APIClient
from .._results_manager.result import Result
from .._results_manager.results_manager import get_result, get_result_manager
Expand All @@ -36,7 +38,7 @@
from .._shared_files.config import get_config
from .._shared_files.schemas.asset import AssetSchema
from .._shared_files.schemas.result import ResultSchema
from .._shared_files.utils import copy_file_locally, format_server_url
from .._shared_files.utils import format_server_url
from .._workflow.lattice import Lattice
from ..triggers import BaseTrigger
from .base import BaseDispatcher
Expand Down Expand Up @@ -521,7 +523,41 @@ def prepare_manifest(lattice, storage_path) -> ResultSchema:
"""Prepare a built-out lattice for submission"""

result_object = Result(lattice)
return serialize_result(result_object, storage_path)
manifest = serialize_result(result_object, storage_path)
LocalDispatcher.transfer_local_assets_to_remote(manifest, storage_path)
return manifest

@staticmethod
def transfer_local_assets_to_remote(manifest: ResultSchema, storage_path) -> ResultSchema:
    """Push locally staged workflow assets to the configured staging location.

    Intended for building sublattice graphs inside an executor: the
    executor deposits the workflow assets at a location agreed upon
    between the orchestrator and the executor plugin.

    The destination prefix is read from the ``COVALENT_STAGING_URI_PREFIX``
    environment variable. If it is unset or empty, nothing is transferred.
    Otherwise every asset with a positive size gets a remote URI made of
    the prefix followed by its local URI with the first ``len("file://")``
    characters removed, the assets are handed to ``LocalDispatcher._upload``,
    and finally each asset's ``uri`` is replaced by its ``remote_uri`` while
    ``remote_uri`` is reset to the empty string.

    Args:
        manifest: Manifest whose assets are to be transferred.
        storage_path: Not used by this method.

    Returns:
        The ``manifest`` object that was passed in.
        NOTE(review): presumably ``extract_assets`` yields the asset objects
        held by the manifest, so the URI changes are visible through it --
        confirm.
    """
    staging_prefix = os.environ.get("COVALENT_STAGING_URI_PREFIX", None)
    if not staging_prefix:
        return manifest

    scheme_length = len("file://")
    staged_assets = extract_assets(manifest)

    for item in staged_assets:
        # Don't upload empty files
        if not item.size > 0:
            continue
        item.remote_uri = f"{staging_prefix}{item.uri[scheme_length:]}"

    LocalDispatcher._upload(staged_assets)

    # The staged copy becomes the asset's canonical location.
    for item in staged_assets:
        item.uri, item.remote_uri = item.remote_uri, ""

    return manifest

@staticmethod
def register_manifest(
Expand Down Expand Up @@ -602,17 +638,17 @@ def _upload(assets: List[AssetSchema]):
if not asset.remote_uri or not asset.uri:
app_log.debug(f"Skipping asset {i + 1} out of {total}")
continue
if asset.remote_uri.startswith(local_scheme_prefix):
copy_file_locally(asset.uri, asset.remote_uri)
number_uploaded += 1
else:
_upload_asset(asset.uri, asset.remote_uri)
_transfer_asset(asset.uri, asset.remote_uri)
number_uploaded += 1
app_log.debug(f"Uploaded asset {i + 1} out of {total}.")
app_log.debug(f"uploaded {number_uploaded} assets.")


def _upload_asset(local_uri, remote_uri):
# Future improvement: we can probably fold this functionality
# into the HTTP file transfer strategy
def _put_asset(local_uri, remote_uri):
"""Upload asset to an http PUT endpoint."""
scheme_prefix = "file://"
if local_uri.startswith(scheme_prefix):
local_path = local_uri[len(scheme_prefix) :]
Expand All @@ -637,3 +673,18 @@ def _upload_asset(local_uri, remote_uri):

r = api_client.put(endpoint, headers={"Content-Length": str(filesize)}, data=data)
r.raise_for_status()


def _transfer_asset(local_uri, remote_uri):
    """Send one asset to ``remote_uri``, choosing the mechanism from its scheme.

    Destinations beginning with ``http://`` or ``https://`` are delegated to
    ``_put_asset``. Any other destination goes through a ``FileTransfer``
    built from the two URIs; the second element returned by its ``cp()`` is
    invoked to perform the copy.

    Args:
        local_uri: URI of the asset to send.
        remote_uri: URI the asset should be written to.
    """
    if remote_uri.startswith(("http://", "https://")):
        _put_asset(local_uri, remote_uri)
        return

    source, destination = File(local_uri), File(remote_uri)
    _, run_transfer = FileTransfer(source, destination).cp()
    run_transfer()
36 changes: 28 additions & 8 deletions covalent/_file_transfer/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,37 @@

from .enums import FileTransferStrategyTypes, FtCallDepReturnValue, Order
from .file import File
from .strategies.gcloud_strategy import GCloud
from .strategies.http_strategy import HTTP
from .strategies.s3_strategy import S3
from .strategies.shutil_strategy import Shutil
from .strategies.transfer_strategy_base import FileTransferStrategy

# TODO: make this pluggable similar to executor plugins
# Maps a strategy type to the strategy class implementing it. Only the types
# listed here can be chosen automatically; _guess_transfer_strategy looks the
# class up in this table.
_strategy_type_map = {
FileTransferStrategyTypes.Shutil: Shutil,
FileTransferStrategyTypes.S3: S3,
FileTransferStrategyTypes.HTTP: HTTP,
FileTransferStrategyTypes.GCloud: GCloud,
}


def _guess_transfer_strategy(from_file: File, to_file: File) -> FileTransferStrategy:
    """Pick a transfer strategy class from the two files' mapped strategy types.

    Cases handled automatically:

    * local -> remote (except an HTTP destination): the destination's strategy
    * remote -> local: the source's strategy
    * local -> local: ``Shutil``

    Here "local" means a mapped strategy type of ``Shutil``.

    Args:
        from_file: Source file.
        to_file: Destination file.

    Returns:
        The strategy class registered in ``_strategy_type_map``. Note that
        this is the class itself, not an instance; the caller instantiates it.

    Raises:
        AttributeError: If no strategy can be inferred, either because the
            combination is not one of the cases above or because the selected
            strategy type has no entry in ``_strategy_type_map``.
    """
    source_type = from_file.mapped_strategy_type
    target_type = to_file.mapped_strategy_type

    if (
        source_type == FileTransferStrategyTypes.Shutil
        and target_type != FileTransferStrategyTypes.HTTP
    ):
        chosen_type = target_type
    elif target_type == FileTransferStrategyTypes.Shutil:
        chosen_type = source_type
    else:
        raise AttributeError("FileTransfer requires a file transfer strategy to be specified")

    try:
        return _strategy_type_map[chosen_type]
    except KeyError as err:
        # A strategy type without a registered class cannot be guessed either;
        # report it the same way as every other unresolved case rather than
        # leaking a bare KeyError to the caller.
        raise AttributeError(
            "FileTransfer requires a file transfer strategy to be specified"
        ) from err


class FileTransfer:
"""
Expand Down Expand Up @@ -58,15 +85,8 @@ def __init__(
# assign explicit strategy or default to strategy based on from_file & to_file schemes
if strategy:
self.strategy = strategy
elif (
from_file.mapped_strategy_type == FileTransferStrategyTypes.Shutil
and to_file.mapped_strategy_type == FileTransferStrategyTypes.Shutil
):
self.strategy = Shutil()
elif from_file.mapped_strategy_type == FileTransferStrategyTypes.HTTP:
self.strategy = HTTP()
else:
raise AttributeError("FileTransfer requires a file transfer strategy to be specified")
self.strategy = _guess_transfer_strategy(from_file, to_file)()

self.to_file = to_file
self.from_file = from_file
Expand Down
2 changes: 2 additions & 0 deletions covalent/_file_transfer/strategies/shutil_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil

from .. import File
Expand Down Expand Up @@ -46,6 +47,7 @@ def cp(self, from_file: File, to_file: File = File()) -> None:
"""

def callable():
os.makedirs(os.path.dirname(to_file.filepath), exist_ok=True)
shutil.copyfile(from_file.filepath, to_file.filepath)

return callable
Expand Down
34 changes: 19 additions & 15 deletions covalent/_workflow/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,25 +865,29 @@ def _build_sublattice_graph(sub: Lattice, json_parent_metadata: str, *args, **kw
DISABLE_LEGACY_SUBLATTICES = os.environ.get("COVALENT_DISABLE_LEGACY_SUBLATTICES") == "1"

try:
# Attempt multistage sublattice dispatch. For now we require
# the executor to reach the Covalent server
parent_dispatch_id = os.environ["COVALENT_DISPATCH_ID"]
dispatcher_url = os.environ["COVALENT_DISPATCHER_URL"]
# Attempt multistage sublattice dispatch.

with tempfile.TemporaryDirectory(prefix="covalent-") as staging_path:
# Try depositing the assets in a location readable by Covalent and
# request Covalent to pull those assets.
staging_uri_prefix = os.environ.get("COVALENT_STAGING_URI_PREFIX", None)
manifest = LocalDispatcher.prepare_manifest(sub, staging_path)
recv_manifest = manifest

# If the executor can reach the Covalent server directly,
# submit the sublattice dispatch to Covalent but don't start it.
if not staging_uri_prefix:
parent_dispatch_id = os.environ["COVALENT_DISPATCH_ID"]
dispatcher_url = os.environ["COVALENT_DISPATCHER_URL"]
recv_manifest = LocalDispatcher.register_manifest(
manifest,
dispatcher_addr=dispatcher_url,
parent_dispatch_id=parent_dispatch_id,
push_assets=True,
)
LocalDispatcher.upload_assets(recv_manifest)

# Omit these two steps to return the manifest to Covalent and
# request the assets be pulled
recv_manifest = LocalDispatcher.register_manifest(
manifest,
dispatcher_addr=dispatcher_url,
parent_dispatch_id=parent_dispatch_id,
push_assets=True,
)
LocalDispatcher.upload_assets(recv_manifest)

return recv_manifest.model_dump_json()
return recv_manifest.model_dump_json()

except Exception as ex:
# Fall back to legacy sublattice handling
Expand Down
2 changes: 1 addition & 1 deletion covalent/executor/utils/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,8 @@ def run_task_group_alt(
task_ids = task_group_metadata["node_ids"]
gid = task_group_metadata["task_group_id"]

os.environ["COVALENT_STAGING_URI_PREFIX"] = f"file://{results_dir}/staging"
os.environ["COVALENT_DISPATCH_ID"] = dispatch_id
os.environ["COVALENT_DISPATCHER_URL"] = server_url

for i, task in enumerate(task_specs):
result_uri, stdout_uri, stderr_uri, qelectron_db_uri = output_uris[i]
Expand Down
6 changes: 4 additions & 2 deletions covalent_dispatcher/_core/data_modules/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,18 @@ def _get_all_assets(dispatch_id: str):
def _pull_assets(manifest: ResultSchema) -> None:
    """Download every asset of a dispatch that has a remote URI set.

    The dispatch's assets are fetched with ``_get_all_assets``; the
    ``"lattice"`` group is processed first, then the ``"nodes"`` group. Each
    asset with a truthy ``remote_uri`` is downloaded from that URI, and the
    number of downloads is logged at debug level.

    Args:
        manifest: Manifest identifying the dispatch via
            ``manifest.metadata.dispatch_id``.
    """
    dispatch_id = manifest.metadata.dispatch_id
    assets = _get_all_assets(dispatch_id)

    download_count = 0
    for group in ("lattice", "nodes"):
        for asset in assets[group]:
            if asset.remote_uri:
                download_count += 1
                asset.download(asset.remote_uri)

    # Report the real number of downloads; the former ``len(futs)`` message
    # was computed from a list that was never filled and always said 0.
    app_log.debug(f"imported {download_count} assets for dispatch {dispatch_id}")


async def import_manifest(
Expand Down
17 changes: 17 additions & 0 deletions tests/covalent_tests/file_transfer/file_transfer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
TransferToRemote,
)
from covalent._file_transfer.strategies.rsync_strategy import Rsync
from covalent._file_transfer.strategies.s3_strategy import S3
from covalent._file_transfer.strategies.shutil_strategy import Shutil


class TestFileTransfer:
Expand Down Expand Up @@ -109,3 +111,18 @@ def test_transfer_to_remote(self):

with pytest.raises(ValueError):
result = TransferToRemote("file:///home/one", "file:///home/one/", strategy=strategy)

def test_auto_transfer_strategy(self):
    """Check the strategy chosen by FileTransfer when none is given explicitly."""
    remote_file = File("s3://bucket/object.pkl")
    local_file = File("file:///tmp/object.pkl")

    # Remote source, local destination
    download = FileTransfer(remote_file, local_file)
    assert type(download.strategy) is S3

    # Local source, remote destination
    upload = FileTransfer(local_file, remote_file)
    assert type(upload.strategy) is S3

    # Both ends local
    local_copy = FileTransfer(local_file, local_file)
    assert type(local_copy.strategy) is Shutil

    # Both ends remote: no strategy can be inferred
    with pytest.raises(AttributeError):
        FileTransfer(remote_file, remote_file)
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class TestShutilStrategy:
MOCK_TO_FILEPATH = "/home/user/data.csv.bak"

def test_cp(self, mocker):
mocker.patch("os.makedirs")
mock_copyfile = mocker.patch("shutil.copyfile")
from_file = File(TestShutilStrategy.MOCK_FROM_FILEPATH)
to_file = File(TestShutilStrategy.MOCK_TO_FILEPATH)
Expand Down
48 changes: 48 additions & 0 deletions tests/covalent_tests/workflow/electron_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Unit tests for electron"""

import json
import tempfile
from unittest.mock import ANY, MagicMock

import flake8
Expand Down Expand Up @@ -142,6 +143,53 @@ def mock_register(manifest, *args, **kwargs):
assert lat.metadata.workflow_executor_data == parent_metadata["workflow_executor_data"]


def test_build_sublattice_graph_staging_uri(mocker):
    """Test that a sublattice graph built with a staging URI prefix stages its assets there."""

    dispatch_id = "test_build_sublattice_graph_staging_uri"

    @ct.electron
    def task(x):
        return x

    @ct.lattice
    def workflow(x):
        return task(x)

    hooks = {
        "deps": {"bash": None, "pip": None},
        "call_before": [],
        "call_after": [],
    }
    parent_metadata = {
        "executor": "parent_executor",
        "executor_data": {},
        "workflow_executor": "my_postprocessor",
        "workflow_executor_data": {},
        "hooks": hooks,
        "triggers": "mock-trigger",
        "qelectron_data_exists": False,
        "results_dir": None,
    }

    with tempfile.TemporaryDirectory() as staging_dir:
        staging_prefix = f"file://{staging_dir}"
        mock_environ = {
            "COVALENT_DISPATCH_ID": dispatch_id,
            "COVALENT_STAGING_URI_PREFIX": staging_prefix,
        }
        mocker.patch("os.environ", mock_environ)
        json_manifest = _build_sublattice_graph(workflow, json.dumps(parent_metadata), 1)

        # Every non-empty asset, at both the result and the lattice level,
        # must have a uri that starts with the staging prefix.
        manifest = ResultSchema.model_validate_json(json_manifest)
        for asset_group in (manifest.assets, manifest.lattice.assets):
            for _, asset in asset_group:
                if asset.size > 0:
                    assert asset.uri.startswith(staging_prefix)


def test_build_sublattice_graph_fallback(mocker):
"""
Test falling back to monolithic sublattice dispatch
Expand Down
Loading