diff --git a/covalent/_shared_files/schemas/electron.py b/covalent/_shared_files/schemas/electron.py index 5435dccac..3b79793cc 100644 --- a/covalent/_shared_files/schemas/electron.py +++ b/covalent/_shared_files/schemas/electron.py @@ -33,6 +33,7 @@ # electron metadata "executor", "executor_data", + "qelectron_data_exists", } ELECTRON_ASSET_KEYS = { diff --git a/covalent_dispatcher/_core/runner.py b/covalent_dispatcher/_core/runner.py index f3616a2df..c945dfa8d 100644 --- a/covalent_dispatcher/_core/runner.py +++ b/covalent_dispatcher/_core/runner.py @@ -102,6 +102,7 @@ async def _run_abstract_task( node_result = datasvc.generate_node_result( dispatch_id=dispatch_id, + node_name=node_name, node_id=node_id, start_time=timestamp, status=RESULT_STATUS.RUNNING, @@ -223,6 +224,7 @@ def qelectron_compatible_wrapper(node_id, dispatch_id, ser_user_fn, *args, **kwa node_result = datasvc.generate_node_result( dispatch_id=dispatch_id, node_id=node_id, + node_name=node_name, end_time=datetime.now(timezone.utc), status=status, output=output, @@ -238,6 +240,7 @@ def qelectron_compatible_wrapper(node_id, dispatch_id, ser_user_fn, *args, **kwa node_result = datasvc.generate_node_result( dispatch_id=dispatch_id, node_id=node_id, + node_name=node_name, end_time=datetime.now(timezone.utc), status=RESULT_STATUS.FAILED, error=error_msg, diff --git a/covalent_dispatcher/_dal/db_interfaces/electron_utils.py b/covalent_dispatcher/_dal/db_interfaces/electron_utils.py index c3faaef30..eceb626df 100644 --- a/covalent_dispatcher/_dal/db_interfaces/electron_utils.py +++ b/covalent_dispatcher/_dal/db_interfaces/electron_utils.py @@ -21,22 +21,6 @@ from covalent._shared_files.schemas.electron import ELECTRON_ASSET_KEYS, ELECTRON_METADATA_KEYS from covalent._shared_files.util_classes import Status -ATTRIBUTES = { - "name", - "start_time", - "end_time", - "status", - "sub_dispatch_id", - "function", - "function_string", - "output", - "value", - "error", - "stdout", - "stderr", - "metadata", -} - METADATA_KEYS = ELECTRON_METADATA_KEYS ASSET_KEYS = ELECTRON_ASSET_KEYS @@ -51,6 +35,7 @@ "status": "status", "executor": "executor", "executor_data": "executor_data", + "qelectron_data_exists": "qelectron_data_exists", } _db_meta_record_map = { @@ -64,21 +49,6 @@ _meta_record_map.update(_db_meta_record_map) -# Obsoleted by ElectronAsset table -_asset_record_map = { - "function": "function_filename", - "function_string": "function_string_filename", - "output": "results_filename", - "value": "value_filename", - "error": "error_filename", - "stdout": "stdout_filename", - "stderr": "stderr_filename", - "executor_data": "executor_data_filename", - "deps": "deps_filename", - "call_before": "call_before_filename", - "call_after": "call_after_filename", -} - def identity(x): return x diff --git a/covalent_dispatcher/_dal/result.py b/covalent_dispatcher/_dal/result.py index beec0dbf4..c6d743a73 100644 --- a/covalent_dispatcher/_dal/result.py +++ b/covalent_dispatcher/_dal/result.py @@ -205,6 +205,7 @@ def _update_node( error: Exception = None, stdout: str = None, stderr: str = None, + qelectron_data_exists: bool = None, ) -> bool: """ Update the node result in the transport graph. @@ -253,6 +254,7 @@ def _update_node( if end_time is not None: self.lattice.transport_graph.set_node_value(node_id, "end_time", end_time, session) + if output is not None: self.lattice.transport_graph.set_node_value(node_id, "output", output, session) @@ -265,6 +267,11 @@ def _update_node( if stderr is not None: self.lattice.transport_graph.set_node_value(node_id, "stderr", stderr, session) + if qelectron_data_exists is not None: + self.lattice.transport_graph.set_node_value( + node_id, "qelectron_data_exists", qelectron_data_exists, session + ) + # Handle postprocessing node tg = self.lattice.transport_graph if name.startswith(postprocess_prefix) and end_time is not None: diff --git a/covalent_migrations/versions/1142d81b29b8_schema_updates_for_new_dal.py b/covalent_migrations/versions/1142d81b29b8_schema_updates_for_new_dal.py index e4ce9b93b..795711bbb 100644 --- a/covalent_migrations/versions/1142d81b29b8_schema_updates_for_new_dal.py +++ b/covalent_migrations/versions/1142d81b29b8_schema_updates_for_new_dal.py @@ -17,7 +17,7 @@ """Schema updates for new DAL Revision ID: 1142d81b29b8 -Revises: f64ecaa040d5 +Revises: de0a6c0a3e3d Create Date: 2023-06-18 09:18:31.450740 """ @@ -28,7 +28,7 @@ # pragma: allowlist nextline secret revision = "1142d81b29b8" # pragma: allowlist nextline secret -down_revision = "f64ecaa040d5" +down_revision = "de0a6c0a3e3d" branch_labels = None depends_on = None diff --git a/covalent_ui/api/v1/data_layer/electron_dal.py b/covalent_ui/api/v1/data_layer/electron_dal.py index ad356b05e..7b69ecc89 100644 --- a/covalent_ui/api/v1/data_layer/electron_dal.py +++ b/covalent_ui/api/v1/data_layer/electron_dal.py @@ -31,7 +31,6 @@ from covalent._shared_files.qelectron_utils import QE_DB_DIRNAME from covalent.quantum.qserver.database import Database from covalent_dispatcher._core.execution import _get_task_inputs as get_task_inputs -from covalent_dispatcher._service.app import get_result from covalent_ui.api.v1.data_layer.lattice_dal import Lattices from covalent_ui.api.v1.database.schema.electron import Electron from covalent_ui.api.v1.database.schema.lattices import Lattice diff --git a/covalent_ui/api/v1/routes/end_points/electron_routes.py b/covalent_ui/api/v1/routes/end_points/electron_routes.py index 3cbaf2bc3..a88f5482b 100644 --- a/covalent_ui/api/v1/routes/end_points/electron_routes.py +++ b/covalent_ui/api/v1/routes/end_points/electron_routes.py @@ -95,7 +95,7 @@ def get_electron_details(dispatch_id: uuid.UUID, electron_id: int): ) -def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict: +async def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict: """Return placeholders for the required inputs for a task execution. Args: @@ -110,7 +110,7 @@ def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict: abstract_task_input = {"args": [], "kwargs": {}} - in_edges = core_graph.get_incoming_edges(dispatch_id, node_id) + in_edges = await core_graph.get_incoming_edges(dispatch_id, node_id) for edge in in_edges: parent = edge["source"] @@ -130,7 +130,7 @@ def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict: # Domain: data -def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str: +async def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str: """ Get Electron Inputs Args: @@ -140,7 +140,9 @@ def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str: Returns the inputs data from Result object """ - abstract_inputs = _get_abstract_task_inputs(dispatch_id=str(dispatch_id), node_id=electron_id) + abstract_inputs = await _get_abstract_task_inputs( + dispatch_id=str(dispatch_id), node_id=electron_id + ) # Resolve node ids to object strings input_assets = {"args": [], "kwargs": {}} @@ -166,7 +168,7 @@ def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str: @routes.get("/{dispatch_id}/electron/{electron_id}/details/{name}") -def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFileOutput): +async def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFileOutput): """ Get Electron details Args: @@ -194,7 +196,7 @@ def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFi ) handler = FileHandler(result["storage_path"]) if name == "inputs": - response, python_object = get_electron_inputs( + response, python_object = await get_electron_inputs( dispatch_id=dispatch_id, electron_id=electron_id ) return ElectronFileResponse(data=str(response), python_object=str(python_object))