From a1727c443129d265d81c72b63390ea1208123429 Mon Sep 17 00:00:00 2001
From: Sankalp Sanand
Date: Wed, 27 Sep 2023 09:23:52 -0400
Subject: [PATCH] more ui stuff, fixed requirements

---
 VERSION                                       |   2 +-
 covalent/__init__.py                          |   3 +
 covalent_ui/api/main.py                       |   3 +-
 covalent_ui/api/v1/data_layer/electron_dal.py | 274 +++++++++++++++++-
 covalent_ui/api/v1/data_layer/graph_dal.py    |  29 +-
 covalent_ui/api/v1/data_layer/lattice_dal.py  |  45 +--
 covalent_ui/api/v1/data_layer/summary_dal.py  |  49 ++--
 covalent_ui/webapp/package.json               |   1 +
 requirements.txt                              |   2 +-
 9 files changed, 325 insertions(+), 83 deletions(-)

diff --git a/VERSION b/VERSION
index 6976b3c3b..0d0147166 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.220.0-1
+0.220.0-2
diff --git a/covalent/__init__.py b/covalent/__init__.py
index 6feff347c..66c4736ed 100644
--- a/covalent/__init__.py
+++ b/covalent/__init__.py
@@ -38,6 +38,9 @@
     lattice,
 )
 from ._workflow.electron import wait  # nopycln: import
+from ._workflow.qelectron import qelectron  # nopycln: import
+from .executor.utils import get_context  # nopycln: import
+from .quantum import QCluster  # nopycln: import
 
 __all__ = [s for s in dir() if not s.startswith("_")]
 
diff --git a/covalent_ui/api/main.py b/covalent_ui/api/main.py
index e01ad864b..c9e5f6ea1 100644
--- a/covalent_ui/api/main.py
+++ b/covalent_ui/api/main.py
@@ -35,7 +35,6 @@
 from covalent._shared_files import logger
 from covalent._shared_files.config import get_config
 from covalent_ui.api.v1.routes import routes
-from covalent_ui.heartbeat import lifespan
 
 file_descriptor = None
 child_process_id = None
@@ -50,7 +49,7 @@
 app_log = logger.app_log
 log_to_file = get_config("sdk.enable_logging").upper() == "TRUE"
 
-app = FastAPI(lifespan=lifespan)
+app = FastAPI()
 sio = socketio.AsyncServer(
     cors_allowed_origins="*", async_mode="asgi", logger=False, engineio_logger=False
 )
diff --git a/covalent_ui/api/v1/data_layer/electron_dal.py b/covalent_ui/api/v1/data_layer/electron_dal.py
index 865e2912f..3d24cfdfb 100644
--- a/covalent_ui/api/v1/data_layer/electron_dal.py
+++ b/covalent_ui/api/v1/data_layer/electron_dal.py
@@ -14,12 +14,33 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import timezone
+import codecs
+import pickle
+import uuid
+from datetime import timedelta
+from pathlib import Path
 
+from fastapi import HTTPException
+from fastapi.responses import JSONResponse
+from sqlalchemy import extract, select
 from sqlalchemy.sql import func
 
+from covalent._results_manager.results_manager import get_result
+from covalent._shared_files import logger
+from covalent._shared_files.config import get_config
+from covalent._shared_files.qelectron_utils import QE_DB_DIRNAME
+from covalent.quantum.qserver.database import Database
+from covalent_dispatcher._core.execution import _get_task_inputs as get_task_inputs
+from covalent_dispatcher._service.app import get_result
+from covalent_ui.api.v1.data_layer.lattice_dal import Lattices
 from covalent_ui.api.v1.database.schema.electron import Electron
 from covalent_ui.api.v1.database.schema.lattices import Lattice
+from covalent_ui.api.v1.models.electrons_model import JobDetailsResponse, JobsResponse
+from covalent_ui.api.v1.utils.file_handle import validate_data
+from covalent_ui.api.v1.utils.models_helper import JobsSortBy, SortDirection
+
+app_log = logger.app_log
+RESULTS_DIR = Path(get_config("dispatcher")["results_dir"]).resolve()
 
 
 class Electrons:
@@ -28,6 +49,193 @@ class Electrons:
     def __init__(self, db_con) -> None:
         self.db_con = db_con
 
+    def electron_exist(self, electron_id: int) -> bool:
+        return self.db_con.execute(
+            select(Electron).where(Electron.transport_graph_node_id == electron_id)
+        ).fetchone()
+
+    def validate_dispatch_and_electron(
+        self, dispatch_id: uuid.UUID, electron_id: int, response: JobsResponse
+    ) -> (bool, JobsResponse):
+        validated = True
+        if not Lattices(db_con=self.db_con).dispatch_exist(dispatch_id=dispatch_id):
+            validated = False
+            response.data = None
+            response.msg = f"Dispatch ID {dispatch_id} does not exist"
+            return (validated, response)
+        if not self.electron_exist(electron_id=electron_id):
+            validated = False
+            response.data = None
+            response.msg = f"Electron ID {electron_id} does not exist"
+            return (validated, response)
+
+        return (validated, response)
+
+    def get_jobs(
+        self,
+        dispatch_id: uuid.UUID,
+        electron_id: int,
+        sort_by: JobsSortBy,
+        sort_direction: SortDirection,
+        count,
+        offset,
+    ) -> JobsResponse:
+        try:
+            jobs_response = JobsResponse()
+            jobs_response.data = None
+            jobs_response.msg = None
+            (validated, jobs_response) = self.validate_dispatch_and_electron(
+                dispatch_id=dispatch_id, electron_id=electron_id, response=jobs_response
+            )
+            if not validated:
+                return jobs_response
+            try:
+                jobs = _qelectron_get_db(str(dispatch_id), electron_id)
+                if not jobs:
+                    jobs_response.data = []
+                    jobs_response.msg = f"Job details for {dispatch_id} dispatch with {electron_id} node do not exist."
+                    return jobs_response
+            except Exception as exc:
+                app_log.debug(f"Unable to process get jobs \n {exc}")
+                jobs_response.data = []
+                jobs_response.msg = (
+                    f"Jobs for {dispatch_id} dispatch with {electron_id} node do not exist."
+                )
+                return jobs_response
+            jobs_list = list(
+                map(
+                    lambda circuit: {
+                        "job_id": circuit["circuit_id"],
+                        "start_time": circuit["save_time"],
+                        "executor": circuit["result_metadata"]["executor_name"],
+                        "status": "COMPLETED"
+                        if len(circuit["result"]) != 0 and len(circuit["result_metadata"]) != 0
+                        else "RUNNING",
+                    },
+                    jobs.values(),
+                )
+            )
+            jobs_list.sort(
+                reverse=sort_direction == SortDirection.DESCENDING, key=lambda d: d[sort_by.value]
+            )
+            result = (
+                jobs_list[offset : count + offset] if count is not None else jobs_list[offset:]
+            )
+
+            jobs_response.data = result
+            return jobs_response
+        except Exception as exc:
+            app_log.debug(f"Unable to process get jobs \n {exc}")
+            jobs_response.msg = "Something went wrong. Please check the log."
+            jobs_response.data = None
+            return jobs_response
+
+    def get_job_detail(self, dispatch_id, electron_id, job_id) -> JobDetailsResponse:
+        try:
+            job_detail_response = JobDetailsResponse()
+            job_detail_response.data = None
+            job_detail_response.msg = None
+            (validated, job_detail_response) = self.validate_dispatch_and_electron(
+                dispatch_id=dispatch_id, electron_id=electron_id, response=job_detail_response
+            )
+            if not validated:
+                return job_detail_response
+            try:
+                jobs = _qelectron_get_db(dispatch_id=str(dispatch_id), node_id=electron_id)
+                selected_job = jobs[job_id]
+            except Exception as exc:
+                app_log.debug(f"Unable to process get jobs \n {exc}")
+                job_detail_response.data = []
+                job_detail_response.msg = (
+                    f"Job details for {dispatch_id} dispatch with {electron_id} node do not exist."
+                )
+                return job_detail_response
+            if not selected_job:
+                job_detail_response.data = {}
+                job_detail_response.msg = (
+                    f"Dispatch ID {dispatch_id} or Electron ID does not exist"
+                )
+                return job_detail_response
+            if "result" not in selected_job:
+                job_detail_response.data = {}
+                job_detail_response.msg = (
+                    f"Dispatch ID {dispatch_id} or Electron ID does not exist"
+                )
+                return job_detail_response
+            selected_job["result"] = str(selected_job["result"])[1:-1]
+            job_overview = {
+                "overview": {
+                    "job_name": selected_job["circuit_name"]
+                    if "circuit_name" in selected_job
+                    else None,
+                    "backend": selected_job["result_metadata"]["executor_backend_name"]
+                    if "result_metadata" in selected_job
+                    and "executor_backend_name" in selected_job["result_metadata"]
+                    else None,
+                    "time_elapsed": selected_job["execution_time"]
+                    if "execution_time" in selected_job
+                    else None,
+                    "result": selected_job["result"] if "result" in selected_job else None,
+                    "status": "COMPLETED"
+                    if len(selected_job["result"]) != 0
+                    and len(selected_job["result_metadata"]) != 0
+                    else "RUNNING",
+                    "start_time": selected_job["save_time"]
+                    if "save_time" in selected_job
+                    else None,
+                },
+                "circuit": {
+                    "total_qbits": None,
+                    "depth": None,
+                    "circuit_diagram": selected_job["circuit_diagram"]
+                    if "circuit_diagram" in selected_job
+                    else None,
+                },
+                "executor": {
+                    "name": selected_job["qexecutor"]["name"]
+                    if "qexecutor" in selected_job and "name" in selected_job["qexecutor"]
+                    else None,
+                    "executor": str(selected_job["qexecutor"])
+                    if "qexecutor" in selected_job
+                    else None,
+                },
+            }
+
+            job_overview["overview"]["end_time"] = (
+                selected_job["save_time"] + timedelta(seconds=selected_job["execution_time"])
+                if job_overview["overview"]["start_time"]
+                and job_overview["overview"]["time_elapsed"]
+                else None
+            )
+            if selected_job["qnode_specs"] is not None:
+                job_overview["circuit"]["total_qbits"] = (
+                    selected_job["qnode_specs"]["num_used_wires"]
+                    if "num_used_wires" in selected_job["qnode_specs"]
+                    else None
+                )
+                job_overview["circuit"]["depth"] = (
+                    selected_job["qnode_specs"]["depth"]
+                    if "depth" in selected_job["qnode_specs"]
+                    else None
+                )
+                gate_sizes = (
+                    selected_job["qnode_specs"]["gate_sizes"]
+                    if "gate_sizes" in selected_job["qnode_specs"]
+                    else None
+                )
+                if gate_sizes:
+                    for gate_wires, gate_count in gate_sizes.items():
+                        job_overview["circuit"][f"qbit{gate_wires}_gates"] = gate_count
+
+            job_detail_response.data = job_overview
+            job_detail_response.msg = ""
+            return job_detail_response
+        except Exception as exc:
+            app_log.debug(f"Unable to process get job details \n {exc}")
+            job_detail_response.msg = "Something went wrong. Please check the log."
+            job_detail_response.data = None
+            return job_detail_response
+
     def get_electrons_id(self, dispatch_id, electron_id) -> Electron:
         """
         Read electron table by electron id
@@ -57,15 +265,17 @@ def get_electrons_id(self, dispatch_id, electron_id) -> Electron:
             Electron.error_filename,
             Electron.name,
             Electron.status,
+            Electron.job_id,
+            Electron.qelectron_data_exists,
             Electron.started_at.label("started_at"),
             Electron.completed_at.label("completed_at"),
             (
                 (
-                    func.strftime(
-                        "%s",
-                        func.IFNULL(Electron.completed_at, func.datetime.now(timezone.utc)),
+                    func.coalesce(
+                        extract("epoch", Electron.completed_at),
+                        extract("epoch", func.now()),
                     )
-                    - func.strftime("%s", Electron.started_at)
+                    - extract("epoch", Electron.started_at)
                 )
                 * 1000
             ).label("runtime"),
@@ -78,3 +288,57 @@ def get_electrons_id(self, dispatch_id, electron_id) -> Electron:
             .first()
         )
         return data
+
+    def get_total_quantum_calls(self, dispatch_id, node_id, is_qa_electron: bool):
+        if not is_qa_electron:
+            return None
+        qdb_path = _path_to_qelectron_db(dispatch_id=str(dispatch_id))
+        return len(
+            Database(qdb_path).get_circuit_ids(dispatch_id=str(dispatch_id), node_id=node_id)
+        )
+
+    def get_avg_quantum_calls(self, dispatch_id, node_id, is_qa_electron: bool):
+        if not is_qa_electron:
+            return None
+        jobs = _qelectron_get_db(dispatch_id=str(dispatch_id), node_id=node_id)
+        time = [jobs[value]["execution_time"] for value in jobs]
+        return sum(time) / len(time)
+
+    def get_electron_inputs(self, dispatch_id: uuid.UUID, electron_id: int) -> str:
+        """
+        Get Electron Inputs
+        Args:
+            dispatch_id: Dispatch id of lattice/sublattice
+            electron_id: Transport graph node id of a electron
+        Returns:
+            Returns the inputs data from Result object
+        """
+
+        result = get_result(dispatch_id=str(dispatch_id), wait=False)
+        if isinstance(result, JSONResponse) and result.status_code == 404:
+            raise HTTPException(status_code=400, detail=result)
+        result_object = pickle.loads(codecs.decode(result["result"].encode(), "base64"))
+        electron_result = self.get_electrons_id(dispatch_id, electron_id)
+        inputs = get_task_inputs(
+            node_id=electron_id, node_name=electron_result.name, result_object=result_object
+        )
+        return validate_data(inputs)
+
+
+def _path_to_qelectron_db(dispatch_id: str) -> Path:
+    """Construct path to the QElectron database in Covalent's results directory."""
+
+    # This is NOT the QServer's data but rather the qdb stored on Covalent's server.
+    qdb_path = RESULTS_DIR / dispatch_id / QE_DB_DIRNAME
+    qdb_path = qdb_path.resolve().absolute()
+
+    if not qdb_path.exists():
+        app_log.error(f"Expected QElectron database at {qdb_path}.")
+
+    return qdb_path
+
+
+def _qelectron_get_db(dispatch_id: str, node_id: int) -> dict:
+    """Return the QElectron jobs dictionary for a given node."""
+    qdb_path = _path_to_qelectron_db(dispatch_id)
+    return Database(qdb_path).get_db(dispatch_id=dispatch_id, node_id=node_id)
diff --git a/covalent_ui/api/v1/data_layer/graph_dal.py b/covalent_ui/api/v1/data_layer/graph_dal.py
index 9b5cb2216..a8845c8a1 100644
--- a/covalent_ui/api/v1/data_layer/graph_dal.py
+++ b/covalent_ui/api/v1/data_layer/graph_dal.py
@@ -17,7 +17,6 @@
 """Graph Data Layer"""
 from uuid import UUID
 
-from fastapi import HTTPException
 from sqlalchemy import text
 from sqlalchemy.orm import Session
 
@@ -49,16 +48,17 @@ def get_nodes(self, parent_lattice_id: int):
                 electrons.completed_at,
                 electrons.status,
                 electrons.type,
+                electrons.qelectron_data_exists,
                 electrons.executor as executor_label,
-                (case when electrons.type == 'sublattice'
+                (case when electrons.type = 'sublattice'
                 then (
                     select lattices.dispatch_id from lattices
-                    where lattices.electron_id == electrons.id)
+                    where lattices.electron_id = electrons.id)
                 else Null
                 END
                 ) as sublattice_dispatch_id
-                from electrons join lattices on electrons.parent_lattice_id == lattices.id
-                where lattices.id == :a
+                from electrons join lattices on electrons.parent_lattice_id = lattices.id
+                where lattices.id = :a
                 """
         )
         result = self.db_con.execute(sql, {"a": parent_lattice_id}).fetchall()
@@ -87,21 +87,6 @@ def get_links(self, parent_lattice_id: int):
             .all()
         )
 
-    def check_error(self, data):
-        """
-        Helper method to rise exception if data is None
-
-        Args:
-            data: list of queried data
-        Return:
-            data
-        Rise:
-            Http Exception with status code 400 and details
-        """
-        if data is None:
-            raise HTTPException(status_code=400, detail=["Something went wrong"])
-        return data
-
     def get_graph(self, dispatch_id: UUID):
         """
         Get graph data from parent lattice id
@@ -118,7 +103,7 @@ def get_graph(self, dispatch_id: UUID):
         )
         if parent_lattice_id is not None:
             parrent_id = parent_lattice_id[0]
-            nodes = self.check_error(self.get_nodes(parrent_id))
-            links = self.check_error(self.get_links(parrent_id))
+            nodes = self.get_nodes(parrent_id)
+            links = self.get_links(parrent_id)
             return {"dispatch_id": str(dispatch_id), "nodes": nodes, "links": links}
         return None
diff --git a/covalent_ui/api/v1/data_layer/lattice_dal.py b/covalent_ui/api/v1/data_layer/lattice_dal.py
index 6aecda076..3813617df 100644
--- a/covalent_ui/api/v1/data_layer/lattice_dal.py
+++ b/covalent_ui/api/v1/data_layer/lattice_dal.py
@@ -16,10 +16,10 @@
 
 """Lattice Data Layer"""
 
-from datetime import timezone
 from typing import List
 from uuid import UUID
 
+from sqlalchemy import extract, select
 from sqlalchemy.orm import Session
 from sqlalchemy.sql import desc, func
 
@@ -34,6 +34,11 @@ class Lattices:
     def __init__(self, db_con: Session) -> None:
         self.db_con = db_con
 
+    def dispatch_exist(self, dispatch_id: UUID) -> bool:
+        return self.db_con.execute(
+            select(Lattice).where(Lattice.dispatch_id == str(dispatch_id))
+        ).fetchone()
+
     def get_lattices_id(self, dispatch_id: UUID) -> LatticeDetailResponse:
         """
         Get lattices from dispatch id
@@ -53,20 +58,20 @@ def get_lattices_id(self, dispatch_id: UUID) -> LatticeDetailResponse:
             Lattice.results_filename,
             Lattice.docstring_filename,
             Lattice.started_at.label("start_time"),
-            func.IFNULL((Lattice.completed_at), None).label("end_time"),
+            func.coalesce((Lattice.completed_at), None).label("end_time"),
             Lattice.electron_num.label("total_electrons"),
             Lattice.completed_electron_num.label("total_electrons_completed"),
             (
                 (
-                    func.strftime(
-                        "%s",
-                        func.IFNULL(Lattice.completed_at, func.datetime.now(timezone.utc)),
+                    func.coalesce(
+                        extract("epoch", Lattice.completed_at),
+                        extract("epoch", func.now()),
                     )
-                    - func.strftime("%s", Lattice.started_at)
+                    - extract("epoch", Lattice.started_at)
                 )
                 * 1000
             ).label("runtime"),
-            func.IFNULL((Lattice.updated_at), None).label("updated_at"),
+            func.coalesce((Lattice.updated_at), None).label("updated_at"),
         )
         .filter(Lattice.dispatch_id == str(dispatch_id), Lattice.is_active.is_not(False))
         .first()
         )
@@ -108,21 +113,6 @@ def get_lattices_id_storage_file(self, dispatch_id: UUID):
             .first()
         )
 
-    def get_lattice_id_by_dispatch_id(self, dispatch_id: UUID):
-        """
-        Get top lattice id from dispatch id
-        Args:
-            dispatch_id: UUID of dispatch
-        Returns:
-            Top most lattice id
-        """
-        data = (
-            self.db_con.query(Lattice.id)
-            .filter(Lattice.dispatch_id == str(dispatch_id), Lattice.electron_id.is_(None))
-            .first()
-        )
-        return data[0]
-
     def get_sub_lattice_details(self, sort_by, sort_direction, dispatch_id) -> List[Lattice]:
         """
         Get summary of sub lattices
@@ -139,11 +129,10 @@ def get_sub_lattice_details(self, sort_by, sort_direction, dispatch_id) -> List[
             Lattice.name.label("lattice_name"),
             (
                 (
-                    func.strftime(
-                        "%s",
-                        func.IFNULL(Lattice.completed_at, func.datetime.now(timezone.utc)),
+                    func.coalesce(
+                        extract("epoch", Lattice.completed_at), extract("epoch", func.now())
                     )
-                    - func.strftime("%s", Lattice.started_at)
+                    - extract("epoch", Lattice.started_at)
                 )
                 * 1000
             ).label("runtime"),
@@ -151,9 +140,7 @@ def get_sub_lattice_details(self, sort_by, sort_direction, dispatch_id) -> List[
             Lattice.completed_electron_num.label("total_electrons_completed"),
             Lattice.status.label("status"),
             Lattice.started_at.label("started_at"),
-            func.IFNULL((Lattice.completed_at), None).label("ended_at"),
-            Lattice.updated_at.label("updated_at"),
-            Lattice.updated_at.label("updated_at"),
+            func.coalesce((Lattice.completed_at), None).label("ended_at"),
             Lattice.updated_at.label("updated_at"),
         )
         .filter(
diff --git a/covalent_ui/api/v1/data_layer/summary_dal.py b/covalent_ui/api/v1/data_layer/summary_dal.py
index a56c57260..0442d1a7d 100644
--- a/covalent_ui/api/v1/data_layer/summary_dal.py
+++ b/covalent_ui/api/v1/data_layer/summary_dal.py
@@ -16,7 +16,6 @@
 
 import uuid
 from datetime import datetime, timezone
-from sqlite3 import InterfaceError
 from typing import List
 
 from sqlalchemy import case, extract, update
@@ -32,6 +31,7 @@
     DeleteDispatchesRequest,
     DeleteDispatchesResponse,
     DispatchDashBoardResponse,
+    DispatchModule,
     DispatchResponse,
     SortDirection,
 )
@@ -78,7 +78,7 @@ def get_summary(
                 Lattice.electron_num.label("total_electrons"),
                 Lattice.completed_electron_num.label("total_electrons_completed"),
                 Lattice.started_at.label("started_at"),
-                func.IFNULL(Lattice.completed_at, None).label("ended_at"),
+                func.coalesce(Lattice.completed_at, None).label("ended_at"),
                 Lattice.status.label("status"),
                 Lattice.updated_at.label("updated_at"),
             ).filter(
@@ -114,7 +114,7 @@ def get_summary(
                 else sort_by.value
             )
 
-            result = data.offset(offset).limit(count).all()
+            results = data.offset(offset).limit(count).all()
 
             counter = (
                 self.db_con.query(func.count(Lattice.id))
@@ -129,7 +129,9 @@ def get_summary(
                 )
                 .first()
             )
-            return DispatchResponse(items=result, total_count=counter[0])
+            return DispatchResponse(
+                items=[DispatchModule.from_orm(result) for result in results], total_count=counter[0]
+            )
 
     def get_summary_overview(self) -> Lattice:
         """
@@ -250,7 +252,7 @@ def delete_dispatches(self, data: DeleteDispatchesRequest):
         success = []
         failure = []
         message = "No dispatches were deleted"
-        if len(data.dispatches) == 0:
+        if data.dispatches is None or len(data.dispatches) == 0:
             return DeleteDispatchesResponse(
                 success_items=success,
                 failure_items=failure,
@@ -317,7 +319,7 @@ def delete_dispatches(self, data: DeleteDispatchesRequest):
                     self.db_con.execute(update_lattice)
                     self.db_con.commit()
                     success.append(dispatch_id)
-                except InterfaceError:
+                except Exception:
                     failure.append(dispatch_id)
         if len(success) > 0:
             message = "Dispatch(es) have been deleted successfully!"
@@ -340,23 +342,24 @@ def delete_all_dispatches(self, data: DeleteAllDispatchesRequest):
         """
         success = []
         failure = []
+        dispatches = []
         status_filters = self.get_filters(data.status_filter)
-        filter_dispatches = (
-            self.db_con.query(Lattice.id, Lattice.dispatch_id)
-            .filter(
-                or_(
-                    Lattice.name.ilike(f"%{data.search_string}%"),
-                    Lattice.dispatch_id.ilike(f"%{data.search_string}%"),
-                ),
-                Lattice.status.in_(status_filters),
-                Lattice.is_active.is_not(False),
+        try:
+            filter_dispatches = (
+                self.db_con.query(Lattice.id, Lattice.dispatch_id)
+                .filter(
+                    or_(
+                        Lattice.name.ilike(f"%{data.search_string}%"),
+                        Lattice.dispatch_id.ilike(f"%{data.search_string}%"),
+                    ),
+                    Lattice.status.in_(status_filters),
+                    Lattice.is_active.is_not(False),
+                )
+                .all()
             )
-            .all()
-        )
-        dispatch_ids = [o.id for o in filter_dispatches]
-        dispatches = [uuid.UUID(o.dispatch_id) for o in filter_dispatches]
-        if len(dispatches) >= 1:
-            try:
+            dispatch_ids = [o.id for o in filter_dispatches]
+            dispatches = [uuid.UUID(o.dispatch_id) for o in filter_dispatches]
+            if len(dispatches) >= 1:
                 electron_ids = (
                     self.db_con.query(Electron.id)
                     .filter(
@@ -410,8 +413,8 @@ def delete_all_dispatches(self, data: DeleteAllDispatchesRequest):
                 )
                 self.db_con.commit()
                 success = dispatches
-            except InterfaceError:
-                failure = dispatches
+        except Exception:
+            failure = dispatches
         if (len(failure) == 0 and len(success) == 0) or (len(failure) > 0 and len(success) == 0):
             message = "No dispatches were deleted"
         elif len(failure) > 0 and len(success) > 0:
diff --git a/covalent_ui/webapp/package.json b/covalent_ui/webapp/package.json
index 31a8326a2..902d3a8ad 100644
--- a/covalent_ui/webapp/package.json
+++ b/covalent_ui/webapp/package.json
@@ -31,6 +31,7 @@
     "react-router-dom": "^6.0.2",
     "react-scripts": "4.0.3",
     "react-syntax-highlighter": "^15.4.5",
+    "react-virtualized": "^9.22.5",
     "react-tooltip": "^4.2.21",
     "redux-localstorage": "^0.4.1",
     "socket.io-client": "^4.4.0",
diff --git a/requirements.txt b/requirements.txt
index a0096f66d..d8f9f29e6 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,7 +11,7 @@ mpire==2.7.1
 networkx==2.8.6
 orjson==3.8.10
 pennylane==0.31.1
-psutil>=5.9.0,<=5.9.2
+psutil>=5.9.0
 pydantic>=1.10.1,<=1.10.2
 python-socketio==5.7.1
 requests>=2.24.0,<=2.28.1
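
The new Electrons.get_jobs method above reads a node's QElectron records via _qelectron_get_db, maps each stored circuit to a UI-facing job row, then sorts and paginates the list. The snippet below is a standalone sketch of that shaping step only; the field names ("circuit_id", "save_time", "result_metadata", ...) are taken from the diff, while the shape_jobs helper and the example input are illustrative and not part of Covalent's API.

from datetime import datetime


def shape_jobs(circuits: dict, sort_by: str, descending: bool, count=None, offset=0) -> list:
    """Mirror the map/sort/slice done in Electrons.get_jobs (hypothetical helper)."""
    jobs = [
        {
            "job_id": c["circuit_id"],
            "start_time": c["save_time"],
            "executor": c["result_metadata"]["executor_name"],
            # A circuit counts as COMPLETED once both a result and its metadata exist.
            "status": "COMPLETED" if c["result"] and c["result_metadata"] else "RUNNING",
        }
        for c in circuits.values()
    ]
    jobs.sort(key=lambda job: job[sort_by], reverse=descending)
    return jobs[offset : offset + count] if count is not None else jobs[offset:]


# Illustrative input only; real entries come from the QElectron database on the server.
example = {
    "key-0": {
        "circuit_id": "circuit_0@example",
        "save_time": datetime(2023, 9, 27, 9, 0),
        "result": [0.42],
        "result_metadata": {"executor_name": "Simulator"},
    }
}
print(shape_jobs(example, sort_by="start_time", descending=True))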
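
Several hunks above also swap SQLite-only SQL helpers (func.IFNULL and func.strftime("%s", ...)) for func.coalesce and extract("epoch", ...), presumably so the runtime math also works on a PostgreSQL-style backend, since SQLite does not implement EXTRACT. Below is a minimal sketch of the same runtime-in-milliseconds expression, assuming SQLAlchemy 1.4+ and a toy Lattice model (illustrative only, not Covalent's real schema).

from sqlalchemy import Column, DateTime, Integer, extract, func, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Lattice(Base):  # toy stand-in for the real lattices table
    __tablename__ = "lattices"
    id = Column(Integer, primary_key=True)
    started_at = Column(DateTime)
    completed_at = Column(DateTime)


# Runtime in milliseconds; falls back to "now" while the lattice is still running.
runtime_ms = (
    (
        func.coalesce(extract("epoch", Lattice.completed_at), extract("epoch", func.now()))
        - extract("epoch", Lattice.started_at)
    )
    * 1000
).label("runtime")

print(select(Lattice.id, runtime_ms))  # renders coalesce(EXTRACT(epoch FROM ...), ...) * 1000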