From 44061cb8c7092cb5f7f76e250939ef53ede5fda7 Mon Sep 17 00:00:00 2001 From: ArunPsiog Date: Mon, 25 Sep 2023 21:02:30 +0530 Subject: [PATCH] Changes for triggers rtd and test --- doc/source/api/triggers.rst | 2 + doc/source/features/triggers.md | 18 +- .../how_to/execution/trigger_database.ipynb | 183 ++++++++++++++++++ doc/source/how_to/execution/trigger_dir.ipynb | 6 +- .../how_to/execution/trigger_sqlite.ipynb | 183 ++++++++++++++++++ .../how_to/execution/trigger_time.ipynb | 29 ++- .../triggers/database_trigger_test.py | 121 ++++++++++++ 7 files changed, 524 insertions(+), 18 deletions(-) create mode 100644 doc/source/how_to/execution/trigger_database.ipynb create mode 100644 doc/source/how_to/execution/trigger_sqlite.ipynb create mode 100644 tests/covalent_tests/triggers/database_trigger_test.py diff --git a/doc/source/api/triggers.rst b/doc/source/api/triggers.rst index 90218f922..145891811 100644 --- a/doc/source/api/triggers.rst +++ b/doc/source/api/triggers.rst @@ -14,3 +14,5 @@ Examples - :doc:`Add a timed trigger to a workflow <../../how_to/execution/trigger_time>` - :doc:`Add a directory trigger to a workflow <../../how_to/execution/trigger_dir>` +- :doc:`Add a sqlite trigger to a workflow <../../how_to/execution/trigger_sqlite>` +- :doc:`Add a database trigger to a workflow <../../how_to/execution/trigger_database>` diff --git a/doc/source/features/triggers.md b/doc/source/features/triggers.md index 40dd95d1e..242b7635b 100644 --- a/doc/source/features/triggers.md +++ b/doc/source/features/triggers.md @@ -201,10 +201,24 @@ def my_workflow(): ``` + +4. `DatabaseTrigger`: This trigger monitors the database for changes and performs the trigger action when changes occur. It is helpful for automating tasks in response to database updates. For example: + +```{code-block} python +from covalent.triggers import DatabaseTrigger +import covalent as ct +database_trigger = DatabaseTrigger(db_path="db path",table_name='table name') +@ct.lattice(triggers=database_trigger) +def my_workflow(): + ... +``` + These triggers can be easily integrated into your Covalent workflows to automate various tasks based on the desired conditions. ## Trigger How-to Guides For further examples on how to use triggers, check out the Trigger how to guides: -- {doc}`How to add a directory trigger to a lattice <../how_to/coding/dir_trigger>` -- {doc}`How to add a time trigger to a lattice <../how_to/coding/time_trigger>` +- {doc}`How to add a directory trigger to a lattice <../../how_to/execution/trigger_dir>` +- {doc}`How to add a time trigger to a lattice <../../how_to/execution/trigger_time>` +- {doc}`How to add a sqlite trigger to a lattice <../../how_to/execution/trigger_sqlite>` +- {doc}`How to add a database trigger to a lattice <../../how_to/execution/trigger_database>` diff --git a/doc/source/how_to/execution/trigger_database.ipynb b/doc/source/how_to/execution/trigger_database.ipynb new file mode 100644 index 000000000..09d34375a --- /dev/null +++ b/doc/source/how_to/execution/trigger_database.ipynb @@ -0,0 +1,183 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Adding a Database Trigger to a Lattice\n", + "\n", + "This example illustrates how to use a covalent.trigger.DatabaseTrigger to trigger workflow dispatches automatically at a specified interval.\n", + "\n", + "## Prerequisites\n", + "\n", + "Import Covalent and the trigger." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "import covalent as ct\n", + "from covalent.triggers import DatabaseTrigger" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Procedure\n", + "\n", + "1. Create a `Database Trigger` object that performs a trigger." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "database_trigger = DatabaseTrigger(db_path='path/to/your/database',table_name='table name')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "2. Create a workflow:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "@ct.lattice\n", + "@ct.electron\n", + "def my_workflow():\n", + " return 42" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "3. Dispatch `my_workflow`, disabling its first execution using the `disable_run` parameter in `ct.dispatch`." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "5041f1fc-8943-4b96-9f26-a3c7f35cabef\n" + ] + } + ], + "source": [ + "dispatch_id = ct.dispatch(my_workflow)()\n", + "print(dispatch_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "4. Attach the trigger to the `dispatch_id` and register it with the trigger server." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "database_trigger.lattice_dispatch_id = dispatch_id\n", + "database_trigger.register()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "5. Monitor the Covalent UI. Watch the Dashboard for new dispatches of `my_workflow`.\n", + "\n", + "6. In the Covalent UI, observe that a new `my_workflow` is dispatched every five seconds.\n", + "\n", + "7. To disable triggers on the dispatch, use the `ct.stop_triggers` function." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2023-09-22 07:37:27,218] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:\n", + "[2023-09-22 07:37:27,220] [DEBUG] local.py: Line 336 in stop_triggers: 5041f1fc-8943-4b96-9f26-a3c7f35cabef\n" + ] + } + ], + "source": [ + "ct.stop_triggers(dispatch_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note that the `stop_triggers` function disables all triggers attached to the specified dispatch. \n", + "\n", + "## See Also\n", + "\n", + "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)\n", + "\n", + "[Adding a TimeTrigger to a Lattice](./trigger_time.ipynb)\n", + "\n", + "[Adding a SQLite Trigger to a Lattice](./trigger_sqlite.ipynb)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "###### " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.17" + }, + "vscode": { + "interpreter": { + "hash": "ffe78875ce1aa6161f50f6a6dec2555e7255bbdb44cc39b93c0dfc1daa8da522" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/doc/source/how_to/execution/trigger_dir.ipynb b/doc/source/how_to/execution/trigger_dir.ipynb index e56de2c74..ae764a907 100644 --- a/doc/source/how_to/execution/trigger_dir.ipynb +++ b/doc/source/how_to/execution/trigger_dir.ipynb @@ -137,7 +137,11 @@ "\n", "## See Also\n", "\n", - "[Adding a Time Trigger to a Lattice](./trigger_time.ipynb)" + "[Adding a Time Trigger to a Lattice](./trigger_time.ipynb)\n", + "\n", + "[Adding a Database Trigger to a Lattice](./trigger_database.ipynb)\n", + "\n", + "[Adding a SQLite Trigger to a Lattice](./trigger_sqlite.ipynb)" ] }, { diff --git a/doc/source/how_to/execution/trigger_sqlite.ipynb b/doc/source/how_to/execution/trigger_sqlite.ipynb new file mode 100644 index 000000000..8ed6a557a --- /dev/null +++ b/doc/source/how_to/execution/trigger_sqlite.ipynb @@ -0,0 +1,183 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Adding a SQLite Trigger to a Lattice\n", + "\n", + "This example illustrates how to use a covalent.trigger.SQLiteTrigger to trigger workflow dispatches automatically at a specified interval.\n", + "\n", + "## Prerequisites\n", + "\n", + "Import Covalent and the trigger." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "import covalent as ct\n", + "from covalent.triggers import SQLiteTrigger" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Procedure\n", + "\n", + "1. Create a `SQLiteTrigger` object that performs a trigger." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "sqlite_trigger = SQLiteTrigger(db_path='path/to/your/database.sqlite',table_name='table name')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "2. Create a workflow:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "@ct.lattice\n", + "@ct.electron\n", + "def my_workflow():\n", + " return 42" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "3. Dispatch `my_workflow`, disabling its first execution using the `disable_run` parameter in `ct.dispatch`." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "5041f1fc-8943-4b96-9f26-a3c7f35cabef\n" + ] + } + ], + "source": [ + "dispatch_id = ct.dispatch(my_workflow)()\n", + "print(dispatch_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "4. Attach the trigger to the `dispatch_id` and register it with the trigger server." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "sqlite_trigger.lattice_dispatch_id = dispatch_id\n", + "sqlite_trigger.register()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "5. Monitor the Covalent UI. Watch the Dashboard for new dispatches of `my_workflow`.\n", + "\n", + "6. In the Covalent UI, observe that a new `my_workflow` is dispatched every five seconds.\n", + "\n", + "7. To disable triggers on the dispatch, use the `ct.stop_triggers` function." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2023-09-22 07:37:27,218] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:\n", + "[2023-09-22 07:37:27,220] [DEBUG] local.py: Line 336 in stop_triggers: 5041f1fc-8943-4b96-9f26-a3c7f35cabef\n" + ] + } + ], + "source": [ + "ct.stop_triggers(dispatch_id)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Note that the `stop_triggers` function disables all triggers attached to the specified dispatch. \n", + "\n", + "## See Also\n", + "\n", + "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)\n", + "\n", + "[Adding a Time Trigger to a Lattice](./trigger_time.ipynb)\n", + "\n", + "[Adding a Database Trigger to a Lattice](./trigger_database.ipynb)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "###### " + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.17" + }, + "vscode": { + "interpreter": { + "hash": "ffe78875ce1aa6161f50f6a6dec2555e7255bbdb44cc39b93c0dfc1daa8da522" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/doc/source/how_to/execution/trigger_time.ipynb b/doc/source/how_to/execution/trigger_time.ipynb index e4db1bcde..b2735840c 100644 --- a/doc/source/how_to/execution/trigger_time.ipynb +++ b/doc/source/how_to/execution/trigger_time.ipynb @@ -76,7 +76,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "4b33912e-2362-4774-8fc0-acddb1c80593\n" + "44f56fcf-96dc-4089-84d4-069fd13e3e58\n" ] } ], @@ -115,20 +115,15 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 6, "metadata": {}, "outputs": [ { - "ename": "HTTPError", - "evalue": "500 Server Error: Internal Server Error for url: http://localhost:48008/api/triggers/stop_observe", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mHTTPError\u001b[0m Traceback (most recent call last)", - "Cell \u001b[0;32mIn[8], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mct\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mstop_triggers\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdispatch_id\u001b[49m\u001b[43m)\u001b[49m\n", - "File \u001b[0;32m/opt/miniconda3/lib/python3.9/site-packages/covalent/_dispatcher_plugins/local.py:312\u001b[0m, in \u001b[0;36mLocalDispatcher.stop_triggers\u001b[0;34m(dispatch_ids, triggers_server_addr)\u001b[0m\n\u001b[1;32m 309\u001b[0m dispatch_ids \u001b[38;5;241m=\u001b[39m [dispatch_ids]\n\u001b[1;32m 311\u001b[0m r \u001b[38;5;241m=\u001b[39m requests\u001b[38;5;241m.\u001b[39mpost(stop_triggers_url, json\u001b[38;5;241m=\u001b[39mdispatch_ids)\n\u001b[0;32m--> 312\u001b[0m \u001b[43mr\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mraise_for_status\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 314\u001b[0m app_log\u001b[38;5;241m.\u001b[39mdebug(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mTriggers for following dispatch_ids have stopped observing:\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m d_id \u001b[38;5;129;01min\u001b[39;00m dispatch_ids:\n", - "File \u001b[0;32m/opt/miniconda3/lib/python3.9/site-packages/requests/models.py:1021\u001b[0m, in \u001b[0;36mResponse.raise_for_status\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 1016\u001b[0m http_error_msg \u001b[38;5;241m=\u001b[39m (\n\u001b[1;32m 1017\u001b[0m \u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mstatus_code\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m Server Error: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mreason\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m for url: \u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39murl\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\n\u001b[1;32m 1018\u001b[0m )\n\u001b[1;32m 1020\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m http_error_msg:\n\u001b[0;32m-> 1021\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m HTTPError(http_error_msg, response\u001b[38;5;241m=\u001b[39m\u001b[38;5;28mself\u001b[39m)\n", - "\u001b[0;31mHTTPError\u001b[0m: 500 Server Error: Internal Server Error for url: http://localhost:48008/api/triggers/stop_observe" + "name": "stdout", + "output_type": "stream", + "text": [ + "[2023-09-25 08:51:25,893] [DEBUG] local.py: Line 334 in stop_triggers: Triggers for following dispatch_ids have stopped observing:\n", + "[2023-09-25 08:51:25,894] [DEBUG] local.py: Line 336 in stop_triggers: 44f56fcf-96dc-4089-84d4-069fd13e3e58\n" ] } ], @@ -144,7 +139,11 @@ "\n", "## See Also\n", "\n", - "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)" + "[Adding a Directory Trigger to a Lattice](./trigger_dir.ipynb)\n", + "\n", + "[Adding a SQLite Trigger to a Lattice](./trigger_sqlite.ipynb)\n", + "\n", + "[Adding a Database Trigger to a Lattice](./trigger_database.ipynb)" ] }, { @@ -171,7 +170,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.12" + "version": "3.8.17" }, "vscode": { "interpreter": { @@ -180,5 +179,5 @@ } }, "nbformat": 4, - "nbformat_minor": 2 + "nbformat_minor": 4 } diff --git a/tests/covalent_tests/triggers/database_trigger_test.py b/tests/covalent_tests/triggers/database_trigger_test.py new file mode 100644 index 000000000..bfab6603b --- /dev/null +++ b/tests/covalent_tests/triggers/database_trigger_test.py @@ -0,0 +1,121 @@ +# Copyright 2023 Agnostiq Inc. +# +# This file is part of Covalent. +# +# Licensed under the GNU Affero General Public License 3.0 (the "License"). +# A copy of the License may be obtained with this software package or at +# +# https://www.gnu.org/licenses/agpl-3.0.en.html +# +# Use of this file is prohibited except in compliance with the License. Any +# modifications or derivative works of this file must retain this copyright +# notice, and modified files must contain a notice indicating that they have +# been altered from the originals. +# +# Covalent is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the License for more details. +# +# Relief from the License may be granted by purchasing a commercial license. + + +import pytest + +from covalent.triggers.database_trigger import DatabaseTrigger + + +@pytest.fixture +def database_trigger(): + """ + Fixture to obtain a sample Database Trigger instance for testing + """ + + db_path = "test_db_path" + table_name = "test_table_name" + poll_interval = 1 + where_clauses = ["id > 2", "status = pending"] + trigger_after_n = 1 + return DatabaseTrigger(db_path, table_name, poll_interval, where_clauses, trigger_after_n) + + +def test_sqlite_trigger_init(database_trigger): + """ + Testing whether Database Trigger initializes as expected + """ + + assert database_trigger.db_path == "test_db_path" + assert database_trigger.table_name == "test_table_name" + assert database_trigger.poll_interval == 1 + assert database_trigger.where_clauses == ["id > 2", "status = pending"] + assert database_trigger.trigger_after_n == 1 + + +@pytest.mark.parametrize( + "where_clauses", + [ + ["id > 2", "status = pending"], + None, + ], +) +def test_database_trigger_observe(mocker, where_clauses, database_trigger): + """ + Test the observe method of Database Trigger + """ + + database_trigger.where_clauses = where_clauses + database_trigger.trigger = mocker.MagicMock() + + mock_db_engine = mocker.patch("covalent.triggers.database_trigger.create_engine") + mock_session = mocker.patch("covalent.triggers.database_trigger.Session") + mock_event = mocker.patch("covalent.triggers.database_trigger.Event") + mock_sleep = mocker.patch("covalent.triggers.database_trigger.time.sleep") + + mock_event.return_value.is_set.side_effect = [False, True] + database_trigger.observe() + + sql_poll_cmd = "SELECT * FROM test_table_name" + if where_clauses: + sql_poll_cmd += " WHERE " + sql_poll_cmd += " AND ".join(list(where_clauses)) + sql_poll_cmd += ";" + + mock_db_engine.assert_called_once_with("test_db_path") + mock_session.assert_called_once_with(mock_db_engine("test_db_path")) + mock_event.assert_called_once() + mock_sql_execute = mocker.patch.object(mock_session, "execute", autospec=True) + mock_sql_execute.assert_called_once_with(sql_poll_cmd) + mock_sleep.assert_called_once_with(1) + + +# def test_database_trigger_exception(mocker, database_trigger): +# """ +# Test the observe method of Database trigger when an OperationalError is raised +# """ + +# database_trigger.trigger = mocker.MagicMock() +# mock_event = mocker.patch("covalent.triggers.database_trigger.Event") +# mock_sleep = mocker.patch("covalent.triggers.database_trigger.time.sleep") +# mock_event.return_value.is_set.side_effect = [False, True] +# mock_session = mocker.patch('covalent.triggers.database_trigger.Session') +# import sqlalchemy +# from sqlalchemy.exc import ArgumentError +# mock_session.side_effect = ArgumentError + +# # Call the 'observer' method +# try: +# database_trigger.observe() +# except sqlalchemy.exc.ArgumentError as exc: +# assert str(exc) == "Could not parse SQLAlchemy URL from string 'test_db_path'" + + +def test_database_trigger_stop(mocker, database_trigger): + """ + Test the stop method of Database Trigger + """ + + mock_stop_flag = mocker.MagicMock() + database_trigger.stop_flag = mock_stop_flag + + database_trigger.stop() + + mock_stop_flag.set.assert_called_once()