diff --git a/CHANGELOG.md b/CHANGELOG.md index 31985d52b..d00d693db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Removed strict version pins on `lmdbm`, `mpire`, `orjson`, and `pennylane` - Changed license to Apache +### Added + +- Documentation and test cases for database triggers. + ### Docs - Added federated learning showcase code diff --git a/covalent/triggers/__init__.py b/covalent/triggers/__init__.py index 130b609ee..311157ed4 100644 --- a/covalent/triggers/__init__.py +++ b/covalent/triggers/__init__.py @@ -15,6 +15,7 @@ # limitations under the License. from .base import BaseTrigger # nopycln: import +from .database_trigger import DatabaseTrigger # nopycln: import from .dir_trigger import DirTrigger # nopycln: import from .sqlite_trigger import SQLiteTrigger # nopycln: import from .time_trigger import TimeTrigger # nopycln: import diff --git a/covalent/triggers/database_trigger.py b/covalent/triggers/database_trigger.py new file mode 100644 index 000000000..d2a53402e --- /dev/null +++ b/covalent/triggers/database_trigger.py @@ -0,0 +1,133 @@ +# Copyright 2023 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the Apache License 2.0 (the "License"). A copy of the +# License may be obtained with this software package or at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Use of this file is prohibited except in compliance with the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from functools import partial +from threading import Event +from typing import List + +from sqlalchemy import create_engine +from sqlalchemy.orm import Session + +from covalent._shared_files import logger + +from .base import BaseTrigger + +app_log = logger.app_log +log_stack_info = logger.log_stack_info + + +class DatabaseTrigger(BaseTrigger): + """ + Database trigger which can read for changes in a database + and trigger workflows based on record changes. + + Args: + db_path: Connection string for the database + table_name: Name of the table to observe + poll_interval: Time in seconds to wait for before reading the database again + where_clauses: List of "WHERE" conditions, e.g. ["id > 2", "status = pending"], to check when + polling the database + trigger_after_n: Number of times the event must happen after which the workflow will be triggered. + e.g value of 2 means workflow will be triggered once the event has occurred twice. + lattice_dispatch_id: Lattice dispatch id of the workflow to be triggered + dispatcher_addr: Address of the dispatcher server + triggers_server_addr: Address of the triggers server + + Attributes: + self.db_path: Connection string for the database + self.table_name: Name of the table to observe + self.poll_interval: Time in seconds to wait for before reading the database again + self.where_clauses: List of "WHERE" conditions, e.g. ["id > 2", "status = pending"], to check when + polling the database + self.trigger_after_n: Number of times the event must happen after which the workflow will be triggered. + e.g value of 2 means workflow will be triggered once the event has occurred twice. + self.stop_flag: Thread safe flag used to check whether the stop condition has been met + + """ + + def __init__( + self, + db_path: str, + table_name: str, + poll_interval: int = 1, + where_clauses: List[str] = None, + trigger_after_n: int = 1, + lattice_dispatch_id: str = None, + dispatcher_addr: str = None, + triggers_server_addr: str = None, + ): + super().__init__(lattice_dispatch_id, dispatcher_addr, triggers_server_addr) + + self.db_path = db_path + self.table_name = table_name + self.poll_interval = poll_interval + + self.where_clauses = where_clauses + + self.trigger_after_n = trigger_after_n + + self.stop_flag = None + + def observe(self) -> None: + """ + Keep performing the trigger action as long as + where conditions are met or until stop has being called + """ + + app_log.debug("Inside DatabaseTrigger's observe") + event_count = 0 + + try: + self.engine = create_engine(self.db_path) + + with Session(self.engine) as db: + sql_poll_cmd = f"SELECT * FROM {self.table_name}" + + if self.where_clauses: + sql_poll_cmd += " WHERE " + sql_poll_cmd += " AND ".join(list(self.where_clauses)) + sql_poll_cmd += ";" + + execute_cmd = partial(db.execute, sql_poll_cmd) + app_log.debug(f"Poll command: {sql_poll_cmd}") + + self.stop_flag = Event() + while not self.stop_flag.is_set(): + # Read the DB with specified command + try: + app_log.debug("About to execute...") + if rows := execute_cmd().all(): + event_count += 1 + if event_count == self.trigger_after_n: + app_log.debug("Invoking trigger") + self.trigger() + event_count = 0 + + except Exception: + pass + + time.sleep(self.poll_interval) + except Exception as e: + app_log.debug("Failed to observe:") + raise + + def stop(self) -> None: + """ + Stop the running `self.observe()` method by setting the `self.stop_flag` flag. + """ + + self.stop_flag.set() diff --git a/covalent/triggers/trigger_loader.py b/covalent/triggers/trigger_loader.py index 0ae88a16b..54a0ce528 100644 --- a/covalent/triggers/trigger_loader.py +++ b/covalent/triggers/trigger_loader.py @@ -15,13 +15,14 @@ # limitations under the License. -from covalent.triggers import BaseTrigger, DirTrigger, SQLiteTrigger, TimeTrigger +from covalent.triggers import BaseTrigger, DatabaseTrigger, DirTrigger, SQLiteTrigger, TimeTrigger class TriggerLoader: def __init__(self): self.available_triggers = { BaseTrigger.__name__: BaseTrigger, + DatabaseTrigger.__name__: DatabaseTrigger, DirTrigger.__name__: DirTrigger, TimeTrigger.__name__: TimeTrigger, SQLiteTrigger.__name__: SQLiteTrigger, diff --git a/covalent_dispatcher/_triggers_app/__init__.py b/covalent_dispatcher/_triggers_app/__init__.py index 435979579..a5b66619e 100644 --- a/covalent_dispatcher/_triggers_app/__init__.py +++ b/covalent_dispatcher/_triggers_app/__init__.py @@ -16,3 +16,4 @@ from .app import router as triggers_router # nopycln: import from .app import triggers_only_app # nopycln: import +from .app import trigger_only_router diff --git a/covalent_dispatcher/_triggers_app/app.py b/covalent_dispatcher/_triggers_app/app.py index d4d92b0a6..4a4c8546d 100644 --- a/covalent_dispatcher/_triggers_app/app.py +++ b/covalent_dispatcher/_triggers_app/app.py @@ -33,6 +33,7 @@ log_stack_info = logger.log_stack_info router = APIRouter() +trigger_only_router = APIRouter() triggers_only_app = FastAPI() active_triggers = {} @@ -76,6 +77,11 @@ def get_threadpool(): return ThreadPoolExecutor() +@trigger_only_router.get("/triggers/healthcheck") +async def healthcheck(request: Request): + return {"status": "ok"} + + @router.get("/triggers/status") def trigger_server_status(request: Request): if disable_triggers: @@ -135,3 +141,4 @@ async def stop_observe(request: Request): triggers_only_app.include_router(router, prefix="/api", tags=["Triggers"]) +triggers_only_app.include_router(trigger_only_router, prefix="/api", tags=["Triggers"]) diff --git a/covalent_ui/api/v1/data_layer/summary_dal.py b/covalent_ui/api/v1/data_layer/summary_dal.py index 175a1aea9..892db0406 100644 --- a/covalent_ui/api/v1/data_layer/summary_dal.py +++ b/covalent_ui/api/v1/data_layer/summary_dal.py @@ -233,7 +233,9 @@ def get_summary_overview(self) -> Lattice: latest_running_task_status=last_ran_job_status[0] if last_ran_job_status is not None else None, - total_dispatcher_duration=run_time[0] if run_time is not None else 0, + total_dispatcher_duration=int(run_time[0]) + if run_time is not None and run_time[0] is not None + else 0, total_jobs_failed=total_failed[0], total_jobs_cancelled=total_jobs_cancelled[0], total_jobs_new_object=total_jobs_new_object[0], diff --git a/covalent_ui/api/v1/models/dispatch_model.py b/covalent_ui/api/v1/models/dispatch_model.py index 3dc387399..1084ab01b 100644 --- a/covalent_ui/api/v1/models/dispatch_model.py +++ b/covalent_ui/api/v1/models/dispatch_model.py @@ -43,7 +43,7 @@ class DispatchModule(BaseModel): dispatch_id: str lattice_name: str - runtime: Optional[Union[int, None]] + runtime: Optional[Union[int, float, None]] total_electrons: Optional[Union[int, None]] total_electrons_completed: Optional[Union[int, None]] started_at: Optional[Union[datetime, None]] diff --git a/covalent_ui/api/v1/models/electrons_model.py b/covalent_ui/api/v1/models/electrons_model.py index 1138f492f..c9692e512 100644 --- a/covalent_ui/api/v1/models/electrons_model.py +++ b/covalent_ui/api/v1/models/electrons_model.py @@ -60,7 +60,7 @@ class ElectronResponse(BaseModel): status: Union[str, None] = None started_at: Union[datetime, None] = None ended_at: Union[datetime, None] = None - runtime: Union[int, None] = None + runtime: Union[int, float, None] = None description: Union[str, None] = None qelectron_data_exists: bool = False qelectron: Union[dict, None] = None diff --git a/covalent_ui/api/v1/models/lattices_model.py b/covalent_ui/api/v1/models/lattices_model.py index f7a6e3395..3a3c9fae0 100644 --- a/covalent_ui/api/v1/models/lattices_model.py +++ b/covalent_ui/api/v1/models/lattices_model.py @@ -49,7 +49,7 @@ class LatticeDetailResponse(BaseModel): ended_at: Union[datetime, None] = None directory: Union[str, None] = None description: Union[str, None] = None - runtime: Union[int, None] = None + runtime: Union[int, float, None] = None updated_at: Union[datetime, None] = None diff --git a/covalent_ui/app.py b/covalent_ui/app.py index 0ec52ed14..aa2c830a5 100644 --- a/covalent_ui/app.py +++ b/covalent_ui/app.py @@ -58,7 +58,6 @@ def get_home(request: Request, rest_of_path: str): ) fastapi_app.mount("/", socketio_app) - if __name__ == "__main__": ap = argparse.ArgumentParser() @@ -116,7 +115,10 @@ def get_home(request: Request, rest_of_path: str): app_name = "app:fastapi_app" if args.triggers_only: + from covalent_dispatcher._triggers_app import trigger_only_router # nopycln: import + app_name = "app:triggers_only_app" + fastapi_app.include_router(trigger_only_router, prefix="/api", tags=["Triggers"]) elif args.no_triggers: import covalent_dispatcher._triggers_app.app as tr_app diff --git a/doc/source/api/triggers.rst b/doc/source/api/triggers.rst index 90218f922..145891811 100644 --- a/doc/source/api/triggers.rst +++ b/doc/source/api/triggers.rst @@ -14,3 +14,5 @@ Examples - :doc:`Add a timed trigger to a workflow <../../how_to/execution/trigger_time>` - :doc:`Add a directory trigger to a workflow <../../how_to/execution/trigger_dir>` +- :doc:`Add a sqlite trigger to a workflow <../../how_to/execution/trigger_sqlite>` +- :doc:`Add a database trigger to a workflow <../../how_to/execution/trigger_database>` diff --git a/doc/source/features/triggers.md b/doc/source/features/triggers.md index 40dd95d1e..242b7635b 100644 --- a/doc/source/features/triggers.md +++ b/doc/source/features/triggers.md @@ -201,10 +201,24 @@ def my_workflow(): ``` + +4. `DatabaseTrigger`: This trigger monitors the database for changes and performs the trigger action when changes occur. It is helpful for automating tasks in response to database updates. For example: + +```{code-block} python +from covalent.triggers import DatabaseTrigger +import covalent as ct +database_trigger = DatabaseTrigger(db_path="db path",table_name='table name') +@ct.lattice(triggers=database_trigger) +def my_workflow(): + ... +``` + These triggers can be easily integrated into your Covalent workflows to automate various tasks based on the desired conditions. ## Trigger How-to Guides For further examples on how to use triggers, check out the Trigger how to guides: -- {doc}`How to add a directory trigger to a lattice <../how_to/coding/dir_trigger>` -- {doc}`How to add a time trigger to a lattice <../how_to/coding/time_trigger>` +- {doc}`How to add a directory trigger to a lattice <../../how_to/execution/trigger_dir>` +- {doc}`How to add a time trigger to a lattice <../../how_to/execution/trigger_time>` +- {doc}`How to add a sqlite trigger to a lattice <../../how_to/execution/trigger_sqlite>` +- {doc}`How to add a database trigger to a lattice <../../how_to/execution/trigger_database>` diff --git a/doc/source/how_to/execution/trigger_database.ipynb b/doc/source/how_to/execution/trigger_database.ipynb new file mode 100644 index 000000000..bce8da792 --- /dev/null +++ b/doc/source/how_to/execution/trigger_database.ipynb @@ -0,0 +1,309 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Adding a Database Trigger to a Lattice\n", + "\n", + "This example illustrates how to use `covalent.trigger.DatabaseTrigger` to trigger the workflow dispatches automatically after the successful execution of table reads with the conditions for N number of times.\n", + "\n", + "## Prerequisites\n", + " \n", + "1. Install the recommended SQL drivers that support SQLAlchemy. \n", + "2. Create an environment variable named `COVALENT_DATABASE_URL` and set the desired database file or URL. For the PostgreSQL instance, the database connection URL will be similar to the below code snippet,\n", + "```\n", + " export COVALENT_DATABASE_URL=postgresql+pg8000://:@:/\n", + "```\n", + "3. To migrate tables, use `covalent db migrate` to create the required tables in the mentioned database.\n", + "4. Then start covalent using `covalent start`. Now, the covalent server points to the new database.\n", + "5. Import the Covalent and the trigger." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import covalent as ct\n", + "from covalent.triggers import DatabaseTrigger" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Procedure\n", + "\n", + "1. Create a new table `test_db_trigger`. " + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2023-10-04 08:34:30,374 INFO sqlalchemy.engine.Engine select pg_catalog.version()\n", + "2023-10-04 08:34:30,377 INFO sqlalchemy.engine.Engine [raw sql] ()\n", + "2023-10-04 08:34:30,385 INFO sqlalchemy.engine.Engine select current_schema()\n", + "2023-10-04 08:34:30,389 INFO sqlalchemy.engine.Engine [raw sql] ()\n", + "2023-10-04 08:34:30,391 INFO sqlalchemy.engine.Engine show standard_conforming_strings\n", + "2023-10-04 08:34:30,392 INFO sqlalchemy.engine.Engine [raw sql] ()\n", + "2023-10-04 08:34:30,397 INFO sqlalchemy.engine.Engine BEGIN (implicit)\n", + "2023-10-04 08:34:30,405 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s\n", + "2023-10-04 08:34:30,406 INFO sqlalchemy.engine.Engine [generated in 0.00120s] ('test_db_trigger',)\n", + "2023-10-04 08:34:30,410 INFO sqlalchemy.engine.Engine \n", + "CREATE TABLE test_db_trigger (\n", + "\ttrigger_at TIMESTAMP WITHOUT TIME ZONE NOT NULL, \n", + "\tPRIMARY KEY (trigger_at)\n", + ")\n", + "\n", + "\n", + "2023-10-04 08:34:30,411 INFO sqlalchemy.engine.Engine [no key 0.00107s] ()\n", + "2023-10-04 08:34:30,448 INFO sqlalchemy.engine.Engine COMMIT\n" + ] + } + ], + "source": [ + "\n", + "db_path = \"postgresql+pg8000://postgres:sam@localhost:5432/aqdb\"\n", + "table_name = 'test_db_trigger'\n", + "\n", + "#create table\n", + "\n", + "from sqlalchemy import Table, Column, MetaData, DateTime, create_engine\n", + "meta = MetaData()\n", + "engine = create_engine(db_path, echo=True)\n", + "test_db_trigger = Table(\n", + "table_name, meta,\n", + "Column('trigger_at', DateTime, primary_key = True),\n", + ")\n", + "meta.create_all(engine)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "2. Load sample data into the newly created table" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2023-10-04 08:34:33,407 INFO sqlalchemy.engine.Engine INSERT INTO test_db_trigger (trigger_at) VALUES (%s)\n", + "2023-10-04 08:34:33,409 INFO sqlalchemy.engine.Engine [generated in 0.00208s] ((datetime.datetime(2023, 10, 4, 14, 4, 33, 406624),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406636),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406638),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406639),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406641),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406643),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406645),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406646),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406648),), (datetime.datetime(2023, 10, 4, 14, 4, 33, 406650),))\n", + "2023-10-04 08:34:33,423 INFO sqlalchemy.engine.Engine COMMIT\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/tmp/ipykernel_146652/3174457459.py:6: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to \"sqlalchemy<2.0\". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)\n", + " result = conn.execute(insert(test_db_trigger),[*values])#{\"trigger_at\": trigger_at}\n" + ] + } + ], + "source": [ + "# load sample data.\n", + "from sqlalchemy import insert\n", + "from datetime import datetime\n", + "with engine.connect() as conn:\n", + " values = [{\"trigger_at\": datetime.now()} for _ in range(10)]\n", + " result = conn.execute(insert(test_db_trigger),[*values])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "3. Create a `Database Trigger` object that performs a trigger. We can parse parameters such as `db_path`, `table_name`, `trigger_after_n`, and `poll_interval`. For this illustration, we will use the `PostgreSQL` database." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "database_trigger = DatabaseTrigger(db_path='postgresql+pg8000://postgres:sam@localhost:5432/aqdb',\n", + " table_name=table_name,\n", + " trigger_after_n=2,\n", + " poll_interval=3)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "4. Create a workflow:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "@ct.lattice\n", + "@ct.electron\n", + "def my_workflow():\n", + " return 42" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "5. Dispatch `my_workflow`, disabling its first execution using the `disable_run` parameter in `ct.dispatch`." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "de35492d-4f51-473e-b814-ad203939f85a\n" + ] + } + ], + "source": [ + "dispatch_id = ct.dispatch(my_workflow)()\n", + "print(dispatch_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "6. Attach the trigger to the `dispatch_id` and register it with the trigger server with the where clause to filter dispatches with lattice name `my_workflow`." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "database_trigger.lattice_dispatch_id = dispatch_id\n", + "triggered_at = values[-1][\"trigger_at\"]\n", + "database_trigger.where_clauses = [f\"trigger_at = '{str(triggered_at)}'\"]\n", + "database_trigger.register()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "7. Monitor the Covalent UI. Watch the Dashboard for new dispatches of `my_workflow`.\n", + "\n", + "8. In the Covalent UI, observe that a new `my_workflow` is dispatched after reading the table two times and with a polling interval of 3 seconds.\n", + "\n", + "9. To disable triggers on the dispatch, use the `ct.stop_triggers` function and drop the `test_db_trigger` table." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2023-10-04 08:36:10,622] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:\n", + "[2023-10-04 08:36:10,627] [DEBUG] local.py: Line 336 in stop_triggers: de35492d-4f51-473e-b814-ad203939f85a\n", + "2023-10-04 08:36:10,630 INFO sqlalchemy.engine.Engine BEGIN (implicit)\n", + "2023-10-04 08:36:10,633 INFO sqlalchemy.engine.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%s\n", + "2023-10-04 08:36:10,637 INFO sqlalchemy.engine.Engine [cached since 100.2s ago] ('test_db_trigger',)\n", + "2023-10-04 08:36:10,643 INFO sqlalchemy.engine.Engine \n", + "DROP TABLE test_db_trigger\n", + "2023-10-04 08:36:10,645 INFO sqlalchemy.engine.Engine [no key 0.00186s] ()\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2023-10-04 08:36:11,073 INFO sqlalchemy.engine.Engine COMMIT\n" + ] + } + ], + "source": [ + "ct.stop_triggers(dispatch_id)\n", + "meta.drop_all(engine, tables=[test_db_trigger], checkfirst=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note that the `stop_triggers` function disables all triggers attached to the specified dispatch. \n", + "\n", + "## See Also\n", + "\n", + "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)\n", + "\n", + "[Adding a TimeTrigger to a Lattice](./trigger_time.ipynb)\n", + "\n", + "[Adding a SQLite Trigger to a Lattice](./trigger_sqlite.ipynb)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "###### " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.17" + }, + "vscode": { + "interpreter": { + "hash": "ffe78875ce1aa6161f50f6a6dec2555e7255bbdb44cc39b93c0dfc1daa8da522" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/doc/source/how_to/execution/trigger_dir.ipynb b/doc/source/how_to/execution/trigger_dir.ipynb index e56de2c74..ae764a907 100644 --- a/doc/source/how_to/execution/trigger_dir.ipynb +++ b/doc/source/how_to/execution/trigger_dir.ipynb @@ -137,7 +137,11 @@ "\n", "## See Also\n", "\n", - "[Adding a Time Trigger to a Lattice](./trigger_time.ipynb)" + "[Adding a Time Trigger to a Lattice](./trigger_time.ipynb)\n", + "\n", + "[Adding a Database Trigger to a Lattice](./trigger_database.ipynb)\n", + "\n", + "[Adding a SQLite Trigger to a Lattice](./trigger_sqlite.ipynb)" ] }, { diff --git a/doc/source/how_to/execution/trigger_sqlite.ipynb b/doc/source/how_to/execution/trigger_sqlite.ipynb new file mode 100644 index 000000000..8ed6a557a --- /dev/null +++ b/doc/source/how_to/execution/trigger_sqlite.ipynb @@ -0,0 +1,183 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Adding a SQLite Trigger to a Lattice\n", + "\n", + "This example illustrates how to use a covalent.trigger.SQLiteTrigger to trigger workflow dispatches automatically at a specified interval.\n", + "\n", + "## Prerequisites\n", + "\n", + "Import Covalent and the trigger." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "import covalent as ct\n", + "from covalent.triggers import SQLiteTrigger" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Procedure\n", + "\n", + "1. Create a `SQLiteTrigger` object that performs a trigger." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "sqlite_trigger = SQLiteTrigger(db_path='path/to/your/database.sqlite',table_name='table name')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "2. Create a workflow:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "@ct.lattice\n", + "@ct.electron\n", + "def my_workflow():\n", + " return 42" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "3. Dispatch `my_workflow`, disabling its first execution using the `disable_run` parameter in `ct.dispatch`." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "5041f1fc-8943-4b96-9f26-a3c7f35cabef\n" + ] + } + ], + "source": [ + "dispatch_id = ct.dispatch(my_workflow)()\n", + "print(dispatch_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "4. Attach the trigger to the `dispatch_id` and register it with the trigger server." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "sqlite_trigger.lattice_dispatch_id = dispatch_id\n", + "sqlite_trigger.register()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "5. Monitor the Covalent UI. Watch the Dashboard for new dispatches of `my_workflow`.\n", + "\n", + "6. In the Covalent UI, observe that a new `my_workflow` is dispatched every five seconds.\n", + "\n", + "7. To disable triggers on the dispatch, use the `ct.stop_triggers` function." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2023-09-22 07:37:27,218] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:\n", + "[2023-09-22 07:37:27,220] [DEBUG] local.py: Line 336 in stop_triggers: 5041f1fc-8943-4b96-9f26-a3c7f35cabef\n" + ] + } + ], + "source": [ + "ct.stop_triggers(dispatch_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note that the `stop_triggers` function disables all triggers attached to the specified dispatch. \n", + "\n", + "## See Also\n", + "\n", + "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)\n", + "\n", + "[Adding a Time Trigger to a Lattice](./trigger_time.ipynb)\n", + "\n", + "[Adding a Database Trigger to a Lattice](./trigger_database.ipynb)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "###### " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.17" + }, + "vscode": { + "interpreter": { + "hash": "ffe78875ce1aa6161f50f6a6dec2555e7255bbdb44cc39b93c0dfc1daa8da522" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/doc/source/how_to/execution/trigger_time.ipynb b/doc/source/how_to/execution/trigger_time.ipynb index e4db1bcde..b2735840c 100644 --- a/doc/source/how_to/execution/trigger_time.ipynb +++ b/doc/source/how_to/execution/trigger_time.ipynb @@ -76,7 +76,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "4b33912e-2362-4774-8fc0-acddb1c80593\n" + "44f56fcf-96dc-4089-84d4-069fd13e3e58\n" ] } ], @@ -115,20 +115,15 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 6, "metadata": {}, "outputs": [ { - "ename": "HTTPError", - "evalue": "500 Server Error: Internal Server Error for url: http://localhost:48008/api/triggers/stop_observe", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mHTTPError\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[8], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mct\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mstop_triggers\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdispatch_id\u001b[49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m/opt/miniconda3/lib/python3.9/site-packages/covalent/_dispatcher_plugins/local.py:312\u001b[0m, in \u001b[0;36mLocalDispatcher.stop_triggers\u001b[0;34m(dispatch_ids, triggers_server_addr)\u001b[0m\n\u001b[1;32m 309\u001b[0m dispatch_ids \u001b[38;5;241m=\u001b[39m [dispatch_ids]\n\u001b[1;32m 311\u001b[0m r \u001b[38;5;241m=\u001b[39m requests\u001b[38;5;241m.\u001b[39mpost(stop_triggers_url, json\u001b[38;5;241m=\u001b[39mdispatch_ids)\n\u001b[0;32m--> 312\u001b[0m \u001b[43mr\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mraise_for_status\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 314\u001b[0m app_log\u001b[38;5;241m.\u001b[39mdebug(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mTriggers for following dispatch_ids have stopped observing:\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m d_id \u001b[38;5;129;01min\u001b[39;00m dispatch_ids:\n", - "File \u001b[0;32m/opt/miniconda3/lib/python3.9/site-packages/requests/models.py:1021\u001b[0m, in \u001b[0;36mResponse.raise_for_status\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1016\u001b[0m http_error_msg \u001b[38;5;241m=\u001b[39m (\n\u001b[1;32m 1017\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mstatus_code\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m Server Error: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mreason\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m for url: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39murl\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 1018\u001b[0m )\n\u001b[1;32m 1020\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m http_error_msg:\n\u001b[0;32m-> 1021\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m HTTPError(http_error_msg, response\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m)\n", - "\u001b[0;31mHTTPError\u001b[0m: 500 Server Error: Internal Server Error for url: http://localhost:48008/api/triggers/stop_observe" + "name": "stdout", + "output_type": "stream", + "text": [ + "[2023-09-25 08:51:25,893] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:\n", + "[2023-09-25 08:51:25,894] [DEBUG] local.py: Line 336 in stop_triggers: 44f56fcf-96dc-4089-84d4-069fd13e3e58\n" ] } ], @@ -144,7 +139,11 @@ "\n", "## See Also\n", "\n", - "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)" + "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)\n", + "\n", + "[Adding a SQLite Trigger to a Lattice](./trigger_sqlite.ipynb)\n", + "\n", + "[Adding a Database Trigger to a Lattice](./trigger_database.ipynb)" ] }, { @@ -171,7 +170,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.12" + "version": "3.8.17" }, "vscode": { "interpreter": { @@ -180,5 +179,5 @@ } }, "nbformat": 4, - "nbformat_minor": 2 + "nbformat_minor": 4 } diff --git a/tests/covalent_tests/triggers/database_trigger_test.py b/tests/covalent_tests/triggers/database_trigger_test.py new file mode 100644 index 000000000..660a78aba --- /dev/null +++ b/tests/covalent_tests/triggers/database_trigger_test.py @@ -0,0 +1,152 @@ +# Copyright 2023 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the Apache License 2.0 (the "License"). A copy of the +# License may be obtained with this software package or at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Use of this file is prohibited except in compliance with the License. +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from covalent.triggers.database_trigger import DatabaseTrigger + + +@pytest.fixture +def database_trigger(): + """ + Fixture to obtain a sample Database Trigger instance for testing + """ + + db_path = "test_db_path" + table_name = "test_table_name" + poll_interval = 1 + where_clauses = ["id > 2", "status = pending"] + trigger_after_n = 1 + return DatabaseTrigger(db_path, table_name, poll_interval, where_clauses, trigger_after_n) + + +def test_sqlite_trigger_init(database_trigger): + """ + Testing whether Database Trigger initializes as expected + """ + + assert database_trigger.db_path == "test_db_path" + assert database_trigger.table_name == "test_table_name" + assert database_trigger.poll_interval == 1 + assert database_trigger.where_clauses == ["id > 2", "status = pending"] + assert database_trigger.trigger_after_n == 1 + + +@pytest.mark.parametrize( + "where_clauses", + [ + ["id > 2", "status = pending"], + None, + ], +) +def test_database_trigger_observe(mocker, where_clauses, database_trigger): + """ + Test the observe method of Database Trigger + """ + + database_trigger.where_clauses = where_clauses + database_trigger.trigger = mocker.MagicMock() + + mock_db_engine = mocker.patch("covalent.triggers.database_trigger.create_engine") + mock_session = mocker.patch("covalent.triggers.database_trigger.Session") + mock_event = mocker.patch("covalent.triggers.database_trigger.Event") + mock_sleep = mocker.patch("covalent.triggers.database_trigger.time.sleep") + + mock_event.return_value.is_set.side_effect = [False, True] + database_trigger.observe() + + sql_poll_cmd = "SELECT * FROM test_table_name" + if where_clauses: + sql_poll_cmd += " WHERE " + sql_poll_cmd += " AND ".join(list(where_clauses)) + sql_poll_cmd += ";" + + mock_db_engine.assert_called_once_with("test_db_path") + mock_session.assert_called_once_with(mock_db_engine("test_db_path")) + mock_event.assert_called_once() + mock_sql_execute = mocker.patch.object(mock_session, "execute", autospec=True) + mock_sql_execute.assert_called_once_with(sql_poll_cmd) + mock_sleep.assert_called_once_with(1) + + +@pytest.mark.parametrize( + "where_clauses", + [ + ["id > 2", "status = COMPLETED"], + None, + ], +) +def test_database_trigger_exception(mocker, where_clauses, database_trigger): + """ + Test the observe method of Database trigger when an OperationalError is raised + """ + + mock_db_engine = mocker.patch("covalent.triggers.database_trigger.create_engine") + mock_session = mocker.patch("covalent.triggers.database_trigger.Session") + mock_event = mocker.patch("covalent.triggers.database_trigger.Event") + mock_sleep = mocker.patch("covalent.triggers.database_trigger.time.sleep") + + mock_event.return_value.is_set.side_effect = [False, True] + database_trigger.observe() + + sql_poll_cmd = "SELECT * FROM test_table_name" + if where_clauses: + sql_poll_cmd += " WHERE " + sql_poll_cmd += " AND ".join(list(where_clauses)) + sql_poll_cmd += ";" + + mock_db_engine.assert_called_once_with("test_db_path") + mock_session.assert_called_once_with(mock_db_engine("test_db_path")) + mock_event.assert_called_once() + mock_sql_execute = mocker.patch.object(mock_session, "execute", autospec=True) + mock_sql_execute.assert_called_once_with(sql_poll_cmd) + mock_sleep.assert_called_once_with(1) + + +@pytest.mark.parametrize( + "where_clauses", + [ + ["id > 2", "status = FAILED"], + None, + ], +) +def test_database_trigger_exception_session(mocker, where_clauses, database_trigger): + """ + Test the observe method of Database trigger when an OperationalError is raised + """ + database_trigger.trigger = mocker.MagicMock() + mock_event = mocker.patch("covalent.triggers.database_trigger.Event") + mock_event.return_value.is_set.side_effect = [False, True] + import sqlalchemy + + # Call the 'observer' method + try: + database_trigger.observe() + except sqlalchemy.exc.ArgumentError as exc: + assert str(exc) == "Could not parse SQLAlchemy URL from string 'test_db_path'" + + +def test_database_trigger_stop(mocker, database_trigger): + """ + Test the stop method of Database Trigger + """ + + mock_stop_flag = mocker.MagicMock() + database_trigger.stop_flag = mock_stop_flag + + database_trigger.stop() + + mock_stop_flag.set.assert_called_once()