Skip to content

Commit

Permalink
some fixes to make dispatching work
Browse files Browse the repository at this point in the history
  • Loading branch information
kessler-frost committed Oct 12, 2023
1 parent e9b8d39 commit 1933955
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 40 deletions.
1 change: 1 addition & 0 deletions covalent/_shared_files/schemas/electron.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
# electron metadata
"executor",
"executor_data",
"qelectron_data_exists",
}

ELECTRON_ASSET_KEYS = {
Expand Down
3 changes: 3 additions & 0 deletions covalent_dispatcher/_core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def _run_abstract_task(

node_result = datasvc.generate_node_result(
dispatch_id=dispatch_id,
node_name=node_name,
node_id=node_id,
start_time=timestamp,
status=RESULT_STATUS.RUNNING,
Expand Down Expand Up @@ -223,6 +224,7 @@ def qelectron_compatible_wrapper(node_id, dispatch_id, ser_user_fn, *args, **kwa
node_result = datasvc.generate_node_result(
dispatch_id=dispatch_id,
node_id=node_id,
node_name=node_name,
end_time=datetime.now(timezone.utc),
status=status,
output=output,
Expand All @@ -238,6 +240,7 @@ def qelectron_compatible_wrapper(node_id, dispatch_id, ser_user_fn, *args, **kwa
node_result = datasvc.generate_node_result(
dispatch_id=dispatch_id,
node_id=node_id,
node_name=node_name,
end_time=datetime.now(timezone.utc),
status=RESULT_STATUS.FAILED,
error=error_msg,
Expand Down
32 changes: 1 addition & 31 deletions covalent_dispatcher/_dal/db_interfaces/electron_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,6 @@
from covalent._shared_files.schemas.electron import ELECTRON_ASSET_KEYS, ELECTRON_METADATA_KEYS
from covalent._shared_files.util_classes import Status

ATTRIBUTES = {
"name",
"start_time",
"end_time",
"status",
"sub_dispatch_id",
"function",
"function_string",
"output",
"value",
"error",
"stdout",
"stderr",
"metadata",
}

METADATA_KEYS = ELECTRON_METADATA_KEYS
ASSET_KEYS = ELECTRON_ASSET_KEYS

Expand All @@ -51,6 +35,7 @@
"status": "status",
"executor": "executor",
"executor_data": "executor_data",
"qelectron_data_exists": "qelectron_data_exists",
}

_db_meta_record_map = {
Expand All @@ -64,21 +49,6 @@

_meta_record_map.update(_db_meta_record_map)

# Obsoleted by ElectronAsset table
_asset_record_map = {
"function": "function_filename",
"function_string": "function_string_filename",
"output": "results_filename",
"value": "value_filename",
"error": "error_filename",
"stdout": "stdout_filename",
"stderr": "stderr_filename",
"executor_data": "executor_data_filename",
"deps": "deps_filename",
"call_before": "call_before_filename",
"call_after": "call_after_filename",
}


def identity(x):
return x
Expand Down
7 changes: 7 additions & 0 deletions covalent_dispatcher/_dal/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ def _update_node(
error: Exception = None,
stdout: str = None,
stderr: str = None,
qelectron_data_exists: bool = None,
) -> bool:
"""
Update the node result in the transport graph.
Expand Down Expand Up @@ -253,6 +254,7 @@ def _update_node(

if end_time is not None:
self.lattice.transport_graph.set_node_value(node_id, "end_time", end_time, session)

if output is not None:
self.lattice.transport_graph.set_node_value(node_id, "output", output, session)

Expand All @@ -265,6 +267,11 @@ def _update_node(
if stderr is not None:
self.lattice.transport_graph.set_node_value(node_id, "stderr", stderr, session)

if qelectron_data_exists is not None:
self.lattice.transport_graph.set_node_value(
node_id, "qelectron_data_exists", qelectron_data_exists, session
)

# Handle postprocessing node
tg = self.lattice.transport_graph
if name.startswith(postprocess_prefix) and end_time is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""Schema updates for new DAL
Revision ID: 1142d81b29b8
Revises: f64ecaa040d5
Revises: de0a6c0a3e3d
Create Date: 2023-06-18 09:18:31.450740
"""
Expand All @@ -28,7 +28,7 @@
# pragma: allowlist nextline secret
revision = "1142d81b29b8"
# pragma: allowlist nextline secret
down_revision = "f64ecaa040d5"
down_revision = "de0a6c0a3e3d"
branch_labels = None
depends_on = None

Expand Down
1 change: 0 additions & 1 deletion covalent_ui/api/v1/data_layer/electron_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from covalent._shared_files.qelectron_utils import QE_DB_DIRNAME
from covalent.quantum.qserver.database import Database
from covalent_dispatcher._core.execution import _get_task_inputs as get_task_inputs
from covalent_dispatcher._service.app import get_result
from covalent_ui.api.v1.data_layer.lattice_dal import Lattices
from covalent_ui.api.v1.database.schema.electron import Electron
from covalent_ui.api.v1.database.schema.lattices import Lattice
Expand Down
14 changes: 8 additions & 6 deletions covalent_ui/api/v1/routes/end_points/electron_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def get_electron_details(dispatch_id: uuid.UUID, electron_id: int):
)


def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict:
async def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict:
"""Return placeholders for the required inputs for a task execution.
Args:
Expand All @@ -110,7 +110,7 @@ def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict:

abstract_task_input = {"args": [], "kwargs": {}}

in_edges = core_graph.get_incoming_edges(dispatch_id, node_id)
in_edges = await core_graph.get_incoming_edges(dispatch_id, node_id)
for edge in in_edges:
parent = edge["source"]

Expand All @@ -130,7 +130,7 @@ def _get_abstract_task_inputs(dispatch_id: str, node_id: int) -> dict:


# Domain: data
def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str:
async def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str:
"""
Get Electron Inputs
Args:
Expand All @@ -140,7 +140,9 @@ def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str:
Returns the inputs data from Result object
"""

abstract_inputs = _get_abstract_task_inputs(dispatch_id=str(dispatch_id), node_id=electron_id)
abstract_inputs = await _get_abstract_task_inputs(
dispatch_id=str(dispatch_id), node_id=electron_id
)

# Resolve node ids to object strings
input_assets = {"args": [], "kwargs": {}}
Expand All @@ -166,7 +168,7 @@ def get_electron_inputs(dispatch_id: uuid.UUID, electron_id: int) -> str:


@routes.get("/{dispatch_id}/electron/{electron_id}/details/{name}")
def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFileOutput):
async def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFileOutput):
"""
Get Electron details
Args:
Expand Down Expand Up @@ -194,7 +196,7 @@ def get_electron_file(dispatch_id: uuid.UUID, electron_id: int, name: ElectronFi
)
handler = FileHandler(result["storage_path"])
if name == "inputs":
response, python_object = get_electron_inputs(
response, python_object = await get_electron_inputs(
dispatch_id=dispatch_id, electron_id=electron_id
)
return ElectronFileResponse(data=str(response), python_object=str(python_object))
Expand Down

0 comments on commit 1933955

Please sign in to comment.