Skip to content

Commit

Permalink
trigger-enhancements-for-dt (#1734)
Browse files Browse the repository at this point in the history
* trigger-enhancements-for-dt

* Changes for triggers rtd and test

* Restricted health check, reverted docker and base

* Changed license for database trigger

* Additional test cases for database trigger

* Update changelog

* Adding table monitoring on db trigger tutorial.

---------

Co-authored-by: ArunPsiog <[email protected]>
Co-authored-by: Prasy12 <[email protected]>
Co-authored-by: Sankalp Sanand <[email protected]>
  • Loading branch information
4 people authored Oct 5, 2023
1 parent 644bfb5 commit 265092f
Show file tree
Hide file tree
Showing 18 changed files with 838 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed strict version pins on `lmdbm`, `mpire`, `orjson`, and `pennylane`
- Changed license to Apache

### Added

- Documentation and test cases for database triggers.

### Docs

- Added federated learning showcase code
Expand Down
1 change: 1 addition & 0 deletions covalent/triggers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

from .base import BaseTrigger # nopycln: import
from .database_trigger import DatabaseTrigger # nopycln: import
from .dir_trigger import DirTrigger # nopycln: import
from .sqlite_trigger import SQLiteTrigger # nopycln: import
from .time_trigger import TimeTrigger # nopycln: import
Expand Down
133 changes: 133 additions & 0 deletions covalent/triggers/database_trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright 2023 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from functools import partial
from threading import Event
from typing import List

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from covalent._shared_files import logger

from .base import BaseTrigger

app_log = logger.app_log
log_stack_info = logger.log_stack_info


class DatabaseTrigger(BaseTrigger):
"""
Database trigger which can read for changes in a database
and trigger workflows based on record changes.
Args:
db_path: Connection string for the database
table_name: Name of the table to observe
poll_interval: Time in seconds to wait for before reading the database again
where_clauses: List of "WHERE" conditions, e.g. ["id > 2", "status = pending"], to check when
polling the database
trigger_after_n: Number of times the event must happen after which the workflow will be triggered.
e.g value of 2 means workflow will be triggered once the event has occurred twice.
lattice_dispatch_id: Lattice dispatch id of the workflow to be triggered
dispatcher_addr: Address of the dispatcher server
triggers_server_addr: Address of the triggers server
Attributes:
self.db_path: Connection string for the database
self.table_name: Name of the table to observe
self.poll_interval: Time in seconds to wait for before reading the database again
self.where_clauses: List of "WHERE" conditions, e.g. ["id > 2", "status = pending"], to check when
polling the database
self.trigger_after_n: Number of times the event must happen after which the workflow will be triggered.
e.g value of 2 means workflow will be triggered once the event has occurred twice.
self.stop_flag: Thread safe flag used to check whether the stop condition has been met
"""

def __init__(
self,
db_path: str,
table_name: str,
poll_interval: int = 1,
where_clauses: List[str] = None,
trigger_after_n: int = 1,
lattice_dispatch_id: str = None,
dispatcher_addr: str = None,
triggers_server_addr: str = None,
):
super().__init__(lattice_dispatch_id, dispatcher_addr, triggers_server_addr)

self.db_path = db_path
self.table_name = table_name
self.poll_interval = poll_interval

self.where_clauses = where_clauses

self.trigger_after_n = trigger_after_n

self.stop_flag = None

def observe(self) -> None:
"""
Keep performing the trigger action as long as
where conditions are met or until stop has being called
"""

app_log.debug("Inside DatabaseTrigger's observe")
event_count = 0

try:
self.engine = create_engine(self.db_path)

with Session(self.engine) as db:
sql_poll_cmd = f"SELECT * FROM {self.table_name}"

if self.where_clauses:
sql_poll_cmd += " WHERE "
sql_poll_cmd += " AND ".join(list(self.where_clauses))
sql_poll_cmd += ";"

execute_cmd = partial(db.execute, sql_poll_cmd)
app_log.debug(f"Poll command: {sql_poll_cmd}")

self.stop_flag = Event()
while not self.stop_flag.is_set():
# Read the DB with specified command
try:
app_log.debug("About to execute...")
if rows := execute_cmd().all():
event_count += 1
if event_count == self.trigger_after_n:
app_log.debug("Invoking trigger")
self.trigger()
event_count = 0

except Exception:
pass

time.sleep(self.poll_interval)
except Exception as e:
app_log.debug("Failed to observe:")
raise

def stop(self) -> None:
"""
Stop the running `self.observe()` method by setting the `self.stop_flag` flag.
"""

self.stop_flag.set()
3 changes: 2 additions & 1 deletion covalent/triggers/trigger_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
# limitations under the License.


from covalent.triggers import BaseTrigger, DirTrigger, SQLiteTrigger, TimeTrigger
from covalent.triggers import BaseTrigger, DatabaseTrigger, DirTrigger, SQLiteTrigger, TimeTrigger


class TriggerLoader:
def __init__(self):
self.available_triggers = {
BaseTrigger.__name__: BaseTrigger,
DatabaseTrigger.__name__: DatabaseTrigger,
DirTrigger.__name__: DirTrigger,
TimeTrigger.__name__: TimeTrigger,
SQLiteTrigger.__name__: SQLiteTrigger,
Expand Down
1 change: 1 addition & 0 deletions covalent_dispatcher/_triggers_app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@

from .app import router as triggers_router # nopycln: import
from .app import triggers_only_app # nopycln: import
from .app import trigger_only_router
7 changes: 7 additions & 0 deletions covalent_dispatcher/_triggers_app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
log_stack_info = logger.log_stack_info

router = APIRouter()
trigger_only_router = APIRouter()
triggers_only_app = FastAPI()

active_triggers = {}
Expand Down Expand Up @@ -76,6 +77,11 @@ def get_threadpool():
return ThreadPoolExecutor()


@trigger_only_router.get("/triggers/healthcheck")
async def healthcheck(request: Request):
return {"status": "ok"}


@router.get("/triggers/status")
def trigger_server_status(request: Request):
if disable_triggers:
Expand Down Expand Up @@ -135,3 +141,4 @@ async def stop_observe(request: Request):


triggers_only_app.include_router(router, prefix="/api", tags=["Triggers"])
triggers_only_app.include_router(trigger_only_router, prefix="/api", tags=["Triggers"])
4 changes: 3 additions & 1 deletion covalent_ui/api/v1/data_layer/summary_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ def get_summary_overview(self) -> Lattice:
latest_running_task_status=last_ran_job_status[0]
if last_ran_job_status is not None
else None,
total_dispatcher_duration=run_time[0] if run_time is not None else 0,
total_dispatcher_duration=int(run_time[0])
if run_time is not None and run_time[0] is not None
else 0,
total_jobs_failed=total_failed[0],
total_jobs_cancelled=total_jobs_cancelled[0],
total_jobs_new_object=total_jobs_new_object[0],
Expand Down
2 changes: 1 addition & 1 deletion covalent_ui/api/v1/models/dispatch_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class DispatchModule(BaseModel):

dispatch_id: str
lattice_name: str
runtime: Optional[Union[int, None]]
runtime: Optional[Union[int, float, None]]
total_electrons: Optional[Union[int, None]]
total_electrons_completed: Optional[Union[int, None]]
started_at: Optional[Union[datetime, None]]
Expand Down
2 changes: 1 addition & 1 deletion covalent_ui/api/v1/models/electrons_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class ElectronResponse(BaseModel):
status: Union[str, None] = None
started_at: Union[datetime, None] = None
ended_at: Union[datetime, None] = None
runtime: Union[int, None] = None
runtime: Union[int, float, None] = None
description: Union[str, None] = None
qelectron_data_exists: bool = False
qelectron: Union[dict, None] = None
Expand Down
2 changes: 1 addition & 1 deletion covalent_ui/api/v1/models/lattices_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class LatticeDetailResponse(BaseModel):
ended_at: Union[datetime, None] = None
directory: Union[str, None] = None
description: Union[str, None] = None
runtime: Union[int, None] = None
runtime: Union[int, float, None] = None
updated_at: Union[datetime, None] = None


Expand Down
4 changes: 3 additions & 1 deletion covalent_ui/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ def get_home(request: Request, rest_of_path: str):
)
fastapi_app.mount("/", socketio_app)


if __name__ == "__main__":
ap = argparse.ArgumentParser()

Expand Down Expand Up @@ -116,7 +115,10 @@ def get_home(request: Request, rest_of_path: str):

app_name = "app:fastapi_app"
if args.triggers_only:
from covalent_dispatcher._triggers_app import trigger_only_router # nopycln: import

app_name = "app:triggers_only_app"
fastapi_app.include_router(trigger_only_router, prefix="/api", tags=["Triggers"])
elif args.no_triggers:
import covalent_dispatcher._triggers_app.app as tr_app

Expand Down
2 changes: 2 additions & 0 deletions doc/source/api/triggers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ Examples

- :doc:`Add a timed trigger to a workflow <../../how_to/execution/trigger_time>`
- :doc:`Add a directory trigger to a workflow <../../how_to/execution/trigger_dir>`
- :doc:`Add a sqlite trigger to a workflow <../../how_to/execution/trigger_sqlite>`
- :doc:`Add a database trigger to a workflow <../../how_to/execution/trigger_database>`
18 changes: 16 additions & 2 deletions doc/source/features/triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,24 @@ def my_workflow():
```


4. `DatabaseTrigger`: This trigger monitors the database for changes and performs the trigger action when changes occur. It is helpful for automating tasks in response to database updates. For example:

```{code-block} python
from covalent.triggers import DatabaseTrigger
import covalent as ct
database_trigger = DatabaseTrigger(db_path="db path",table_name='table name')
@ct.lattice(triggers=database_trigger)
def my_workflow():
...
```

These triggers can be easily integrated into your Covalent workflows to automate various tasks based on the desired conditions.

## Trigger How-to Guides

For further examples on how to use triggers, check out the Trigger how to guides:
- {doc}`How to add a directory trigger to a lattice <../how_to/coding/dir_trigger>`
- {doc}`How to add a time trigger to a lattice <../how_to/coding/time_trigger>`
- {doc}`How to add a directory trigger to a lattice <../../how_to/execution/trigger_dir>`
- {doc}`How to add a time trigger to a lattice <../../how_to/execution/trigger_time>`
- {doc}`How to add a sqlite trigger to a lattice <../../how_to/execution/trigger_sqlite>`
- {doc}`How to add a database trigger to a lattice <../../how_to/execution/trigger_database>`
Loading

0 comments on commit 265092f

Please sign in to comment.