From 262c075a7e10cb292d7abb912331b014fa27bbfa Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Wed, 6 Dec 2023 14:13:53 -0500 Subject: [PATCH] Reduce the number of assets to upload during dispatch Initialize unset assets to `None` and don't upload them. When downloading assets during `get_result()`, ignore assets with zero size. Also require asset sizes when retrieving asset updates in `Executor.receive()`. --- CHANGELOG.md | 11 +++ covalent/_dispatcher_plugins/local.py | 8 +- covalent/_results_manager/result.py | 4 +- covalent/_results_manager/results_manager.py | 27 ++++--- covalent/_serialize/common.py | 3 + covalent/_serialize/electron.py | 17 ++--- covalent/_serialize/lattice.py | 4 +- covalent/_shared_files/schemas/asset.py | 6 +- covalent/executor/executor_plugins/dask.py | 20 ++++- covalent/executor/executor_plugins/local.py | 17 +---- covalent/executor/utils/wrappers.py | 75 +++++++++++++++---- covalent_dispatcher/_dal/asset.py | 21 ++++-- covalent_dispatcher/_dal/base.py | 13 +++- covalent_dispatcher/_db/upsert.py | 6 +- covalent_dispatcher/_object_store/base.py | 3 + covalent_dispatcher/_object_store/local.py | 10 ++- covalent_dispatcher/_service/assets.py | 5 +- .../_core/runner_ng_test.py | 3 + .../_dal/asset_test.py | 12 +-- .../_dal/importers/result_import_test.py | 4 +- .../_db/upsert_test.py | 2 +- .../_object_store/local_test.py | 4 +- .../dispatcher_plugins/local_test.py | 6 +- tests/functional_tests/local_executor_test.py | 2 +- tests/functional_tests/workflow_stack_test.py | 4 +- 25 files changed, 201 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index afd38c20f..d7ffb8aca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [UNRELEASED] +### Changed + +- SDK no longer uploads empty assets when submitting a dispatch. +- Results Manager avoids downloading assets with size 0. 
+- Local and Dask executor plugins now return accurate sizes of task + artifacts. +- Size (number of bytes) is now a required attribute whenever updating + asset metadata. Although the exact numerical value is not yet + important, whether the size is reported to be zero or positive does + have consequences. + ### Operations - Allow `cloudpickle` >= 3.0.0 diff --git a/covalent/_dispatcher_plugins/local.py b/covalent/_dispatcher_plugins/local.py index f0df0f8a0..8760cec96 100644 --- a/covalent/_dispatcher_plugins/local.py +++ b/covalent/_dispatcher_plugins/local.py @@ -593,15 +593,19 @@ def upload_assets(manifest: ResultSchema): def _upload(assets: List[AssetSchema]): local_scheme_prefix = "file://" total = len(assets) + number_uploaded = 0 for i, asset in enumerate(assets): - if not asset.remote_uri: + if not asset.remote_uri or not asset.uri: app_log.debug(f"Skipping asset {i+1} out of {total}") continue if asset.remote_uri.startswith(local_scheme_prefix): copy_file_locally(asset.uri, asset.remote_uri) + number_uploaded += 1 else: _upload_asset(asset.uri, asset.remote_uri) - app_log.debug(f"uploaded {i+1} out of {total} assets.") + number_uploaded += 1 + app_log.debug(f"Uploaded asset {i+1} out of {total}.") + app_log.debug(f"uploaded {number_uploaded} assets.") def _upload_asset(local_uri, remote_uri): diff --git a/covalent/_results_manager/result.py b/covalent/_results_manager/result.py index c9d42c2fd..a42f514a6 100644 --- a/covalent/_results_manager/result.py +++ b/covalent/_results_manager/result.py @@ -90,11 +90,11 @@ def __init__(self, lattice: Lattice, dispatch_id: str = "") -> None: self._task_failed = False self._task_cancelled = False - self._result = TransportableObject(None) + self._result = None self._num_nodes = -1 - self._error = "" + self._error = None def __str__(self): """String representation of the result object""" diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index 458d746d4..9372a43e0 
100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -248,26 +248,35 @@ def download_asset(remote_uri: str, local_path: str, chunk_size: int = 1024 * 10 def _download_result_asset(manifest: dict, results_dir: str, key: str): remote_uri = manifest["assets"][key]["remote_uri"] - local_path = get_result_asset_path(results_dir, key) - download_asset(remote_uri, local_path) - manifest["assets"][key]["uri"] = f"file://{local_path}" + size = manifest["assets"][key]["size"] + + if size > 0: + local_path = get_result_asset_path(results_dir, key) + download_asset(remote_uri, local_path) + manifest["assets"][key]["uri"] = f"file://{local_path}" def _download_lattice_asset(manifest: dict, results_dir: str, key: str): lattice_assets = manifest["lattice"]["assets"] remote_uri = lattice_assets[key]["remote_uri"] - local_path = get_lattice_asset_path(results_dir, key) - download_asset(remote_uri, local_path) - lattice_assets[key]["uri"] = f"file://{local_path}" + size = lattice_assets[key]["size"] + + if size > 0: + local_path = get_lattice_asset_path(results_dir, key) + download_asset(remote_uri, local_path) + lattice_assets[key]["uri"] = f"file://{local_path}" def _download_node_asset(manifest: dict, results_dir: str, node_id: int, key: str): node = manifest["lattice"]["transport_graph"]["nodes"][node_id] node_assets = node["assets"] remote_uri = node_assets[key]["remote_uri"] - local_path = get_node_asset_path(results_dir, node_id, key) - download_asset(remote_uri, local_path) - node_assets[key]["uri"] = f"file://{local_path}" + size = node_assets[key]["size"] + + if size > 0: + local_path = get_node_asset_path(results_dir, node_id, key) + download_asset(remote_uri, local_path) + node_assets[key]["uri"] = f"file://{local_path}" def _load_result_asset(manifest: dict, key: str): diff --git a/covalent/_serialize/common.py b/covalent/_serialize/common.py index c36f7d685..341e112f0 100644 --- a/covalent/_serialize/common.py 
+++ b/covalent/_serialize/common.py @@ -136,6 +136,9 @@ def save_asset(data: Any, data_type: AssetType, storage_path: str, filename: str scheme = "file" + if data is None: + return AssetSchema(size=0) + serialized = serialize_asset(data, data_type) digest = _sha1_asset(serialized) path = Path(storage_path) / filename diff --git a/covalent/_serialize/electron.py b/covalent/_serialize/electron.py index 3229875ae..03a3b1ae2 100644 --- a/covalent/_serialize/electron.py +++ b/covalent/_serialize/electron.py @@ -26,7 +26,6 @@ ElectronSchema, ) from .._shared_files.util_classes import RESULT_STATUS, Status -from .._workflow.transportable_object import TransportableObject from .common import AssetType, load_asset, save_asset __all__ = [ @@ -105,7 +104,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["function"], ) - function_string = node_attrs.get("function_string", "") + function_string = node_attrs.get("function_string", None) function_string_asset = save_asset( function_string, ASSET_TYPES["function_string"], @@ -113,7 +112,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["function_string"], ) - node_value = node_attrs.get("value", TransportableObject(None)) + node_value = node_attrs.get("value", None) value_asset = save_asset( node_value, ASSET_TYPES["value"], @@ -121,7 +120,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["value"], ) - node_output = node_attrs.get("output", TransportableObject(None)) + node_output = node_attrs.get("output", None) output_asset = save_asset( node_output, ASSET_TYPES["output"], @@ -129,7 +128,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["output"], ) - node_stdout = node_attrs.get("stdout", "") + node_stdout = node_attrs.get("stdout", None) stdout_asset = save_asset( node_stdout, ASSET_TYPES["stdout"], @@ -137,7 
+136,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["stdout"], ) - node_stderr = node_attrs.get("stderr", "") + node_stderr = node_attrs.get("stderr", None) stderr_asset = save_asset( node_stderr, ASSET_TYPES["stderr"], @@ -145,7 +144,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["stderr"], ) - qelectron_db = node_attrs.get("qelectron_db", bytes()) + qelectron_db = node_attrs.get("qelectron_db", None) qelectron_db_asset = save_asset( qelectron_db, ASSET_TYPES["qelectron_db"], @@ -153,7 +152,7 @@ def _serialize_node_assets(node_attrs: dict, node_storage_path: str) -> Electron ASSET_FILENAME_MAP["qelectron_db"], ) - node_error = node_attrs.get("error", "") + node_error = node_attrs.get("error", None) error_asset = save_asset( node_error, ASSET_TYPES["error"], @@ -230,7 +229,7 @@ def _deserialize_node_assets(ea: ElectronAssets) -> dict: def _get_node_custom_assets(node_attrs: dict) -> Dict[str, AssetSchema]: if "custom_asset_keys" in node_attrs["metadata"]: - return {key: AssetSchema() for key in node_attrs["metadata"]["custom_asset_keys"]} + return {key: AssetSchema(size=0) for key in node_attrs["metadata"]["custom_asset_keys"]} def serialize_node(node_id: int, node_attrs: dict, node_storage_path) -> ElectronSchema: diff --git a/covalent/_serialize/lattice.py b/covalent/_serialize/lattice.py index ebe405ac5..b188b8edf 100644 --- a/covalent/_serialize/lattice.py +++ b/covalent/_serialize/lattice.py @@ -102,7 +102,7 @@ def _serialize_lattice_assets(lat, storage_path: str) -> LatticeAssets: ASSET_FILENAME_MAP["workflow_function_string"], ) - docstring = "" if lat.__doc__ is None else lat.__doc__ + docstring = lat.__doc__ docstring_asset = save_asset( docstring, ASSET_TYPES["doc"], @@ -208,7 +208,7 @@ def _deserialize_lattice_assets(assets: LatticeAssets) -> dict: def _get_lattice_custom_assets(lat: Lattice) -> Dict[str, AssetSchema]: if 
"custom_asset_keys" in lat.metadata: - return {key: AssetSchema() for key in lat.metadata["custom_asset_keys"]} + return {key: AssetSchema(size=0) for key in lat.metadata["custom_asset_keys"]} def serialize_lattice(lat, storage_path: str) -> LatticeSchema: diff --git a/covalent/_shared_files/schemas/asset.py b/covalent/_shared_files/schemas/asset.py index 957560cc6..ff93d92bc 100644 --- a/covalent/_shared_files/schemas/asset.py +++ b/covalent/_shared_files/schemas/asset.py @@ -28,11 +28,13 @@ class AssetSchema(BaseModel): remote_uri: Optional[str] = None # Size of the asset in bytes - size: Optional[int] = 0 + size: int class AssetUpdate(BaseModel): remote_uri: Optional[str] = None - size: Optional[int] = None digest_alg: Optional[str] = None digest: Optional[str] = None + + # Size of the asset in bytes + size: int diff --git a/covalent/executor/executor_plugins/dask.py b/covalent/executor/executor_plugins/dask.py index 73b719b90..fa7c8c315 100644 --- a/covalent/executor/executor_plugins/dask.py +++ b/covalent/executor/executor_plugins/dask.py @@ -307,19 +307,27 @@ async def receive(self, task_group_metadata: Dict, data: Any) -> List[TaskUpdate if terminal_status == RESULT_STATUS.CANCELLED: output_uri = "" + output_size = 0 stdout_uri = "" + stdout_size = 0 stderr_uri = "" + stderr_size = 0 qelectron_db_uri = "" + qelectron_db_size = 0 else: result_path = os.path.join(self.cache_dir, f"result-{dispatch_id}:{task_id}.json") with open(result_path, "r") as f: result_summary = json.load(f) node_id = result_summary["node_id"] - output_uri = result_summary["output_uri"] - stdout_uri = result_summary["stdout_uri"] - stderr_uri = result_summary["stderr_uri"] - qelectron_db_uri = result_summary["qelectron_db_uri"] + output_uri = result_summary["output"]["uri"] + output_size = result_summary["output"]["size"] + stdout_uri = result_summary["stdout"]["uri"] + stdout_size = result_summary["stdout"]["size"] + stderr_uri = result_summary["stderr"]["uri"] + stderr_size = 
result_summary["stderr"]["size"] + qelectron_db_uri = result_summary["qelectron_db"]["uri"] + qelectron_db_size = result_summary["qelectron_db"]["size"] exception_raised = result_summary["exception_occurred"] terminal_status = ( @@ -333,15 +341,19 @@ async def receive(self, task_group_metadata: Dict, data: Any) -> List[TaskUpdate "assets": { "output": { "remote_uri": output_uri, + "size": output_size, }, "stdout": { "remote_uri": stdout_uri, + "size": stdout_size, }, "stderr": { "remote_uri": stderr_uri, + "size": stderr_size, }, "qelectron_db": { "remote_uri": qelectron_db_uri, + "size": qelectron_db_size, }, }, } diff --git a/covalent/executor/executor_plugins/local.py b/covalent/executor/executor_plugins/local.py index 5cf2659b9..38047d040 100644 --- a/covalent/executor/executor_plugins/local.py +++ b/covalent/executor/executor_plugins/local.py @@ -216,24 +216,13 @@ def _receive(self, task_group_metadata: Dict, data: Any) -> List[TaskUpdate]: received = ReceiveModel.model_validate(data) terminal_status = Status(received.status.value) + # Don't update any asset metadata since all assets will be + # pushed from the executor task_result = { "dispatch_id": dispatch_id, "node_id": task_id, "status": terminal_status, - "assets": { - "output": { - "remote_uri": "", - }, - "stdout": { - "remote_uri": "", - }, - "stderr": { - "remote_uri": "", - }, - "qelectron_db": { - "remote_uri": "", - }, - }, + "assets": {}, } task_results.append(TaskUpdate(**task_result)) diff --git a/covalent/executor/utils/wrappers.py b/covalent/executor/utils/wrappers.py index 15f406b51..90dec9e64 100644 --- a/covalent/executor/utils/wrappers.py +++ b/covalent/executor/utils/wrappers.py @@ -469,12 +469,30 @@ def run_task_from_uris_alt( resources["inputs"][task_id] = result_uri + output_size = len(ser_output) + qelectron_db_size = len(qelectron_db_bytes) + stdout.flush() + stderr.flush() + stdout_size = os.path.getsize(stdout_uri) + stderr_size = os.path.getsize(stderr_uri) result_summary = { 
"node_id": task_id, - "output_uri": result_uri, - "stdout_uri": stdout_uri, - "stderr_uri": stderr_uri, - "qelectron_db_uri": qelectron_db_uri, + "output": { + "uri": result_uri, + "size": output_size, + }, + "stdout": { + "uri": stdout_uri, + "size": stdout_size, + }, + "stderr": { + "uri": stderr_uri, + "size": stderr_size, + }, + "qelectron_db": { + "uri": qelectron_db_uri, + "size": qelectron_db_size, + }, "exception_occurred": exception_occurred, } @@ -482,13 +500,30 @@ exception_occurred = True tb = "".join(traceback.TracebackException.from_exception(ex).format()) print(tb, file=sys.stderr) - result_uri = None + result_uri = "" + stdout.flush() + stderr.flush() + stdout_size = os.path.getsize(stdout_uri) + stderr_size = os.path.getsize(stderr_uri) + qelectron_db_size = len(qelectron_db_bytes) result_summary = { "node_id": task_id, - "output_uri": result_uri, - "stdout_uri": stdout_uri, - "stderr_uri": stderr_uri, - "qelectron_db_uri": qelectron_db_uri, + "output": { + "uri": result_uri, + "size": 0, + }, + "stdout": { + "uri": stdout_uri, + "size": stdout_size, + }, + "stderr": { + "uri": stderr_uri, + "size": stderr_size, + }, + "qelectron_db": { + "uri": qelectron_db_uri, + "size": qelectron_db_size, + }, "exception_occurred": exception_occurred, } @@ -508,11 +543,23 @@ if n < len(task_ids): for i in range(n, len(task_ids)): result_summary = { - "node_id": task_ids[i], - "output_uri": "", - "stdout_uri": "", - "stderr_uri": "", - "qelectron_db_uri": "", + "node_id": task_ids[i], + "output": { + "uri": "", + "size": 0, + }, + "stdout": { + "uri": "", + "size": 0, + }, + "stderr": { + "uri": "", + "size": 0, + }, + "qelectron_db": { + "uri": "", + "size": 0, + }, "exception_occurred": True, } diff --git a/covalent_dispatcher/_dal/asset.py b/covalent_dispatcher/_dal/asset.py index 8eda3a740..d524dc389 100644 --- a/covalent_dispatcher/_dal/asset.py +++ b/covalent_dispatcher/_dal/asset.py @@ -109,8 +109,16 @@ 
def size(self) -> int: def set_remote(self, session: Session, uri: str): self.update(session, values={"remote_uri": uri}) - def store_data(self, data: Any) -> None: - self.object_store.store_file(self.storage_path, self.object_key, data) + def store_data(self, data: Any, session: Session) -> None: + digest, size = self.object_store.store_file(self.storage_path, self.object_key, data) + self.update( + session, + values={ + "digest_alg": digest.algorithm, + "digest": digest.hexdigest, + "size": size, + }, + ) def load_data(self) -> Any: return self.object_store.load_file(self.storage_path, self.object_key) @@ -146,9 +154,12 @@ def copy_asset(src: Asset, dest: Asset): dest The destination asset """ - scheme = dest.storage_type.value - dest_uri = scheme + "://" + os.path.join(dest.storage_path, dest.object_key) - src.upload(dest_uri) + if src.size > 0: + scheme = dest.storage_type.value + dest_uri = scheme + "://" + os.path.join(dest.storage_path, dest.object_key) + src.upload(dest_uri) + else: + app_log.debug(f"Refusing to copy zero-sized asset {src.internal_uri}") def copy_asset_meta(session: Session, src: Asset, dest: Asset): diff --git a/covalent_dispatcher/_dal/base.py b/covalent_dispatcher/_dal/base.py index 1ecb84fcd..ad0b04d78 100644 --- a/covalent_dispatcher/_dal/base.py +++ b/covalent_dispatcher/_dal/base.py @@ -126,8 +126,15 @@ def incr_metadata(self, key: str, delta: int, session: Session): attr = type(self).meta_record_map(key) self.metadata.incr(session, increments={attr: delta}) - def get_asset(self, key: str, session: Session) -> Asset: - if key not in self.assets: + def get_asset(self, key: str, session: Session, refresh: bool = True) -> Asset: + if key in self.assets: + if refresh: + if session: + self.assets[key].refresh(session, fields=FIELDS) + else: + with self.session() as session: + self.assets[key].refresh(session, fields=FIELDS) + else: if session: asset_id = self.get_asset_ids(session, [key])[key] self.assets[key] = Asset.from_id(asset_id, 
session) @@ -176,7 +183,7 @@ def _set_value(self, key: str, val: Any, session: Session) -> None: if key in type(self).metadata_keys: self.set_metadata(key, val, session) else: - self.get_asset(key, session).store_data(val) + self.get_asset(key, session).store_data(val, session) def set_value(self, key: str, val: Any, session: Session = None) -> None: if session is not None: diff --git a/covalent_dispatcher/_db/upsert.py b/covalent_dispatcher/_db/upsert.py index 64bcf2375..cc2e76309 100644 --- a/covalent_dispatcher/_db/upsert.py +++ b/covalent_dispatcher/_db/upsert.py @@ -121,13 +121,14 @@ def _lattice_data(session: Session, result: Result, electron_id: int = None) -> ("cova_imports", LATTICE_COVA_IMPORTS_FILENAME, result.lattice.cova_imports), ("lattice_imports", LATTICE_LATTICE_IMPORTS_FILENAME, result.lattice.lattice_imports), ]: - digest = local_store.store_file(data_storage_path, filename, data) + digest, size = local_store.store_file(data_storage_path, filename, data) asset_record_kwargs = { "storage_type": LATTICE_STORAGE_TYPE, "storage_path": str(data_storage_path), "object_key": filename, "digest_alg": digest.algorithm, "digest": digest.hexdigest, + "size": size, } assets[key] = Asset.create(session, insert_kwargs=asset_record_kwargs, flush=True) @@ -310,13 +311,14 @@ def _electron_data( ("error", ELECTRON_ERROR_FILENAME, node_error), ("output", ELECTRON_RESULTS_FILENAME, node_output), ]: - digest = local_store.store_file(node_path, filename, data) + digest, size = local_store.store_file(node_path, filename, data) asset_record_kwargs = { "storage_type": ELECTRON_STORAGE_TYPE, "storage_path": str(node_path), "object_key": filename, "digest_alg": digest.algorithm, "digest": digest.hexdigest, + "size": size, } assets[key] = Asset.create(session, insert_kwargs=asset_record_kwargs, flush=True) diff --git a/covalent_dispatcher/_object_store/base.py b/covalent_dispatcher/_object_store/base.py index 09bfad137..7329c6671 100644 --- 
a/covalent_dispatcher/_object_store/base.py +++ b/covalent_dispatcher/_object_store/base.py @@ -35,6 +35,9 @@ def scheme(cls) -> str: def digest(self, bucket_name: str, object_key: str) -> Digest: raise NotImplementedError + def size(self, bucket_name: str, object_key: str) -> int: + raise NotImplementedError + def get_uri_components( self, dispatch_id: str, node_id: Optional[int], asset_key: str ) -> Tuple[str, str]: diff --git a/covalent_dispatcher/_object_store/local.py b/covalent_dispatcher/_object_store/local.py index 5e8d08616..6bd4e7ec9 100644 --- a/covalent_dispatcher/_object_store/local.py +++ b/covalent_dispatcher/_object_store/local.py @@ -69,7 +69,7 @@ def size(self, bucket_name: str, object_key: str) -> int: path = os.path.join(bucket_name, object_key) try: - return os.path.size(path) + return os.path.getsize(path) except OSError: return 0 @@ -104,9 +104,12 @@ def get_uri_components( return storage_path, object_key - def store_file(self, storage_path: str, filename: str, data: Any = None) -> Digest: + def store_file(self, storage_path: str, filename: str, data: Any = None) -> Tuple[Digest, int]: """This function writes data corresponding to the filepaths in the DB.""" + if data is None: + return Digest(algorithm="sha1", hexdigest=""), 0 + if filename.endswith(".pkl"): with open(Path(storage_path) / filename, "wb") as f: cloudpickle.dump(data, f) @@ -136,7 +139,8 @@ def store_file(self, storage_path: str, filename: str, data: Any = None) -> Dige raise InvalidFileExtension("The file extension is not supported.") digest = self.digest(bucket_name=storage_path, object_key=filename) - return digest + size = self.size(bucket_name=storage_path, object_key=filename) + return digest, size def load_file(self, storage_path: str, filename: str) -> Any: """This function loads data for the filenames in the DB.""" diff --git a/covalent_dispatcher/_service/assets.py b/covalent_dispatcher/_service/assets.py index 83caffd07..0664e5058 100644 --- 
a/covalent_dispatcher/_service/assets.py +++ b/covalent_dispatcher/_service/assets.py @@ -256,7 +256,7 @@ async def upload_node_asset( content_length: (header) digest: (header) """ - app_log.debug(f"Requested asset {key} for node {dispatch_id}:{node_id}") + app_log.debug(f"Uploading node asset {dispatch_id}:{node_id}:{key} ({content_length} bytes) ") try: metadata = {"size": content_length, "digest_alg": digest_alg, "digest": digest} @@ -294,6 +294,7 @@ async def upload_dispatch_asset( content_length: (header) digest: (header) """ + app_log.debug(f"Uploading dispatch asset {dispatch_id}:{key} ({content_length} bytes) ") try: metadata = {"size": content_length, "digest_alg": digest_alg, "digest": digest} internal_uri = await _run_in_executor( @@ -329,6 +330,7 @@ async def upload_lattice_asset( digest: (header) """ try: + app_log.debug(f"Uploading lattice asset {dispatch_id}:{key} ({content_length} bytes) ") metadata = {"size": content_length, "digest_alg": digest_alg, "digest": digest} internal_uri = await _run_in_executor( _update_lattice_asset_metadata, @@ -513,6 +515,7 @@ async def _transfer_data(req: Request, destination_url: str): # Stream data to a temporary file, then replace the destination # file atomically tmp_path = f"{dest_path}.tmp" + app_log.debug(f"Streaming file upload to {tmp_path}") async with aiofiles.open(tmp_path, "wb") as f: async for chunk in req.stream(): diff --git a/tests/covalent_dispatcher_tests/_core/runner_ng_test.py b/tests/covalent_dispatcher_tests/_core/runner_ng_test.py index fd88b0839..c1d095cdb 100644 --- a/tests/covalent_dispatcher_tests/_core/runner_ng_test.py +++ b/tests/covalent_dispatcher_tests/_core/runner_ng_test.py @@ -318,12 +318,15 @@ async def test_get_task_result(mocker): "assets": { "output": { "remote_uri": asset_uri, + "size": 0, }, "stdout": { "remote_uri": asset_uri, + "size": 0, }, "stderr": { "remote_uri": asset_uri, + "size": 0, }, }, "status": RESULT_STATUS.COMPLETED, diff --git 
a/tests/covalent_dispatcher_tests/_dal/asset_test.py b/tests/covalent_dispatcher_tests/_dal/asset_test.py index 2f3975525..4a9f6fe78 100644 --- a/tests/covalent_dispatcher_tests/_dal/asset_test.py +++ b/tests/covalent_dispatcher_tests/_dal/asset_test.py @@ -36,7 +36,7 @@ def test_db(): ) -def get_asset_record(storage_path, object_key, digest_alg="", digest="", size=0): +def get_asset_record(storage_path, object_key, digest_alg="", digest="", size=1024): return models.Asset( storage_type=StorageType.LOCAL.value, storage_path=storage_path, @@ -61,7 +61,7 @@ def test_asset_load_data(): os.unlink(temppath) -def test_asset_store_data(): +def test_asset_store_data(test_db): with tempfile.NamedTemporaryFile("w", delete=True, suffix=".txt") as temp: temppath = temp.name key = os.path.basename(temppath) @@ -70,7 +70,8 @@ def test_asset_store_data(): rec = get_asset_record(storage_path, key) a = Asset(None, rec) - a.store_data("Hello\n") + with test_db.session() as session: + a.store_data("Hello\n", session) with open(temppath, "r") as f: assert f.read() == "Hello\n" @@ -78,7 +79,7 @@ def test_asset_store_data(): os.unlink(temppath) -def test_upload_asset(): +def test_upload_asset(test_db): with tempfile.NamedTemporaryFile("w", delete=True, suffix=".txt") as temp: src_path = temp.name src_key = os.path.basename(src_path) @@ -87,7 +88,8 @@ def test_upload_asset(): rec = get_asset_record(storage_path, src_key) a = Asset(None, rec) - a.store_data("Hello\n") + with test_db.session() as session: + a.store_data("Hello\n", session) with tempfile.NamedTemporaryFile("w", delete=True, suffix=".txt") as temp: dest_path = temp.name diff --git a/tests/covalent_dispatcher_tests/_dal/importers/result_import_test.py b/tests/covalent_dispatcher_tests/_dal/importers/result_import_test.py index b612e8ac8..440742cba 100644 --- a/tests/covalent_dispatcher_tests/_dal/importers/result_import_test.py +++ b/tests/covalent_dispatcher_tests/_dal/importers/result_import_test.py @@ -230,9 +230,9 @@ def 
test_import_result_with_custom_assets(mocker, test_db): prefix="covalent-" ) as srv_dir: manifest = get_mock_result(dispatch_id, sdk_dir) - manifest.lattice.custom_assets = {"custom_lattice_asset": AssetSchema()} + manifest.lattice.custom_assets = {"custom_lattice_asset": AssetSchema(size=0)} manifest.lattice.transport_graph.nodes[0].custom_assets = { - "custom_electron_asset": AssetSchema() + "custom_electron_asset": AssetSchema(size=0) } filtered_res = import_result(manifest, srv_dir, None) diff --git a/tests/covalent_dispatcher_tests/_db/upsert_test.py b/tests/covalent_dispatcher_tests/_db/upsert_test.py index 2618870bc..63418c66d 100644 --- a/tests/covalent_dispatcher_tests/_db/upsert_test.py +++ b/tests/covalent_dispatcher_tests/_db/upsert_test.py @@ -83,7 +83,7 @@ def test_upsert_electron_data_handles_missing_keys(test_db, result_1, mocker): mocker.patch("covalent_dispatcher._db.write_result_to_db.workflow_db", test_db) mocker.patch("covalent_dispatcher._db.upsert.workflow_db", test_db) mock_store_file = mocker.patch( - "covalent_dispatcher._db.upsert.local_store.store_file", return_value=mock_digest + "covalent_dispatcher._db.upsert.local_store.store_file", return_value=(mock_digest, 2) ) mocker.patch( "covalent_dispatcher._db.upsert.Electron.meta_type.create", diff --git a/tests/covalent_dispatcher_tests/_object_store/local_test.py b/tests/covalent_dispatcher_tests/_object_store/local_test.py index 0823d3600..1b2f0ba7b 100644 --- a/tests/covalent_dispatcher_tests/_object_store/local_test.py +++ b/tests/covalent_dispatcher_tests/_object_store/local_test.py @@ -59,9 +59,11 @@ def test_store_and_load_file(): local_store.store_file(storage_path=temp_dir, filename="pickle.pkl", data=data) assert local_store.load_file(storage_path=temp_dir, filename="pickle.pkl") == data + # No file should be created in this case data = None local_store.store_file(storage_path=temp_dir, filename="pickle.txt", data=data) - assert local_store.load_file(storage_path=temp_dir, 
filename="pickle.txt") == "" + with pytest.raises(FileNotFoundError): + assert local_store.load_file(storage_path=temp_dir, filename="pickle.txt") data = b"test" local_store.store_file(storage_path=temp_dir, filename="pickle.mdb", data=data) diff --git a/tests/covalent_tests/dispatcher_plugins/local_test.py b/tests/covalent_tests/dispatcher_plugins/local_test.py index d34592ed7..d3c09c316 100644 --- a/tests/covalent_tests/dispatcher_plugins/local_test.py +++ b/tests/covalent_tests/dispatcher_plugins/local_test.py @@ -412,6 +412,8 @@ def workflow(a, b): num_assets = 0 # Populate the lattice asset schemas with dummy URLs for key, asset in manifest.lattice.assets: + if asset.size == 0: + continue num_assets += 1 asset.remote_uri = ( f"http://localhost:48008/api/v2/dispatches/{dispatch_id}/lattice/assets/dummy" @@ -420,11 +422,11 @@ def workflow(a, b): endpoint = f"/api/v2/dispatches/{dispatch_id}/lattice/assets/dummy" r = Response() r.status_code = 200 - mock_post = mocker.patch("covalent._api.apiclient.requests.Session.put", return_value=r) + mock_put = mocker.patch("covalent._api.apiclient.requests.Session.put", return_value=r) LocalDispatcher.upload_assets(manifest) - assert mock_post.call_count == num_assets + assert mock_put.call_count == num_assets def test_get_redispatch_request_body_norebuild(mocker): diff --git a/tests/functional_tests/local_executor_test.py b/tests/functional_tests/local_executor_test.py index 27db56c11..783c6d0ff 100644 --- a/tests/functional_tests/local_executor_test.py +++ b/tests/functional_tests/local_executor_test.py @@ -69,7 +69,7 @@ def workflow(a, b): dispatch_id = ct.dispatch(workflow)(a=1, b=2) workflow_result = rm.get_result(dispatch_id, wait=True) - assert workflow_result.error == "" + assert workflow_result.error is None assert workflow_result.status == Result.COMPLETED assert workflow_result.result == 3 assert workflow_result.get_node_result(node_id=0)["sublattice_result"].result == 3 diff --git 
a/tests/functional_tests/workflow_stack_test.py b/tests/functional_tests/workflow_stack_test.py index 7bab310bd..5d76993b3 100644 --- a/tests/functional_tests/workflow_stack_test.py +++ b/tests/functional_tests/workflow_stack_test.py @@ -118,7 +118,7 @@ def workflow(a, b): dispatch_id = ct.dispatch(workflow)(a=1, b=2) workflow_result = rm.get_result(dispatch_id, wait=True) - assert workflow_result.error == "" + assert workflow_result.error is None assert workflow_result.status == Result.COMPLETED assert workflow_result.result == 3 assert workflow_result.get_node_result(node_id=0)["sublattice_result"].result == 3 @@ -261,7 +261,7 @@ def workflow(file_path): dispatch_id = ct.dispatch(workflow)(file_path=tmp_path) res = ct.get_result(dispatch_id, wait=True) - assert res.error == "" + assert res.error is None assert res.result == (True, "Hello")