Skip to content

Commit

Permalink
Merge branch 'develop' into 1539-inevitable-recursion-error-from-insi…
Browse files Browse the repository at this point in the history
…de-get_result
  • Loading branch information
ArunPsiog committed Dec 18, 2023
2 parents d9390df + 6e331d4 commit e7375e2
Show file tree
Hide file tree
Showing 72 changed files with 1,326 additions and 1,764 deletions.
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,34 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED]

### Added

- Added feature to use custom python files as modules to be used in the electron function

### Changed

- SDK no longer uploads empty assets when submitting a dispatch.
- Results Manager avoids downloading assets with size 0.
- Local and Dask executor plugins now return accurate sizes of task
artifacts.
- Size (number of bytes) is now a required attribute whenever updating
asset metadata. Although the exact numerical value is not yet
important, whether the size is reported to be zero or positive does
have consequences.
- Pack deps, call_before, and call_after assets into one file.
- Changed handling of tuples and sets when building the transport graph - they will be converted to electron lists as well for now
- `qelectron_db`, `qelectron_data_exists`, `python_version`, and `covalent_version`
are now optional in the pydantic model definitions.

### Fixed

- Reduced number of assets to upload when submitting a dispatch.

### Operations

- Allow `cloudpickle` >= 3.0.0
- Remove `boto3` dependency from `tests/requirements.txt`

## [0.232.0-rc.0] - 2023-12-01

### Authors
Expand Down
1 change: 1 addition & 0 deletions covalent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ._workflow import ( # nopycln: import
DepsBash,
DepsCall,
DepsModule,
DepsPip,
Lepton,
TransportableObject,
Expand Down
8 changes: 6 additions & 2 deletions covalent/_dispatcher_plugins/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,15 +593,19 @@ def upload_assets(manifest: ResultSchema):
def _upload(assets: List[AssetSchema]):
local_scheme_prefix = "file://"
total = len(assets)
number_uploaded = 0
for i, asset in enumerate(assets):
if not asset.remote_uri:
if not asset.remote_uri or not asset.uri:
app_log.debug(f"Skipping asset {i+1} out of {total}")
continue
if asset.remote_uri.startswith(local_scheme_prefix):
copy_file_locally(asset.uri, asset.remote_uri)
number_uploaded += 1
else:
_upload_asset(asset.uri, asset.remote_uri)
app_log.debug(f"uploaded {i+1} out of {total} assets.")
number_uploaded += 1
app_log.debug(f"Uploaded asset {i+1} out of {total}.")
app_log.debug(f"uploaded {number_uploaded} assets.")


def _upload_asset(local_uri, remote_uri):
Expand Down
4 changes: 2 additions & 2 deletions covalent/_results_manager/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ def __init__(self, lattice: Lattice, dispatch_id: str = "") -> None:
self._task_failed = False
self._task_cancelled = False

self._result = TransportableObject(None)
self._result = None

self._num_nodes = -1

self._error = ""
self._error = None

def __str__(self):
"""String representation of the result object"""
Expand Down
35 changes: 20 additions & 15 deletions covalent/_results_manager/results_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,15 @@
SDK_NODE_META_KEYS = {
"executor",
"executor_data",
"deps",
"call_before",
"call_after",
"hooks",
}

SDK_LAT_META_KEYS = {
"executor",
"executor_data",
"workflow_executor",
"workflow_executor_data",
"deps",
"call_before",
"call_after",
"hooks",
}

DEFERRED_KEYS = {
Expand Down Expand Up @@ -248,26 +244,35 @@ def download_asset(remote_uri: str, local_path: str, chunk_size: int = 1024 * 10

def _download_result_asset(manifest: dict, results_dir: str, key: str):
remote_uri = manifest["assets"][key]["remote_uri"]
local_path = get_result_asset_path(results_dir, key)
download_asset(remote_uri, local_path)
manifest["assets"][key]["uri"] = f"file://{local_path}"
size = manifest["assets"][key]["size"]

if size > 0:
local_path = get_result_asset_path(results_dir, key)
download_asset(remote_uri, local_path)
manifest["assets"][key]["uri"] = f"file://{local_path}"


def _download_lattice_asset(manifest: dict, results_dir: str, key: str):
lattice_assets = manifest["lattice"]["assets"]
remote_uri = lattice_assets[key]["remote_uri"]
local_path = get_lattice_asset_path(results_dir, key)
download_asset(remote_uri, local_path)
lattice_assets[key]["uri"] = f"file://{local_path}"
size = lattice_assets[key]["size"]

if size > 0:
local_path = get_lattice_asset_path(results_dir, key)
download_asset(remote_uri, local_path)
lattice_assets[key]["uri"] = f"file://{local_path}"


def _download_node_asset(manifest: dict, results_dir: str, node_id: int, key: str):
node = manifest["lattice"]["transport_graph"]["nodes"][node_id]
node_assets = node["assets"]
remote_uri = node_assets[key]["remote_uri"]
local_path = get_node_asset_path(results_dir, node_id, key)
download_asset(remote_uri, local_path)
node_assets[key]["uri"] = f"file://{local_path}"
size = node_assets[key]["size"]

if size > 0:
local_path = get_node_asset_path(results_dir, node_id, key)
download_asset(remote_uri, local_path)
node_assets[key]["uri"] = f"file://{local_path}"


def _load_result_asset(manifest: dict, key: str):
Expand Down
3 changes: 3 additions & 0 deletions covalent/_serialize/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str

scheme = "file"

if data is None:
return AssetSchema(size=0)

serialized = serialize_asset(data, data_type)
digest = _sha1_asset(serialized)
path = Path(storage_path) / filename
Expand Down
56 changes: 15 additions & 41 deletions covalent/_serialize/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
ElectronSchema,
)
from .._shared_files.util_classes import RESULT_STATUS, Status
from .._workflow.transportable_object import TransportableObject
from .common import AssetType, load_asset, save_asset

__all__ = [
Expand All @@ -40,9 +39,7 @@
"function_string": AssetType.TEXT,
"value": AssetType.TRANSPORTABLE,
"output": AssetType.TRANSPORTABLE,
"deps": AssetType.JSONABLE,
"call_before": AssetType.JSONABLE,
"call_after": AssetType.JSONABLE,
"hooks": AssetType.JSONABLE,
"qelectron_db": AssetType.BYTES,
"stdout": AssetType.TEXT,
"stderr": AssetType.TEXT,
Expand Down Expand Up @@ -105,83 +102,66 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron
ASSET_FILENAME_MAP["function"],
)

function_string = node_attrs.get("function_string", "")
function_string = node_attrs.get("function_string", None)
function_string_asset = save_asset(
function_string,
ASSET_TYPES["function_string"],
node_storage_path,
ASSET_FILENAME_MAP["function_string"],
)

node_value = node_attrs.get("value", TransportableObject(None))
node_value = node_attrs.get("value", None)
value_asset = save_asset(
node_value,
ASSET_TYPES["value"],
node_storage_path,
ASSET_FILENAME_MAP["value"],
)

node_output = node_attrs.get("output", TransportableObject(None))
node_output = node_attrs.get("output", None)
output_asset = save_asset(
node_output,
ASSET_TYPES["output"],
node_storage_path,
ASSET_FILENAME_MAP["output"],
)

node_stdout = node_attrs.get("stdout", "")
node_stdout = node_attrs.get("stdout", None)
stdout_asset = save_asset(
node_stdout,
ASSET_TYPES["stdout"],
node_storage_path,
ASSET_FILENAME_MAP["stdout"],
)

node_stderr = node_attrs.get("stderr", "")
node_stderr = node_attrs.get("stderr", None)
stderr_asset = save_asset(
node_stderr,
ASSET_TYPES["stderr"],
node_storage_path,
ASSET_FILENAME_MAP["stderr"],
)

qelectron_db = node_attrs.get("qelectron_db", bytes())
qelectron_db = node_attrs.get("qelectron_db", None)
qelectron_db_asset = save_asset(
qelectron_db,
ASSET_TYPES["qelectron_db"],
node_storage_path,
ASSET_FILENAME_MAP["qelectron_db"],
)

node_error = node_attrs.get("error", "")
node_error = node_attrs.get("error", None)
error_asset = save_asset(
node_error,
ASSET_TYPES["error"],
node_storage_path,
ASSET_FILENAME_MAP["error"],
)

deps = node_attrs["metadata"]["deps"]
deps_asset = save_asset(
deps, ASSET_TYPES["deps"], node_storage_path, ASSET_FILENAME_MAP["deps"]
hooks = node_attrs["metadata"]["hooks"]
hooks_asset = save_asset(
hooks, ASSET_TYPES["hooks"], node_storage_path, ASSET_FILENAME_MAP["hooks"]
)

call_before = node_attrs["metadata"]["call_before"]
call_before_asset = save_asset(
call_before,
ASSET_TYPES["call_before"],
node_storage_path,
ASSET_FILENAME_MAP["call_before"],
)

call_after = node_attrs["metadata"]["call_after"]
call_after_asset = save_asset(
call_after,
ASSET_TYPES["call_after"],
node_storage_path,
ASSET_FILENAME_MAP["call_after"],
)

return ElectronAssets(
function=function_asset,
function_string=function_string_asset,
Expand All @@ -191,9 +171,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron
stderr=stderr_asset,
qelectron_db=qelectron_db_asset,
error=error_asset,
deps=deps_asset,
call_before=call_before_asset,
call_after=call_after_asset,
hooks=hooks_asset,
)


Expand All @@ -207,9 +185,7 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict:
qelectron_db = load_asset(ea.qelectron_db, ASSET_TYPES["qelectron_db"])
error = load_asset(ea.error, ASSET_TYPES["error"])

deps = load_asset(ea.deps, ASSET_TYPES["deps"])
call_before = load_asset(ea.call_before, ASSET_TYPES["call_before"])
call_after = load_asset(ea.call_after, ASSET_TYPES["call_after"])
hooks = load_asset(ea.hooks, ASSET_TYPES["hooks"])

return {
"function": function,
Expand All @@ -221,16 +197,14 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict:
"qelectron_db": qelectron_db,
"error": error,
"metadata": {
"deps": deps,
"call_before": call_before,
"call_after": call_after,
"hooks": hooks,
},
}


def _get_node_custom_assets(node_attrs: dict) -> Dict[str, AssetSchema]:
if "custom_asset_keys" in node_attrs["metadata"]:
return {key: AssetSchema() for key in node_attrs["metadata"]["custom_asset_keys"]}
return {key: AssetSchema(size=0) for key in node_attrs["metadata"]["custom_asset_keys"]}


def serialize_node(node_id: int, node_attrs: dict, node_storage_path) -> ElectronSchema:
Expand Down
40 changes: 10 additions & 30 deletions covalent/_serialize/lattice.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
"named_kwargs": AssetType.TRANSPORTABLE,
"cova_imports": AssetType.JSONABLE,
"lattice_imports": AssetType.TEXT,
"deps": AssetType.JSONABLE,
"call_before": AssetType.JSONABLE,
"call_after": AssetType.JSONABLE,
"hooks": AssetType.JSONABLE,
}


Expand Down Expand Up @@ -102,7 +100,7 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets:
ASSET_FILENAME_MAP["workflow_function_string"],
)

docstring = "" if lat.__doc__ is None else lat.__doc__
docstring = lat.__doc__
docstring_asset = save_asset(
docstring,
ASSET_TYPES["doc"],
Expand Down Expand Up @@ -141,23 +139,11 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets:
)

# NOTE: these are actually JSONable
deps_asset = save_asset(
lat.metadata["deps"],
ASSET_TYPES["deps"],
hooks_asset = save_asset(
lat.metadata["hooks"],
ASSET_TYPES["hooks"],
storage_path,
ASSET_FILENAME_MAP["deps"],
)
call_before_asset = save_asset(
lat.metadata["call_before"],
ASSET_TYPES["call_before"],
storage_path,
ASSET_FILENAME_MAP["call_before"],
)
call_after_asset = save_asset(
lat.metadata["call_after"],
ASSET_TYPES["call_after"],
storage_path,
ASSET_FILENAME_MAP["call_after"],
ASSET_FILENAME_MAP["hooks"],
)

return LatticeAssets(
Expand All @@ -169,9 +155,7 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets:
named_kwargs=named_kwargs_asset,
cova_imports=cova_imports_asset,
lattice_imports=lattice_imports_asset,
deps=deps_asset,
call_before=call_before_asset,
call_after=call_after_asset,
hooks=hooks_asset,
)


Expand All @@ -186,9 +170,7 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict:
named_kwargs = load_asset(assets.named_kwargs, ASSET_TYPES["named_kwargs"])
cova_imports = load_asset(assets.cova_imports, ASSET_TYPES["cova_imports"])
lattice_imports = load_asset(assets.lattice_imports, ASSET_TYPES["lattice_imports"])
deps = load_asset(assets.deps, ASSET_TYPES["deps"])
call_before = load_asset(assets.call_before, ASSET_TYPES["call_before"])
call_after = load_asset(assets.call_after, ASSET_TYPES["call_after"])
hooks = load_asset(assets.hooks, ASSET_TYPES["hooks"])
return {
"workflow_function": workflow_function,
"workflow_function_string": workflow_function_string,
Expand All @@ -199,16 +181,14 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict:
"cova_imports": cova_imports,
"lattice_imports": lattice_imports,
"metadata": {
"deps": deps,
"call_before": call_before,
"call_after": call_after,
"hooks": hooks,
},
}


def _get_lattice_custom_assets(lat: Lattice) -> Dict[str, AssetSchema]:
if "custom_asset_keys" in lat.metadata:
return {key: AssetSchema() for key in lat.metadata["custom_asset_keys"]}
return {key: AssetSchema(size=0) for key in lat.metadata["custom_asset_keys"]}


def serialize_lattice(lat, storage_path: str) -> LatticeSchema:
Expand Down
Loading

0 comments on commit e7375e2

Please sign in to comment.