Skip to content

Commit

Permalink
synced with develop
Browse files Browse the repository at this point in the history
  • Loading branch information
kessler-frost committed Oct 16, 2023
2 parents 1008312 + 314e859 commit d8c427e
Show file tree
Hide file tree
Showing 81 changed files with 1,193 additions and 870 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,6 @@ node_modules/
!.yarn/releases
!.yarn/sdks
!.yarn/versions

# Ignore mock database
**/*.sqlite
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- [Significant Changes] Improving memory management part 1/3
- Removed strict version pins on `lmdbm`, `mpire`, `orjson`, and `pennylane`
- Changed license to Apache
- Migrated core server-side code to new data access layer.
Expand Down Expand Up @@ -149,10 +150,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fix for double locking file in configurations.
- Introduced new data access layer
- Introduced Shutil file transfer strategy for local file transfers

### Fixed

- Reduced server memory consumption during workflow processing
### Docs

Expand Down
11 changes: 11 additions & 0 deletions covalent/_file_transfer/strategies/shutil_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ def __init__(

# return callable to copy files in the local file system
def cp(self, from_file: File, to_file: File = File()) -> None:
"""
Get a callable that copies a file from one location to another locally
Args:
from_file: File to copy from
to_file: File to copy to. Defaults to File().
Returns:
A callable that copies a file from one location to another locally
"""

def callable():
shutil.copyfile(from_file.filepath, to_file.filepath)

Expand Down
13 changes: 9 additions & 4 deletions covalent/_results_manager/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class Result:
"""

NEW_OBJ = RESULT_STATUS.NEW_OBJECT
PENDING_REUSE = RESULT_STATUS.PENDING_REUSE
PENDING_REUSE = (
RESULT_STATUS.PENDING_REUSE
) # Facilitates reuse of previous electrons in the new dispatcher design
COMPLETED = RESULT_STATUS.COMPLETED
POSTPROCESSING = RESULT_STATUS.POSTPROCESSING
PENDING_POSTPROCESSING = RESULT_STATUS.PENDING_POSTPROCESSING
Expand Down Expand Up @@ -209,7 +211,10 @@ def result(self) -> Union[int, float, list, dict]:
Final result of current dispatch.
"""

return self._result.get_deserialized() if self._result is not None else None
if self._result is not None:
return self._result.get_deserialized()
else:
return None

@property
def inputs(self) -> dict:
Expand Down Expand Up @@ -438,7 +443,7 @@ def _update_node(
sublattice_result: "Result" = None,
stdout: str = None,
stderr: str = None,
qelectron_data_exists: bool = False,
qelectron_data_exists: bool = None,
) -> None:
"""
Update the node result in the transport graph.
Expand Down Expand Up @@ -497,7 +502,7 @@ def _update_node(
if stderr is not None:
self.lattice.transport_graph.set_node_value(node_id, "stderr", stderr)

if qelectron_data_exists:
if qelectron_data_exists is not None:
self.lattice.transport_graph.set_node_value(
node_id, "qelectron_data_exists", qelectron_data_exists
)
Expand Down
86 changes: 79 additions & 7 deletions covalent/_serialize/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

""" Serialization/Deserialization methods for Assets """

import hashlib
import json
Expand All @@ -26,17 +27,41 @@
from .._shared_files.schemas.asset import AssetSchema
from .._workflow.transportable_object import TransportableObject

__all__ = [
"AssetType",
"save_asset",
"load_asset",
]


CHECKSUM_ALGORITHM = "sha"


class AssetType(Enum):
    """
    Enum for the type of Asset data

    The value selects which (de)serialization strategy is applied
    to the asset bytes.
    """

    OBJECT = 0  # Fallback to cloudpickling
    TRANSPORTABLE = 1  # Custom TO serialization
    JSONABLE = 2
    TEXT = 3  # Mainly for stdout, stderr, docstrings, etc.


def serialize_asset(data: Any, data_type: AssetType) -> bytes:
"""
Serialize the asset data
Args:
data: Data to serialize
data_type: Type of the Asset data to serialize
Returns:
Serialized data as bytes
"""

if data_type == AssetType.OBJECT:
return cloudpickle.dumps(data)
elif data_type == AssetType.TRANSPORTABLE:
Expand All @@ -50,6 +75,18 @@ def serialize_asset(data: Any, data_type: AssetType) -> bytes:


def deserialize_asset(data: bytes, data_type: AssetType) -> Any:
"""
Deserialize the asset data
Args:
data: Data to deserialize
data_type: Type of the Asset data to deserialize
Returns:
Deserialized data
"""

if data_type == AssetType.OBJECT:
return cloudpickle.loads(data)
elif data_type == AssetType.TRANSPORTABLE:
Expand All @@ -63,10 +100,35 @@ def deserialize_asset(data: bytes, data_type: AssetType) -> Any:


def _sha1_asset(data: bytes) -> str:
"""
Compute the sha1 checksum of the asset data
Args:
data: Data to compute checksum for
Returns:
sha1 checksum of the data
"""

return hashlib.sha1(data).hexdigest()


def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str) -> AssetSchema:
"""
Save the asset data to the storage path
Args:
data: Data to save
data_type: Type of the Asset data to save
storage_path: Path to save the data to
filename: Name of the file to save the data to
Returns:
AssetSchema object containing metadata about the saved data
"""

scheme = "file"

serialized = serialize_asset(data, data_type)
Expand All @@ -80,16 +142,26 @@ def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str


def load_asset(asset_meta: AssetSchema, data_type: AssetType) -> Any:
    """
    Load the asset data from the storage path

    Args:
        asset_meta: Metadata about the asset to load
        data_type: Type of the Asset data to load

    Returns:
        Asset data, or None if the asset has no URI
    """

    scheme_prefix = "file://"
    uri = asset_meta.uri

    if not uri:
        return None

    # Strip the URI scheme, if present, to recover the local filesystem path
    path = uri[len(scheme_prefix) :] if uri.startswith(scheme_prefix) else uri

    with open(path, "rb") as f:
        data = f.read()
    return deserialize_asset(data, data_type)
13 changes: 11 additions & 2 deletions covalent/_serialize/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
from .._workflow.transportable_object import TransportableObject
from .common import AssetType, load_asset, save_asset

__all__ = [
"serialize_node",
"deserialize_node",
]


ASSET_TYPES = {
"function": AssetType.TRANSPORTABLE,
"function_string": AssetType.TEXT,
Expand All @@ -48,15 +54,16 @@ def _serialize_node_metadata(node_attrs: dict, node_storage_path: str) -> Electr
name = node_attrs["name"]
executor = node_attrs["metadata"]["executor"]
executor_data = node_attrs["metadata"]["executor_data"]
qelectron_data_exists = node_attrs["metadata"]["qelectron_data_exists"]

# Optional
status = node_attrs.get("status", RESULT_STATUS.NEW_OBJECT)

start_time = node_attrs.get("start_time", None)
start_time = node_attrs.get("start_time")
if start_time:
start_time = start_time.isoformat()

end_time = node_attrs.get("end_time", None)
end_time = node_attrs.get("end_time")
if end_time:
end_time = end_time.isoformat()

Expand All @@ -65,6 +72,7 @@ def _serialize_node_metadata(node_attrs: dict, node_storage_path: str) -> Electr
name=name,
executor=executor,
executor_data=executor_data,
qelectron_data_exists=qelectron_data_exists,
status=str(status),
start_time=start_time,
end_time=end_time,
Expand All @@ -82,6 +90,7 @@ def _deserialize_node_metadata(meta: ElectronMetadata) -> dict:
"metadata": {
"executor": meta.executor,
"executor_data": meta.executor_data,
"qelectron_data_exists": meta.qelectron_data_exists,
},
}

Expand Down
6 changes: 6 additions & 0 deletions covalent/_serialize/lattice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
from .common import AssetType, load_asset, save_asset
from .transport_graph import deserialize_transport_graph, serialize_transport_graph

__all__ = [
"serialize_lattice",
"deserialize_lattice",
]


ASSET_TYPES = {
"workflow_function": AssetType.TRANSPORTABLE,
"workflow_function_string": AssetType.TEXT,
Expand Down
25 changes: 15 additions & 10 deletions covalent/_serialize/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
from .common import AssetType, load_asset, save_asset
from .lattice import deserialize_lattice, serialize_lattice

__all__ = [
"serialize_result",
"deserialize_result",
"strip_local_uris",
"merge_response_manifest",
"extract_assets",
]


ASSET_TYPES = {
"error": AssetType.TEXT,
"result": AssetType.TRANSPORTABLE,
Expand Down Expand Up @@ -170,32 +179,28 @@ def merge_response_manifest(manifest: ResultSchema, response: ResultSchema) -> R


def extract_assets(manifest: ResultSchema) -> List[AssetSchema]:
    """
    Extract all of the asset metadata from a manifest dictionary.

    Args:
        manifest: A result manifest

    Returns:
        A list of assets
    """

    # Workflow-level assets
    dispatch_assets = manifest.assets
    assets = [asset for key, asset in dispatch_assets]

    lattice = manifest.lattice
    lattice_assets = lattice.assets
    assets.extend(asset for key, asset in lattice_assets)

    # Node assets
    tg = lattice.transport_graph
    nodes = tg.nodes
    for node in nodes:
        node_assets = node.assets
        assets.extend(asset for key, asset in node_assets)
    return assets
Loading

0 comments on commit d8c427e

Please sign in to comment.