Skip to content

Commit

Permalink
addressed most comments, still some left
Browse files Browse the repository at this point in the history
  • Loading branch information
kessler-frost committed Oct 4, 2023
1 parent 35076e8 commit b1ceb26
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 45 deletions.
11 changes: 11 additions & 0 deletions covalent/_file_transfer/strategies/shutil_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ def __init__(

# return callable to copy files in the local file system
def cp(self, from_file: File, to_file: File = File()) -> None:
"""
Get a callable that copies a file from one location to another locally
Args:
from_file: File to copy from
to_file: File to copy to. Defaults to File().
Returns:
A callable that copies a file from one location to another locally
"""

def callable():
shutil.copyfile(from_file.filepath, to_file.filepath)

Expand Down
8 changes: 5 additions & 3 deletions covalent/_results_manager/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class Result:
"""

NEW_OBJ = RESULT_STATUS.NEW_OBJECT
PENDING_REUSE = RESULT_STATUS.PENDING_REUSE
PENDING_REUSE = (
RESULT_STATUS.PENDING_REUSE
) # Facilitates reuse of previous electrons in the new dispatcher design
COMPLETED = RESULT_STATUS.COMPLETED
POSTPROCESSING = RESULT_STATUS.POSTPROCESSING
PENDING_POSTPROCESSING = RESULT_STATUS.PENDING_POSTPROCESSING
Expand Down Expand Up @@ -106,8 +108,8 @@ def __str__(self):
pattern = re.compile(regex)
m = pattern.match(input_string)
if m:
arg_str_repr = m.group(1).rstrip(",")
kwarg_str_repr = m.group(2)
arg_str_repr = m[1].rstrip(",")
kwarg_str_repr = m[2]
else:
arg_str_repr = str(None)
kwarg_str_repr = str(None)

Check warning on line 115 in covalent/_results_manager/result.py

View check run for this annotation

Codecov / codecov/patch

covalent/_results_manager/result.py#L114-L115

Added lines #L114 - L115 were not covered by tests
Expand Down
86 changes: 79 additions & 7 deletions covalent/_serialize/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

""" Serialization/Deserialization methods for Assets """

import hashlib
import json
Expand All @@ -26,17 +27,41 @@
from .._shared_files.schemas.asset import AssetSchema
from .._workflow.transportable_object import TransportableObject

__all__ = [
"AssetType",
"save_asset",
"load_asset",
]


CHECKSUM_ALGORITHM = "sha"


class AssetType(Enum):
OBJECT = 0
TRANSPORTABLE = 1
"""
Enum for the type of Asset data
"""

OBJECT = 0 # Fallback to cloudpickling
TRANSPORTABLE = 1 # Custom TO serialization
JSONABLE = 2
TEXT = 3
TEXT = 3 # Mainly for stdout, stderr, docstrings, etc.


def serialize_asset(data: Any, data_type: AssetType) -> bytes:
"""
Serialize the asset data
Args:
data: Data to serialize
data_type: Type of the Asset data to serialize
Returns:
Serialized data as bytes
"""

if data_type == AssetType.OBJECT:
return cloudpickle.dumps(data)
elif data_type == AssetType.TRANSPORTABLE:
Expand All @@ -50,6 +75,18 @@ def serialize_asset(data: Any, data_type: AssetType) -> bytes:


def deserialize_asset(data: bytes, data_type: AssetType) -> Any:
"""
Deserialize the asset data
Args:
data: Data to deserialize
data_type: Type of the Asset data to deserialize
Returns:
Deserialized data
"""

if data_type == AssetType.OBJECT:
return cloudpickle.loads(data)
elif data_type == AssetType.TRANSPORTABLE:
Expand All @@ -63,10 +100,35 @@ def deserialize_asset(data: bytes, data_type: AssetType) -> Any:


def _sha1_asset(data: bytes) -> str:
"""
Compute the sha1 checksum of the asset data
Args:
data: Data to compute checksum for
Returns:
sha1 checksum of the data
"""

return hashlib.sha1(data).hexdigest()


def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str) -> AssetSchema:
"""
Save the asset data to the storage path
Args:
data: Data to save
data_type: Type of the Asset data to save
storage_path: Path to save the data to
filename: Name of the file to save the data to
Returns:
AssetSchema object containing metadata about the saved data
"""

scheme = "file"

serialized = serialize_asset(data, data_type)
Expand All @@ -80,16 +142,26 @@ def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str


def load_asset(asset_meta: AssetSchema, data_type: AssetType) -> Any:
"""
Load the asset data from the storage path
Args:
asset_meta: Metadata about the asset to load
data_type: Type of the Asset data to load
Returns:
Asset data
"""

scheme_prefix = "file://"
uri = asset_meta.uri

if not uri:
return None

Check warning on line 161 in covalent/_serialize/common.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/common.py#L161

Added line #L161 was not covered by tests

if uri.startswith(scheme_prefix):
path = uri[len(scheme_prefix) :]
else:
path = uri
path = uri[len(scheme_prefix) :] if uri.startswith(scheme_prefix) else uri

with open(path, "rb") as f:
data = f.read()
return deserialize_asset(data, data_type)
10 changes: 8 additions & 2 deletions covalent/_serialize/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
from .._workflow.transportable_object import TransportableObject
from .common import AssetType, load_asset, save_asset

__all__ = [
"serialize_node",
"deserialize_node",
]


ASSET_TYPES = {
"function": AssetType.TRANSPORTABLE,
"function_string": AssetType.TEXT,
Expand All @@ -52,11 +58,11 @@ def _serialize_node_metadata(node_attrs: dict, node_storage_path: str) -> Electr
# Optional
status = node_attrs.get("status", RESULT_STATUS.NEW_OBJECT)

start_time = node_attrs.get("start_time", None)
start_time = node_attrs.get("start_time")
if start_time:
start_time = start_time.isoformat()

Check warning on line 63 in covalent/_serialize/electron.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/electron.py#L63

Added line #L63 was not covered by tests

end_time = node_attrs.get("end_time", None)
end_time = node_attrs.get("end_time")
if end_time:
end_time = end_time.isoformat()

Check warning on line 67 in covalent/_serialize/electron.py

View check run for this annotation

Codecov / codecov/patch

covalent/_serialize/electron.py#L67

Added line #L67 was not covered by tests

Expand Down
6 changes: 6 additions & 0 deletions covalent/_serialize/lattice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
from .common import AssetType, load_asset, save_asset
from .transport_graph import deserialize_transport_graph, serialize_transport_graph

__all__ = [
"serialize_lattice",
"deserialize_lattice",
]


ASSET_TYPES = {
"workflow_function": AssetType.TRANSPORTABLE,
"workflow_function_string": AssetType.TEXT,
Expand Down
26 changes: 16 additions & 10 deletions covalent/_serialize/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@
from .common import AssetType, load_asset, save_asset
from .lattice import deserialize_lattice, serialize_lattice

__all__ = [
"serialize_result",
"deserialize_result",
"strip_local_uris",
"merge_response_manifest",
"extract_assets",
]


ASSET_TYPES = {
"error": AssetType.TEXT,
"result": AssetType.TRANSPORTABLE,
Expand Down Expand Up @@ -136,6 +145,7 @@ def merge_response_manifest(manifest: ResultSchema, response: ResultSchema) -> R
response: The manifest returned from `/register`.
Returns:
A combined manifest with asset `remote_uri`s populated.
"""

manifest.metadata.dispatch_id = response.metadata.dispatch_id
Expand Down Expand Up @@ -170,32 +180,28 @@ def merge_response_manifest(manifest: ResultSchema, response: ResultSchema) -> R


def extract_assets(manifest: ResultSchema) -> List[AssetSchema]:
"""Extract all of the asset metadata from a manifest dictionary.
"""
Extract all of the asset metadata from a manifest dictionary.
Args:
manifest: A result manifest
Returns:
A list of assets
"""
assets = []
"""

# workflow-level assets
dispatch_assets = manifest.assets
for key, asset in dispatch_assets:
assets.append(asset)

assets = [asset for key, asset in dispatch_assets]
lattice = manifest.lattice
lattice_assets = lattice.assets
for key, asset in lattice_assets:
assets.append(asset)
assets.extend(asset for key, asset in lattice_assets)

# Node assets
tg = lattice.transport_graph
nodes = tg.nodes
for node in nodes:
node_assets = node.assets
for key, asset in node_assets:
assets.append(asset)
assets.extend(asset for key, asset in node_assets)
return assets
Loading

0 comments on commit b1ceb26

Please sign in to comment.