From f5fc7fb163624ff5a13539c65825af53527da65a Mon Sep 17 00:00:00 2001 From: Casey Jao Date: Fri, 30 Jun 2023 09:39:35 -0400 Subject: [PATCH] Mem (2/3): redirect dispatcher to in-memory runner Make API endpoints restful Cancel all dispatches upon shutdown --- covalent/_api/apiclient.py | 19 +++ covalent/_dispatcher_plugins/local.py | 8 +- covalent/_results_manager/results_manager.py | 6 +- covalent_dispatcher/_core/dispatcher.py | 30 ++++- covalent_dispatcher/_core/runner.py | 2 +- covalent_dispatcher/_dal/utils/uri_filters.py | 10 +- covalent_dispatcher/_service/app.py | 122 +++++++++++++----- covalent_dispatcher/_service/assets.py | 71 +++++++++- covalent_ui/api/v1/routes/routes.py | 5 +- .../_core/dispatcher_test.py | 86 +++++++++++- .../_service/app_test.py | 85 +++++++----- .../_service/assets_test.py | 63 ++++++--- .../dispatcher_plugins/local_test.py | 18 +-- .../results_manager_test.py | 20 +-- 14 files changed, 411 insertions(+), 134 deletions(-) diff --git a/covalent/_api/apiclient.py b/covalent/_api/apiclient.py index aae20d0fa7..ddbe3cc5c5 100644 --- a/covalent/_api/apiclient.py +++ b/covalent/_api/apiclient.py @@ -95,6 +95,25 @@ def post(self, endpoint: str, **kwargs): return r + def delete(self, endpoint: str, **kwargs): + headers = CovalentAPIClient.get_extra_headers() + url = self.dispatcher_addr + endpoint + try: + with requests.Session() as session: + if self.adapter: + session.mount("http://", self.adapter) + + r = session.delete(url, headers=headers, **kwargs) + + if self.auto_raise: + r.raise_for_status() + except requests.exceptions.ConnectionError: + message = f"The Covalent server cannot be reached at {url}. Local servers can be started using `covalent start` in the terminal. If you are using a remote Covalent server, contact your systems administrator to report an outage." 
+ print(message) + raise + + return r + @classmethod def get_extra_headers(headers: Dict) -> Dict: # This is expected to be a JSONified dictionary diff --git a/covalent/_dispatcher_plugins/local.py b/covalent/_dispatcher_plugins/local.py index 0b9c498d72..74f76d6f68 100644 --- a/covalent/_dispatcher_plugins/local.py +++ b/covalent/_dispatcher_plugins/local.py @@ -249,8 +249,8 @@ def start( if dispatcher_addr is None: dispatcher_addr = format_server_url() - endpoint = f"/api/v1/dispatch/start/{dispatch_id}" - r = APIClient(dispatcher_addr).put(endpoint) + endpoint = f"/api/v1/dispatch/{dispatch_id}" + r = APIClient(dispatcher_addr).post(endpoint) r.raise_for_status() return r.content.decode("utf-8").strip().replace('"', "") @@ -546,7 +546,7 @@ def register_manifest( else: stripped = manifest - endpoint = "/api/v1/dispatch/register" + endpoint = "/api/v1/dispatch" if parent_dispatch_id: endpoint = f"{endpoint}?parent_dispatch_id={parent_dispatch_id}" @@ -578,7 +578,7 @@ def register_derived_manifest( # We don't yet support pulling assets for redispatch stripped = strip_local_uris(manifest) - endpoint = f"/api/v1/dispatch/register/{dispatch_id}" + endpoint = f"/api/v1/dispatch/{dispatch_id}/redispatch" params = {"reuse_previous_results": reuse_previous_results} r = APIClient(dispatcher_addr).post(endpoint, data=stripped.json(), params=params) diff --git a/covalent/_results_manager/results_manager.py b/covalent/_results_manager/results_manager.py index a4df3df11e..f3f27183b0 100644 --- a/covalent/_results_manager/results_manager.py +++ b/covalent/_results_manager/results_manager.py @@ -134,12 +134,12 @@ def cancel(dispatch_id: str, task_ids: List[int] = None, dispatcher_addr: str = task_ids = [] api_client = CovalentAPIClient(dispatcher_addr) - endpoint = "/api/v1/dispatch/cancel" + endpoint = f"/api/v1/dispatch/{dispatch_id}" if isinstance(task_ids, int): task_ids = [task_ids] - r = api_client.post(endpoint, json={"dispatch_id": dispatch_id, "task_ids": task_ids}) + r = api_client.delete(endpoint, params={"task_ids": task_ids}) return r.content.decode("utf-8").strip().replace('"', "") @@ -176,7 +176,7 @@ def _get_result_export_from_dispatcher( adapter = HTTPAdapter(max_retries=Retry(total=retries, backoff_factor=1)) api_client = CovalentAPIClient(dispatcher_addr, adapter=adapter, auto_raise=False) - endpoint = "/api/v1/dispatch/export/" + dispatch_id + endpoint = f"/api/v1/dispatch/{dispatch_id}" response = api_client.get( endpoint, params={"wait": wait, "status_only": status_only}, diff --git a/covalent_dispatcher/_core/dispatcher.py b/covalent_dispatcher/_core/dispatcher.py index ec20553419..e159a8a237 100644 --- a/covalent_dispatcher/_core/dispatcher.py +++ b/covalent_dispatcher/_core/dispatcher.py @@ -35,7 +35,9 @@ from covalent._shared_files.util_classes import RESULT_STATUS from . import data_manager as datasvc -from . import runner_ng +from . import runner + +# from . 
import runner_ng from .data_modules import job_manager as jbmgr from .dispatcher_modules.caches import _pending_parents, _sorted_task_groups, _unresolved_tasks @@ -224,13 +226,28 @@ async def _submit_task_group(dispatch_id: str, sorted_nodes: List[int], task_gro app_log.debug(f"Using new runner for task group {task_group_id}") known_nodes = list(set(known_nodes)) - coro = runner_ng.run_abstract_task_group( + + task_spec = task_specs[0] + abstract_inputs = {"args": task_spec["args_ids"], "kwargs": task_spec["kwargs_ids"]} + + # Temporarily redirect to in-memory runner (this is incompatible with task packing) + if len(task_specs) > 1: + raise RuntimeError("Task packing is not supported yet.") + + coro = runner.run_abstract_task( dispatch_id=dispatch_id, - task_group_id=task_group_id, - task_seq=task_specs, - known_nodes=known_nodes, + node_id=task_group_id, + node_name=node_name, + abstract_inputs=abstract_inputs, selected_executor=[selected_executor, selected_executor_data], ) + # coro = runner_ng.run_abstract_task_group( + # dispatch_id=dispatch_id, + # task_group_id=task_group_id, + # task_seq=task_specs, + # known_nodes=known_nodes, + # selected_executor=[selected_executor, selected_executor_data], + # ) asyncio.create_task(coro) else: @@ -337,7 +354,8 @@ async def cancel_dispatch(dispatch_id: str, task_ids: List[int] = []) -> None: app_log.debug(f"Cancelling dispatch {dispatch_id}") await jbmgr.set_cancel_requested(dispatch_id, task_ids) - await runner_ng.cancel_tasks(dispatch_id, task_ids) + await runner.cancel_tasks(dispatch_id, task_ids) + # await runner_ng.cancel_tasks(dispatch_id, task_ids) # Recursively cancel running sublattice dispatches attrs = await datasvc.electron.get_bulk(dispatch_id, task_ids, ["sub_dispatch_id"]) diff --git a/covalent_dispatcher/_core/runner.py b/covalent_dispatcher/_core/runner.py index 615df4de47..8cf65c69f3 100644 --- a/covalent_dispatcher/_core/runner.py +++ b/covalent_dispatcher/_core/runner.py @@ -32,7 +32,7 @@ from covalent._shared_files.config import get_config from covalent._shared_files.util_classes import RESULT_STATUS from covalent._workflow import DepsBash, DepsCall, DepsPip -from covalent.executor.utils.wrappers import wrapper_fn +from covalent.executor.base import wrapper_fn from . 
import data_manager as datasvc from .runner_modules import executor_proxy diff --git a/covalent_dispatcher/_dal/utils/uri_filters.py b/covalent_dispatcher/_dal/utils/uri_filters.py index 13558eab54..50cc8c7271 100644 --- a/covalent_dispatcher/_dal/utils/uri_filters.py +++ b/covalent_dispatcher/_dal/utils/uri_filters.py @@ -46,12 +46,14 @@ class URIFilterPolicy(enum.Enum): def _srv_asset_uri( uri: str, attrs: dict, scope: AssetScope, dispatch_id: str, node_id: Optional[int], key: str ) -> str: - base_uri = SERVER_URL + f"/api/v1/assets/{dispatch_id}/{scope.value}" + base_uri = SERVER_URL + f"/api/v1/dispatch/{dispatch_id}" - if scope == AssetScope.DISPATCH or scope == AssetScope.LATTICE: - uri = base_uri + f"/{key}" + if scope == AssetScope.DISPATCH: + uri = f"{base_uri}/assets/{key}" + elif scope == AssetScope.LATTICE: + uri = f"{base_uri}/lattice/assets/{key}" else: - uri = base_uri + f"/{node_id}/{key}" + uri = f"{base_uri}/electron/{node_id}/assets/{key}" return uri diff --git a/covalent_dispatcher/_service/app.py b/covalent_dispatcher/_service/app.py index 99c44fa94c..a02f3e2f2c 100644 --- a/covalent_dispatcher/_service/app.py +++ b/covalent_dispatcher/_service/app.py @@ -24,10 +24,10 @@ import asyncio import json from contextlib import asynccontextmanager -from typing import Optional, Union +from typing import List, Optional, Union from uuid import UUID -from fastapi import APIRouter, FastAPI, HTTPException, Request +from fastapi import APIRouter, FastAPI, HTTPException, Query, Request from fastapi.responses import JSONResponse import covalent_dispatcher.entry_point as dispatcher @@ -35,18 +35,16 @@ from covalent._shared_files.schemas.result import ResultSchema from covalent._shared_files.util_classes import RESULT_STATUS from covalent_dispatcher._core import dispatcher as core_dispatcher -from covalent_dispatcher._core import runner_ng as core_runner from .._dal.exporters.result import export_result_manifest from .._dal.result import Result, get_result_object +from .._db.datastore import workflow_db from .._db.dispatchdb import DispatchDB from .heartbeat import Heartbeat from .models import ExportResponseSchema -app_log = logger.app_log -log_stack_info = logger.log_stack_info +# from covalent_dispatcher._core import runner_ng as core_runner -router: APIRouter = APIRouter() app_log = logger.app_log log_stack_info = logger.log_stack_info @@ -65,9 +63,9 @@ async def lifespan(app: FastAPI): _background_tasks.add(fut) fut.add_done_callback(_background_tasks.discard) - # Runner event queue and listener - core_runner._job_events = asyncio.Queue() - core_runner._job_event_listener = asyncio.create_task(core_runner._listen_for_job_events()) + # # Runner event queue and listener + # core_runner._job_events = asyncio.Queue() + # core_runner._job_event_listener = asyncio.create_task(core_runner._listen_for_job_events()) # Dispatcher event queue and listener core_dispatcher._global_status_queue = asyncio.Queue() @@ -77,8 +75,32 @@ async def lifespan(app: FastAPI): yield + # Cancel all scheduled and running dispatches + for status in [ + RESULT_STATUS.NEW_OBJECT, + RESULT_STATUS.RUNNING, + ]: + await cancel_all_with_status(status) + core_dispatcher._global_event_listener.cancel() - core_runner._job_event_listener.cancel() + # core_runner._job_event_listener.cancel() + + Heartbeat.stop() + + +async def cancel_all_with_status(status: RESULT_STATUS): + """Cancel all dispatches with the specified status.""" + + with workflow_db.session() as session: + records = Result.get_db_records( + session, + 
keys=["dispatch_id"], + equality_filters={"status": str(status)}, + membership_filters={}, + ) + for record in records: + dispatch_id = record.attrs["dispatch_id"] + await dispatcher.cancel_running_dispatch(dispatch_id) @router.post("/dispatch/submit") @@ -106,26 +128,24 @@ async def submit(request: Request) -> UUID: ) from e -@router.post("/dispatch/cancel") -async def cancel(request: Request) -> str: +@router.delete("/dispatch/{dispatch_id}") +async def cancel(dispatch_id: str, task_ids: List[int] = Query([])) -> str: """ Function to accept the cancel request of a dispatch. Args: - None + dispatch_id: ID of the dispatch + task_ids: (Query) Optional list of specific task ids to cancel. + An empty list will cause all tasks to be cancelled. Returns: Fast API Response object confirming that the dispatch has been cancelled. """ - data = await request.json() - - dispatch_id = data["dispatch_id"] - task_ids = data["task_ids"] - await dispatcher.cancel_running_dispatch(dispatch_id, task_ids) + print("DEBUG: task_ids", task_ids) if task_ids: return f"Cancelled tasks {task_ids} in dispatch {dispatch_id}." else: @@ -138,10 +158,19 @@ def db_path() -> str: return json.dumps(db_path) -@router.post("/dispatch/register") +@router.post("/dispatch", status_code=201) async def register( manifest: ResultSchema, parent_dispatch_id: Union[str, None] = None ) -> ResultSchema: + """Register a dispatch in the database. + + Args: + manifest: Declares all metadata and assets in the workflow + parent_dispatch_id: The parent dispatch id if registering a sublattice dispatch + + Returns: + The manifest with `dispatch_id` and remote URIs for each asset populated. + """ try: return await dispatcher.register_dispatch(manifest, parent_dispatch_id) except Exception as e: @@ -152,12 +181,23 @@ async def register( ) from e -@router.post("/dispatch/register/{dispatch_id}") +@router.post("/dispatch/{dispatch_id}/redispatch", status_code=201) async def register_redispatch( manifest: ResultSchema, dispatch_id: str, reuse_previous_results: bool = False, ): + """Register a redispatch in the database. + + Args: + manifest: Declares all metadata and assets in the workflow + dispatch_id: The original dispatch's id. + reuse_previous_results: Whether to try reusing the results of + previously completed electrons. + + Returns: + The manifest with `dispatch_id` and remote URIs for each asset populated. + """ try: return await dispatcher.register_redispatch( manifest, @@ -172,25 +212,43 @@ async def register_redispatch( ) from e -@router.put("/dispatch/start/{dispatch_id}") +@router.post("/dispatch/{dispatch_id}", status_code=202) async def start(dispatch_id: str): - try: - fut = asyncio.create_task(dispatcher.start_dispatch(dispatch_id)) - _background_tasks.add(fut) - fut.add_done_callback(_background_tasks.discard) + """Start a previously registered (re-)dispatch. - return dispatch_id - except Exception as e: - raise HTTPException( - status_code=400, - detail=f"Failed to start workflow: {e}", - ) from e + Args: + `dispatch_id`: The dispatch's unique id. 
+ Returns: + `dispatch_id` + """ + fut = asyncio.create_task(dispatcher.start_dispatch(dispatch_id)) + _background_tasks.add(fut) + fut.add_done_callback(_background_tasks.discard) -@router.get("/dispatch/export/{dispatch_id}") + return dispatch_id + + +@router.get("/dispatch/{dispatch_id}") async def export_result( dispatch_id: str, wait: Optional[bool] = False, status_only: Optional[bool] = False ) -> ExportResponseSchema: + """Export all metadata about a registered dispatch + + Args: + `dispatch_id`: The dispatch's unique id. + + Returns: + { + id: `dispatch_id`, + status: status, + result_export: manifest for the result + } + + The manifest `result_export` has the same schema as that which is + submitted to `/register`. + + """ loop = asyncio.get_running_loop() return await loop.run_in_executor( None, diff --git a/covalent_dispatcher/_service/assets.py b/covalent_dispatcher/_service/assets.py index a34442c84b..e4840ab394 100644 --- a/covalent_dispatcher/_service/assets.py +++ b/covalent_dispatcher/_service/assets.py @@ -66,7 +66,7 @@ LRU_CACHE_SIZE = get_config("dispatcher.asset_cache_size") -@router.get("/assets/{dispatch_id}/node/{node_id}/{key}") +@router.get("/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}") def get_node_asset( dispatch_id: str, node_id: int, @@ -74,6 +74,17 @@ def get_node_asset( representation: Union[AssetRepresentation, None] = None, Range: Union[str, None] = Header(default=None, regex=range_regex), ): + """Returns an asset for an electron. + + Args: + dispatch_id: The dispatch's unique id. + node_id: The id of the electron. + key: The name of the asset + representation: (optional) the representation ("string" or "pickle") of a `TransportableObject` + range: (optional) range request header + + If `representation` is specified, it will override the range request. + """ start_byte = 0 end_byte = -1 @@ -116,13 +127,23 @@ def get_node_asset( raise -@router.get("/assets/{dispatch_id}/dispatch/{key}") +@router.get("/dispatch/{dispatch_id}/assets/{key}") def get_dispatch_asset( dispatch_id: str, key: DispatchAssetKey, representation: Union[AssetRepresentation, None] = None, Range: Union[str, None] = Header(default=None, regex=range_regex), ): + """Returns a dynamic asset for a workflow + + Args: + dispatch_id: The dispatch's unique id. + key: The name of the asset + representation: (optional) the representation ("string" or "pickle") of a `TransportableObject` + range: (optional) range request header + + If `representation` is specified, it will override the range request. + """ start_byte = 0 end_byte = -1 @@ -162,13 +183,23 @@ def get_dispatch_asset( raise -@router.get("/assets/{dispatch_id}/lattice/{key}") +@router.get("/dispatch/{dispatch_id}/lattice/assets/{key}") def get_lattice_asset( dispatch_id: str, key: LatticeAssetKey, representation: Union[AssetRepresentation, None] = None, Range: Union[str, None] = Header(default=None, regex=range_regex), ): + """Returns a static asset for a workflow + + Args: + dispatch_id: The dispatch's unique id. + key: The name of the asset + representation: (optional) the representation ("string" or "pickle") of a `TransportableObject` + range: (optional) range request header + + If `representation` is specified, it will override the range request. 
+ """ start_byte = 0 end_byte = -1 @@ -209,7 +240,7 @@ def get_lattice_asset( raise e -@router.post("/assets/{dispatch_id}/node/{node_id}/{key}") +@router.post("/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}") def upload_node_asset( dispatch_id: str, node_id: int, @@ -218,6 +249,16 @@ def upload_node_asset( content_length: int = Header(), digest: Union[str, None] = Header(default=None, regex=digest_regex), ): + """Upload an electron asset. + + Args: + dispatch_id: The dispatch's unique id. + node_id: The electron id. + key: The name of the asset + asset_file: (body) The file to be uploaded + content_length: (header) + digest: (header) + """ app_log.debug(f"Requested asset {key} for node {dispatch_id}:{node_id}") try: @@ -243,7 +284,7 @@ def upload_node_asset( raise -@router.post("/assets/{dispatch_id}/dispatch/{key}") +@router.post("/dispatch/{dispatch_id}/assets/{key}") def upload_dispatch_asset( dispatch_id: str, key: DispatchAssetKey, @@ -251,6 +292,15 @@ def upload_dispatch_asset( content_length: int = Header(), digest: Union[str, None] = Header(default=None, regex=digest_regex), ): + """Upload a dispatch asset. + + Args: + dispatch_id: The dispatch's unique id. + key: The name of the asset + asset_file: (body) The file to be uploaded + content_length: (header) + digest: (header) + """ try: result_object = get_cached_result_object(dispatch_id) @@ -272,7 +322,7 @@ def upload_dispatch_asset( raise -@router.post("/assets/{dispatch_id}/lattice/{key}") +@router.post("/dispatch/{dispatch_id}/lattice/assets/{key}") def upload_lattice_asset( dispatch_id: str, key: LatticeAssetKey, @@ -280,6 +330,15 @@ def upload_lattice_asset( content_length: int = Header(), digest: Union[str, None] = Header(default=None, regex=digest_regex), ): + """Upload a lattice asset. + + Args: + dispatch_id: The dispatch's unique id. 
+ key: The name of the asset + asset_file: (body) The file to be uploaded + content_length: (header) + digest: (header) + """ try: result_object = get_cached_result_object(dispatch_id) diff --git a/covalent_ui/api/v1/routes/routes.py b/covalent_ui/api/v1/routes/routes.py index d9b4de8f96..9ccbde8f90 100644 --- a/covalent_ui/api/v1/routes/routes.py +++ b/covalent_ui/api/v1/routes/routes.py @@ -22,7 +22,7 @@ from fastapi import APIRouter -from covalent_dispatcher._service import app, assets, runnersvc +from covalent_dispatcher._service import app, assets from covalent_dispatcher._triggers_app.app import router as tr_router from covalent_ui.api.v1.routes.end_points import ( electron_routes, @@ -46,4 +46,5 @@ routes.include_router(tr_router, prefix="/api", tags=["Triggers"]) routes.include_router(app.router, prefix="/api/v1", tags=["Dispatcher"]) routes.include_router(assets.router, prefix="/api/v1", tags=["Assets"]) -routes.include_router(runnersvc.router, prefix="/api/v1", tags=["Runner"]) +# This will be enabled in the next patch +# routes.include_router(runnersvc.router, prefix="/api/v1", tags=["Runner"]) diff --git a/tests/covalent_dispatcher_tests/_core/dispatcher_test.py b/tests/covalent_dispatcher_tests/_core/dispatcher_test.py index 634fd59321..e1d190a860 100644 --- a/tests/covalent_dispatcher_tests/_core/dispatcher_test.py +++ b/tests/covalent_dispatcher_tests/_core/dispatcher_test.py @@ -514,10 +514,11 @@ async def test_submit_initial_tasks(mocker): @pytest.mark.asyncio -async def test_submit_task_group(mocker): +async def test_submit_task_group_single(mocker): + """Test submitting a singleton task groups""" dispatch_id = "dispatch_1" gid = 2 - nodes = [4, 3, 2] + nodes = [2] mock_get_abs_input = mocker.patch( "covalent_dispatcher._core.dispatcher._get_abstract_task_inputs", @@ -554,15 +555,75 @@ async def get_electron_attrs(dispatch_id, node_id, keys): "covalent_dispatcher._core.dispatcher.datasvc.update_node_result", ) + # This will be removed in the next patch mock_run_abs_task = mocker.patch( - "covalent_dispatcher._core.dispatcher.runner_ng.run_abstract_task_group", + "covalent_dispatcher._core.dispatcher.runner.run_abstract_task", ) + # mock_run_abs_task = mocker.patch( + # "covalent_dispatcher._core.dispatcher.runner_ng.run_abstract_task_group", + # ) await _submit_task_group(dispatch_id, nodes, gid) mock_run_abs_task.assert_called() assert mock_get_abs_input.await_count == len(nodes) +# Temporary only because the current runner does not support +# nontrivial task groups. 
+@pytest.mark.asyncio +async def test_submit_task_group_multiple(mocker): + """Check that submitting multiple tasks errors out""" + dispatch_id = "dispatch_1" + gid = 2 + nodes = [4, 3, 2] + + mock_get_abs_input = mocker.patch( + "covalent_dispatcher._core.dispatcher._get_abstract_task_inputs", + return_value={"args": [], "kwargs": {}}, + ) + + mock_attrs = { + "name": "task", + "value": 5, + "executor": "local", + "executor_data": {}, + } + + mock_statuses = [ + {"status": Result.NEW_OBJ}, + {"status": Result.NEW_OBJ}, + {"status": Result.NEW_OBJ}, + ] + + async def get_electron_attrs(dispatch_id, node_id, keys): + return {key: mock_attrs[key] for key in keys} + + mocker.patch( + "covalent_dispatcher._core.dispatcher.datasvc.electron.get", + get_electron_attrs, + ) + + mocker.patch( + "covalent_dispatcher._core.dispatcher.datasvc.electron.get_bulk", + return_value=mock_statuses, + ) + + mocker.patch( + "covalent_dispatcher._core.dispatcher.datasvc.update_node_result", + ) + + # This will be removed in the next patch + mock_run_abs_task = mocker.patch( + "covalent_dispatcher._core.dispatcher.runner.run_abstract_task", + ) + # mock_run_abs_task = mocker.patch( + # "covalent_dispatcher._core.dispatcher.runner_ng.run_abstract_task_group", + # ) + + with pytest.raises(RuntimeError): + await _submit_task_group(dispatch_id, nodes, gid) + + @pytest.mark.asyncio async def test_submit_task_group_skips_reusable(mocker): """Check that submit_task_group skips reusable groups""" @@ -605,9 +666,13 @@ async def get_electron_attrs(dispatch_id, node_id, keys): "covalent_dispatcher._core.dispatcher.datasvc.update_node_result", ) + # Will be removed next patch mock_run_abs_task = mocker.patch( - "covalent_dispatcher._core.dispatcher.runner_ng.run_abstract_task_group", + "covalent_dispatcher._core.dispatcher.runner.run_abstract_task", ) + # mock_run_abs_task = mocker.patch( + # "covalent_dispatcher._core.dispatcher.runner_ng.run_abstract_task_group", + # ) await _submit_task_group(dispatch_id, nodes, gid) mock_run_abs_task.assert_not_called() @@ -641,9 +706,14 @@ async def get_electron_attrs(dispatch_id, node_id, keys): "covalent_dispatcher._core.dispatcher.datasvc.update_node_result", ) + # Will be removed next patch mock_run_abs_task = mocker.patch( - "covalent_dispatcher._core.dispatcher.runner_ng.run_abstract_task_group", + "covalent_dispatcher._core.dispatcher.runner.run_abstract_task", ) + # mock_run_abs_task = mocker.patch( + # "covalent_dispatcher._core.dispatcher.runner_ng.run_abstract_task_group", + # ) + await _submit_task_group(dispatch_id, [node_id], node_id) mock_run_abs_task.assert_not_called() @@ -693,7 +763,8 @@ async def test_cancel_dispatch(mocker): "covalent_dispatcher._core.dispatcher.jbmgr.set_cancel_requested" ) - mock_runner = mocker.patch("covalent_dispatcher._core.dispatcher.runner_ng") + mock_runner = mocker.patch("covalent_dispatcher._core.dispatcher.runner") + # mock_runner = mocker.patch("covalent_dispatcher._core.dispatcher.runner_ng") mock_runner.cancel_tasks = AsyncMock() res._initialize_nodes() @@ -759,7 +830,8 @@ async def test_cancel_dispatch_with_task_ids(mocker): "covalent_dispatcher._core.dispatcher.jbmgr.set_cancel_requested" ) - mock_runner = mocker.patch("covalent_dispatcher._core.dispatcher.runner_ng") + mock_runner = mocker.patch("covalent_dispatcher._core.dispatcher.runner") + # mock_runner = mocker.patch("covalent_dispatcher._core.dispatcher.runner_ng") mock_runner.cancel_tasks = AsyncMock() async def mock_get_nodes(dispatch_id): diff --git 
a/tests/covalent_dispatcher_tests/_service/app_test.py b/tests/covalent_dispatcher_tests/_service/app_test.py index 5c563a5bfc..3383149782 100644 --- a/tests/covalent_dispatcher_tests/_service/app_test.py +++ b/tests/covalent_dispatcher_tests/_service/app_test.py @@ -35,7 +35,7 @@ from covalent._dispatcher_plugins.local import LocalDispatcher from covalent._shared_files.util_classes import RESULT_STATUS from covalent_dispatcher._db.dispatchdb import DispatchDB -from covalent_dispatcher._service.app import _try_get_result_object +from covalent_dispatcher._service.app import _try_get_result_object, cancel_all_with_status from covalent_ui.app import fastapi_app as fast_app DISPATCH_ID = "f34671d1-48f2-41ce-89d9-9a8cb5c60e5d" @@ -114,6 +114,7 @@ async def test_submit(mocker, client): run_dispatcher_mock = mocker.patch( "covalent_dispatcher.entry_point.make_dispatch", return_value=DISPATCH_ID ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") response = client.post("/api/v1/dispatch/submit", data=mock_data) assert response.json() == DISPATCH_ID run_dispatcher_mock.assert_called_once_with(mock_data) @@ -124,6 +125,7 @@ async def test_submit_exception(mocker, client): """Test the submit endpoint.""" mock_data = json.dumps({}).encode("utf-8") mocker.patch("covalent_dispatcher.entry_point.make_dispatch", side_effect=Exception("mock")) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") response = client.post("/api/v1/dispatch/submit", data=mock_data) assert response.status_code == 400 assert response.json()["detail"] == "Failed to submit workflow: mock" @@ -134,8 +136,10 @@ def test_cancel_dispatch(mocker, app, client): Test cancelling dispatch """ mocker.patch("covalent_dispatcher.entry_point.cancel_running_dispatch") - response = client.post( - "/api/v1/dispatch/cancel", data=json.dumps({"dispatch_id": DISPATCH_ID, "task_ids": []}) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + response = client.delete( + f"/api/v1/dispatch/{DISPATCH_ID}", + params={"task_ids": []}, ) assert response.json() == f"Dispatch {DISPATCH_ID} cancelled." @@ -145,37 +149,26 @@ def test_cancel_tasks(mocker, app, client): Test cancelling tasks within a lattice after dispatch """ mocker.patch("covalent_dispatcher.entry_point.cancel_running_dispatch") - response = client.post( - "/api/v1/dispatch/cancel", - data=json.dumps({"dispatch_id": DISPATCH_ID, "task_ids": [0, 1]}), + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + response = client.delete( + f"/api/v1/dispatch/{DISPATCH_ID}", + params={"task_ids": [0, 1]}, ) assert response.json() == f"Cancelled tasks [0, 1] in dispatch {DISPATCH_ID}." -@pytest.mark.asyncio -async def test_cancel(mocker, client): - """Test the cancel endpoint.""" - cancel_running_dispatch_mock = mocker.patch( - "covalent_dispatcher.entry_point.cancel_running_dispatch", return_value=DISPATCH_ID - ) - response = client.post( - "/api/v1/dispatch/cancel", data=json.dumps({"dispatch_id": DISPATCH_ID, "task_ids": []}) - ) - assert response.json() == f"Dispatch {DISPATCH_ID} cancelled." 
- cancel_running_dispatch_mock.assert_called_once_with(DISPATCH_ID, []) - - @pytest.mark.asyncio async def test_cancel_exception(mocker, client): """Test the cancel endpoint.""" cancel_running_dispatch_mock = mocker.patch( "covalent_dispatcher.entry_point.cancel_running_dispatch", side_effect=Exception("mock") ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") with pytest.raises(Exception): - response = client.post( - "/api/v1/dispatch/cancel", - data=json.dumps({"dispatch_id": DISPATCH_ID, "task_ids": []}), + response = client.delete( + f"/api/v1/dispatch/{DISPATCH_ID}", + params={"task_ids": []}, ) assert response.status_code == 400 assert response.json()["detail"] == "Failed to cancel workflow: mock" @@ -195,7 +188,8 @@ def test_register(mocker, app, client, mock_manifest): mock_register_dispatch = mocker.patch( "covalent_dispatcher._service.app.dispatcher.register_dispatch", return_value=mock_manifest ) - resp = client.post("/api/v1/dispatch/register", data=mock_manifest.json()) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.post("/api/v1/dispatch", data=mock_manifest.json()) assert resp.json() == json.loads(mock_manifest.json()) mock_register_dispatch.assert_awaited_with(mock_manifest, None) @@ -205,7 +199,8 @@ def test_register_exception(mocker, app, client, mock_manifest): mock_register_dispatch = mocker.patch( "covalent_dispatcher._service.app.dispatcher.register_dispatch", side_effect=RuntimeError() ) - resp = client.post("/api/v1/dispatch/register", data=mock_manifest.json()) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.post("/api/v1/dispatch", data=mock_manifest.json()) assert resp.status_code == 400 @@ -213,8 +208,9 @@ def test_register_sublattice(mocker, app, client, mock_manifest): mock_register_dispatch = mocker.patch( "covalent_dispatcher._service.app.dispatcher.register_dispatch", return_value=mock_manifest ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") resp = client.post( - "/api/v1/dispatch/register", + "/api/v1/dispatch", data=mock_manifest.json(), params={"parent_dispatch_id": "parent_dispatch"}, ) @@ -229,7 +225,8 @@ def test_register_redispatch(mocker, app, client, mock_manifest): "covalent_dispatcher._service.app.dispatcher.register_redispatch", return_value=mock_manifest, ) - resp = client.post(f"/api/v1/dispatch/register/{dispatch_id}", data=mock_manifest.json()) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.post(f"/api/v1/dispatch/{dispatch_id}/redispatch", data=mock_manifest.json()) mock_register_redispatch.assert_awaited_with(mock_manifest, dispatch_id, False) assert resp.json() == json.loads(mock_manifest.json()) @@ -240,8 +237,9 @@ def test_register_redispatch_reuse(mocker, app, client, mock_manifest): "covalent_dispatcher._service.app.dispatcher.register_redispatch", return_value=mock_manifest, ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") resp = client.post( - f"/api/v1/dispatch/register/{dispatch_id}", + f"/api/v1/dispatch/{dispatch_id}/redispatch", data=mock_manifest.json(), params={"reuse_previous_results": True}, ) @@ -255,7 +253,8 @@ def test_register_redispatch_exception(mocker, app, client, mock_manifest): "covalent_dispatcher._service.app.dispatcher.register_redispatch", side_effect=RuntimeError(), ) - resp = client.post(f"/api/v1/dispatch/register/{dispatch_id}", data=mock_manifest.json()) + 
mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.post(f"/api/v1/dispatch/{dispatch_id}/redispatch", data=mock_manifest.json()) assert resp.status_code == 400 @@ -263,7 +262,8 @@ def test_start(mocker, app, client): dispatch_id = "test_start" mock_start = mocker.patch("covalent_dispatcher._service.app.dispatcher.start_dispatch") mock_create_task = mocker.patch("asyncio.create_task") - resp = client.put(f"/api/v1/dispatch/start/{dispatch_id}") + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.post(f"/api/v1/dispatch/{dispatch_id}") assert resp.json() == dispatch_id @@ -277,7 +277,8 @@ def test_export_result_nowait(mocker, app, client, mock_manifest): mock_export = mocker.patch( "covalent_dispatcher._service.app.export_result_manifest", return_value=mock_manifest ) - resp = client.get(f"/api/v1/dispatch/export/{dispatch_id}") + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}") assert resp.status_code == 200 assert resp.json()["id"] == dispatch_id assert resp.json()["status"] == str(RESULT_STATUS.NEW_OBJECT) @@ -294,7 +295,8 @@ def test_export_result_wait_not_ready(mocker, app, client, mock_manifest): mock_export = mocker.patch( "covalent_dispatcher._service.app.export_result_manifest", return_value=mock_manifest ) - resp = client.get(f"/api/v1/dispatch/export/{dispatch_id}", params={"wait": True}) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}", params={"wait": True}) assert resp.status_code == 503 @@ -303,7 +305,8 @@ def test_export_result_bad_dispatch_id(mocker, app, client, mock_manifest): mock_result_object = MagicMock() mock_result_object.get_value = MagicMock(return_value=str(RESULT_STATUS.NEW_OBJECT)) mocker.patch("covalent_dispatcher._service.app._try_get_result_object", return_value=None) - resp = client.get(f"/api/v1/dispatch/export/{dispatch_id}") + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}") assert resp.status_code == 404 @@ -313,6 +316,7 @@ def test_try_get_result_object(mocker, app, client, mock_manifest): mocker.patch( "covalent_dispatcher._service.app.get_result_object", return_value=mock_result_object ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") assert _try_get_result_object(dispatch_id) == mock_result_object @@ -320,4 +324,21 @@ def test_try_get_result_object_not_found(mocker, app, client, mock_manifest): dispatch_id = "test_try_get_result_object" mock_result_object = MagicMock() mocker.patch("covalent_dispatcher._service.app.get_result_object", side_effect=KeyError()) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") assert _try_get_result_object(dispatch_id) is None + + +@pytest.mark.asyncio +async def test_cancel_all_with_status(mocker, test_db): + mock_rec = MagicMock() + mock_rec.attrs = {"dispatch_id": "mock_dispatch"} + + mocker.patch("covalent_dispatcher._service.app.workflow_db", test_db) + mocker.patch("covalent_dispatcher._dal.result.Result.get_db_records", return_value=[mock_rec]) + mock_cancel = mocker.patch( + "covalent_dispatcher._service.app.dispatcher.cancel_running_dispatch" + ) + + await cancel_all_with_status(RESULT_STATUS.RUNNING) + + mock_cancel.assert_awaited_with("mock_dispatch") diff --git a/tests/covalent_dispatcher_tests/_service/assets_test.py 
b/tests/covalent_dispatcher_tests/_service/assets_test.py index 4d3a16c2da..8590d2c980 100644 --- a/tests/covalent_dispatcher_tests/_service/assets_test.py +++ b/tests/covalent_dispatcher_tests/_service/assets_test.py @@ -133,8 +133,9 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in mock_generate_file_slice = mocker.patch( "covalent_dispatcher._service.assets._generate_file_slice", mock_generator ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") - resp = client.get(f"/api/v1/assets/{dispatch_id}/node/{node_id}/{key}") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}") assert resp.text == "Hi" assert (INTERNAL_URI, 0, -1, 65536) == mock_generator.calls[0] @@ -173,8 +174,11 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in ) headers = {"Range": "bytes=0-6"} + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") - resp = client.get(f"/api/v1/assets/{dispatch_id}/node/{node_id}/{key}", headers=headers) + resp = client.get( + f"/api/v1/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}", headers=headers + ) assert resp.text == test_str[0:6] assert (INTERNAL_URI, 0, 6, 65536) == mock_generator.calls[0] @@ -222,8 +226,11 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in ) params = {"representation": rep} + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") - resp = client.get(f"/api/v1/assets/{dispatch_id}/node/{node_id}/{key}", params=params) + resp = client.get( + f"/api/v1/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}", params=params + ) assert resp.text == test_str[start_byte:end_byte] assert (INTERNAL_URI, start_byte, end_byte, 65536) == mock_generator.calls[0] @@ -236,12 +243,12 @@ def test_get_node_asset_bad_dispatch_id(mocker, client): key = "output" node_id = 0 dispatch_id = "test_get_node_asset" - + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") mocker.patch( "covalent_dispatcher._service.assets.get_cached_result_object", side_effect=HTTPException(status_code=400), ) - resp = client.get(f"/api/v1/assets/{dispatch_id}/node/{node_id}/{key}") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}") assert resp.status_code == 400 @@ -270,8 +277,9 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in mock_generate_file_slice = mocker.patch( "covalent_dispatcher._service.assets._generate_file_slice", mock_generator ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") - resp = client.get(f"/api/v1/assets/{dispatch_id}/lattice/{key}") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/lattice/assets/{key}") assert resp.text == "Hi" assert (INTERNAL_URI, 0, -1, 65536) == mock_generator.calls[0] @@ -307,9 +315,10 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in mock_generate_file_slice = mocker.patch( "covalent_dispatcher._service.assets._generate_file_slice", mock_generator ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") headers = {"Range": "bytes=0-6"} - resp = client.get(f"/api/v1/assets/{dispatch_id}/lattice/{key}", headers=headers) + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/lattice/assets/{key}", headers=headers) assert resp.text == test_str[0:6] assert (INTERNAL_URI, 0, 6, 65536) == mock_generator.calls[0] @@ -354,10 +363,11 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, 
chunk_size: in mocker.patch( "covalent_dispatcher._service.assets._get_tobj_pickle_offsets", return_value=(6, 12) ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") params = {"representation": rep} - resp = client.get(f"/api/v1/assets/{dispatch_id}/lattice/{key}", params=params) + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/lattice/assets/{key}", params=params) assert resp.text == test_str[start_byte:end_byte] assert (INTERNAL_URI, start_byte, end_byte, 65536) == mock_generator.calls[0] @@ -375,8 +385,9 @@ def test_get_lattice_asset_bad_dispatch_id(mocker, client): "covalent_dispatcher._service.assets.get_cached_result_object", side_effect=HTTPException(status_code=400), ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") - resp = client.get(f"/api/v1/assets/{dispatch_id}/lattice/{key}") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/lattice/assets/{key}") assert resp.status_code == 400 @@ -405,8 +416,9 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in mock_generate_file_slice = mocker.patch( "covalent_dispatcher._service.assets._generate_file_slice", mock_generator ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") - resp = client.get(f"/api/v1/assets/{dispatch_id}/dispatch/{key}") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/assets/{key}") assert resp.text == "Hi" assert (INTERNAL_URI, 0, -1, 65536) == mock_generator.calls[0] @@ -439,12 +451,13 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in mocker.patch( "covalent_dispatcher._service.assets.get_result_object", return_value=mock_result_object ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") mock_generate_file_slice = mocker.patch( "covalent_dispatcher._service.assets._generate_file_slice", mock_generator ) headers = {"Range": "bytes=0-6"} - resp = client.get(f"/api/v1/assets/{dispatch_id}/dispatch/{key}", headers=headers) + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/assets/{key}", headers=headers) assert resp.text == test_str[0:6] assert (INTERNAL_URI, 0, 6, 65536) == mock_generator.calls[0] @@ -489,10 +502,11 @@ def __call__(self, file_url: str, start_byte: int, end_byte: int, chunk_size: in mocker.patch( "covalent_dispatcher._service.assets._get_tobj_pickle_offsets", return_value=(6, 12) ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") params = {"representation": rep} - resp = client.get(f"/api/v1/assets/{dispatch_id}/dispatch/{key}", params=params) + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/assets/{key}", params=params) assert resp.text == test_str[start_byte:end_byte] assert (INTERNAL_URI, start_byte, end_byte, 65536) == mock_generator.calls[0] @@ -510,8 +524,9 @@ def test_get_dispatch_asset_bad_dispatch_id(mocker, client): "covalent_dispatcher._service.assets.get_cached_result_object", side_effect=HTTPException(status_code=400), ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") - resp = client.get(f"/api/v1/assets/{dispatch_id}/dispatch/{key}") + resp = client.get(f"/api/v1/dispatch/{dispatch_id}/assets/{key}") assert resp.status_code == 400 @@ -530,6 +545,7 @@ def test_post_node_asset(test_db, mocker, client, mock_result_object): ) mock_copy = mocker.patch("covalent_dispatcher._service.assets._copy_file_obj") + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") with tempfile.NamedTemporaryFile("w") as writer: writer.write(f"{dispatch_id}") @@ -538,7 
+554,9 @@ def test_post_node_asset(test_db, mocker, client, mock_result_object): files = {"asset_file": open(writer.name, "rb")} headers = {"Digest": "sha=0af"} resp = client.post( - f"/api/v1/assets/{dispatch_id}/node/{node_id}/{key}", files=files, headers=headers + f"/api/v1/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}", + files=files, + headers=headers, ) mock_node = mock_result_object.lattice.transport_graph.get_node(node_id) mock_node.update_assets.assert_called() @@ -559,13 +577,16 @@ def test_post_node_asset_bad_dispatch_id(mocker, client): "covalent_dispatcher._service.assets.get_cached_result_object", side_effect=HTTPException(status_code=400), ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") with tempfile.NamedTemporaryFile("w") as writer: writer.write(f"{dispatch_id}") writer.flush() files = {"asset_file": open(writer.name, "rb")} - resp = client.post(f"/api/v1/assets/{dispatch_id}/node/{node_id}/{key}", files=files) + resp = client.post( + f"/api/v1/dispatch/{dispatch_id}/electron/{node_id}/assets/{key}", files=files + ) assert resp.status_code == 400 @@ -581,6 +602,7 @@ def test_post_lattice_asset(mocker, client, test_db, mock_result_object): mocker.patch( "covalent_dispatcher._service.assets.get_result_object", return_value=mock_result_object ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") mock_copy = mocker.patch("covalent_dispatcher._service.assets._copy_file_obj") @@ -589,7 +611,7 @@ def test_post_lattice_asset(mocker, client, test_db, mock_result_object): writer.flush() files = {"asset_file": open(writer.name, "rb")} - resp = client.post(f"/api/v1/assets/{dispatch_id}/lattice/{key}", files=files) + resp = client.post(f"/api/v1/dispatch/{dispatch_id}/lattice/assets/{key}", files=files) mock_lattice = mock_result_object.lattice mock_lattice.update_assets.assert_called() assert resp.status_code == 200 @@ -608,13 +630,14 @@ def test_post_lattice_asset_bad_dispatch_id(mocker, client): "covalent_dispatcher._service.assets.get_cached_result_object", side_effect=HTTPException(status_code=400), ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") with tempfile.NamedTemporaryFile("w") as writer: writer.write(f"{dispatch_id}") writer.flush() files = {"asset_file": open(writer.name, "rb")} - resp = client.post(f"/api/v1/assets/{dispatch_id}/lattice/{key}", files=files) + resp = client.post(f"/api/v1/dispatch/{dispatch_id}/lattice/assets/{key}", files=files) assert resp.status_code == 400 @@ -632,13 +655,14 @@ def test_post_dispatch_asset(mocker, client, test_db, mock_result_object): ) mock_copy = mocker.patch("covalent_dispatcher._service.assets._copy_file_obj") + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") with tempfile.NamedTemporaryFile("w") as writer: writer.write(f"{dispatch_id}") writer.flush() files = {"asset_file": open(writer.name, "rb")} - resp = client.post(f"/api/v1/assets/{dispatch_id}/dispatch/{key}", files=files) + resp = client.post(f"/api/v1/dispatch/{dispatch_id}/assets/{key}", files=files) mock_result_object.update_assets.assert_called() assert resp.status_code == 200 @@ -656,13 +680,14 @@ def test_post_dispatch_asset_bad_dispatch_id(mocker, client): "covalent_dispatcher._service.assets.get_cached_result_object", side_effect=HTTPException(status_code=400), ) + mocker.patch("covalent_dispatcher._service.app.cancel_all_with_status") with tempfile.NamedTemporaryFile("w") as writer: writer.write(f"{dispatch_id}") writer.flush() files = {"asset_file": 
open(writer.name, "rb")} - resp = client.post(f"/api/v1/assets/{dispatch_id}/dispatch/{key}", files=files) + resp = client.post(f"/api/v1/dispatch/{dispatch_id}/assets/{key}", files=files) assert resp.status_code == 400 diff --git a/tests/covalent_tests/dispatcher_plugins/local_test.py b/tests/covalent_tests/dispatcher_plugins/local_test.py index dcf196eb84..0f909ab6c1 100644 --- a/tests/covalent_tests/dispatcher_plugins/local_test.py +++ b/tests/covalent_tests/dispatcher_plugins/local_test.py @@ -58,7 +58,7 @@ def test_dispatch_when_no_server_is_running(mocker): # the test suite is using another port, thus, with the dummy address below # the covalent server is not running in some sense. dummy_dispatcher_addr = "http://localhost:12345" - endpoint = "/api/v1/dispatch/register" + endpoint = "/api/v1/dispatch" url = dummy_dispatcher_addr + endpoint message = f"The Covalent server cannot be reached at {url}. Local servers can be started using `covalent start` in the terminal. If you are using a remote Covalent server, contact your systems administrator to report an outage." @@ -238,18 +238,18 @@ def test_dispatcher_start(mocker): r.url = "http://dummy" r.reason = "dummy reason" - mocker.patch("covalent._api.apiclient.requests.Session.put", return_value=r) + mocker.patch("covalent._api.apiclient.requests.Session.post", return_value=r) with pytest.raises(HTTPError, match="404 Client Error: dummy reason for url: http://dummy"): LocalDispatcher.start(dispatch_id) # test when api doesn't raise an implicit error r = Response() - r.status_code = 200 + r.status_code = 202 r.url = "http://dummy" r._content = dispatch_id.encode("utf-8") - mocker.patch("covalent._api.apiclient.requests.Session.put", return_value=r) + mocker.patch("covalent._api.apiclient.requests.Session.post", return_value=r) assert LocalDispatcher.start(dispatch_id) == dispatch_id @@ -348,7 +348,7 @@ def workflow(a, b): manifest.metadata.dispatch_id = dispatch_id r = Response() - r.status_code = 200 + r.status_code = 201 r.json = MagicMock(return_value=manifest.dict()) mocker.patch("covalent._api.apiclient.requests.Session.post", return_value=r) @@ -382,7 +382,7 @@ def workflow(a, b): manifest.metadata.dispatch_id = dispatch_id r = Response() - r.status_code = 200 + r.status_code = 201 r.json = MagicMock(return_value=manifest.dict()) mocker.patch("covalent._api.apiclient.requests.Session.post", return_value=r) @@ -417,9 +417,11 @@ def workflow(a, b): # Populate the lattice asset schemas with dummy URLs for key, asset in manifest.lattice.assets: num_assets += 1 - asset.remote_uri = f"http://localhost:48008/api/v1/assets/{dispatch_id}/lattice/dummy" + asset.remote_uri = ( + f"http://localhost:48008/api/v1/dispatch/{dispatch_id}/lattice/assets/dummy" + ) - endpoint = f"/api/v1/assets/{dispatch_id}/lattice/dummy" + endpoint = f"/api/v1/dispatch/{dispatch_id}/lattice/assets/dummy" r = Response() r.status_code = 200 mock_post = mocker.patch("covalent._api.apiclient.requests.Session.post", return_value=r) diff --git a/tests/covalent_tests/results_manager_tests/results_manager_test.py b/tests/covalent_tests/results_manager_tests/results_manager_test.py index 176510af57..7c08547835 100644 --- a/tests/covalent_tests/results_manager_tests/results_manager_test.py +++ b/tests/covalent_tests/results_manager_tests/results_manager_test.py @@ -85,27 +85,27 @@ def workflow(x, y): def test_cancel_with_single_task_id(mocker): - mock_request_post = mocker.patch( - "covalent._api.apiclient.requests.Session.post", + mock_request_delete = mocker.patch( + 
"covalent._api.apiclient.requests.Session.delete", ) cancel(dispatch_id="dispatch", task_ids=1) - mock_request_post.assert_called_once() - mock_request_post.return_value.raise_for_status.assert_called_once() + mock_request_delete.assert_called_once() + mock_request_delete.return_value.raise_for_status.assert_called_once() def test_cancel_with_multiple_task_ids(mocker): mock_task_ids = [0, 1] - mock_request_post = mocker.patch( - "covalent._api.apiclient.requests.Session.post", + mock_request_delete = mocker.patch( + "covalent._api.apiclient.requests.Session.delete", ) cancel(dispatch_id="dispatch", task_ids=[1, 2, 3]) - mock_request_post.assert_called_once() - mock_request_post.return_value.raise_for_status.assert_called_once() + mock_request_delete.assert_called_once() + mock_request_delete.return_value.raise_for_status.assert_called_once() def test_result_export(mocker): @@ -128,7 +128,7 @@ def test_result_export(mocker): # "covalent._results_manager.results_manager.CovalentAPIClient", return_value=mock_client # ) - endpoint = f"/api/v1/dispatch/export/{dispatch_id}" + endpoint = f"/api/v1/dispatch/{dispatch_id}" assert mock_body == _get_result_export_from_dispatcher( dispatch_id, wait=False, status_only=True ) @@ -285,7 +285,7 @@ def test_get_status_only(mocker): def test_download_asset(mocker): dispatch_id = "test_download_asset" - remote_uri = f"http://localhost:48008/api/v1/assets/dispatch/{dispatch_id}/result" + remote_uri = f"http://localhost:48008/api/v1/dispatch/{dispatch_id}/assets/result" mock_client = MagicMock() mock_response = MagicMock() mock_response.status_code = 200