Merge branch 'develop' into streamlit_tutorial_update
kessler-frost authored Oct 11, 2023
2 parents 59ae855 + 265092f commit bc11c91
Showing 35 changed files with 3,789 additions and 43 deletions.
13 changes: 10 additions & 3 deletions CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Added covalent version attribute to Remote Executors
- Removed unassigned variable names
- Contributing guidelines steps for first-time installation
- Updated gitignore to ignore yarn files and folders for latest version of yarn

@@ -18,16 +20,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Changed `actions/checkout@v3` to `actions/checkout@v4` in CI
- Dependabot update to npm in changelog action
- Update tough-cookie to 4.1.3 version
- Added rich support to cli for better printing statements.
- Changed semver from 5.7.1 to 5.7.2 in package.json
- Updated word-wrap to 1.2.4 version

### Changed

- Removed strict version pins on `lmdbm`, `mpire`, `orjson`, and `pennylane`
- Changed license to Apache

### Added

- Documentation and test cases for database triggers.

### Docs

- Added federated learning showcase code
- Updated tutorial for redispatching workflows with Streamlit

## [0.229.0-rc.0] - 2023-09-22
@@ -51,7 +59,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Co-authored-by: Prasanna Venkatesh <[email protected]>
- FilipBolt <[email protected]>


### Fixed

- Formatted executor block under Qelectron job details to handle any class-type values
@@ -85,7 +92,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Docs

- Fix autodoc for SSH, Slurm, AWS Braket, AWS Lambda, AWS EC2, AWS Batch, Google Batch
- Updated documentation links in README
- Added tutorial for redispatching workflows with Streamlit

2 changes: 1 addition & 1 deletion covalent/_shared_files/config.py
@@ -46,7 +46,7 @@ def __init__(self) -> None:
self.generate_default_config()

try:
with open(self.config_file, "r") as f:
with open(self.config_file, "r"):
pass

self.update_config()
2 changes: 1 addition & 1 deletion covalent/_workflow/depsbash.py
@@ -24,7 +24,7 @@

def apply_bash_commands(commands):
for cmd in commands:
proc = subprocess.run(
subprocess.run(
cmd, stdin=subprocess.DEVNULL, shell=True, capture_output=True, check=True, text=True
)

12 changes: 6 additions & 6 deletions covalent/executor/base.py
@@ -377,8 +377,8 @@ def execute(
output: The result of the function execution.
"""

dispatch_info = DispatchInfo(dispatch_id)
fn_version = function.args[0].python_version
DispatchInfo(dispatch_id)
function.args[0].python_version
self._task_stdout = io.StringIO()
self._task_stderr = io.StringIO()

@@ -392,10 +392,10 @@ def execute(
self.setup(task_metadata=task_metadata)
result = self.run(function, args, kwargs, task_metadata)
job_status = RESULT_STATUS.COMPLETED
except TaskRuntimeError as err:
except TaskRuntimeError:
job_status = RESULT_STATUS.FAILED
result = None
except TaskCancelledError as err:
except TaskCancelledError:
job_status = RESULT_STATUS.CANCELLED
result = None
finally:
@@ -651,10 +651,10 @@ async def execute(
await self.setup(task_metadata=task_metadata)
result = await self.run(function, args, kwargs, task_metadata)
job_status = RESULT_STATUS.COMPLETED
except TaskCancelledError as err:
except TaskCancelledError:
job_status = RESULT_STATUS.CANCELLED
result = None
except TaskRuntimeError as err:
except TaskRuntimeError:
job_status = RESULT_STATUS.FAILED
result = None
finally:
3 changes: 3 additions & 0 deletions covalent/executor/executor_plugins/remote_executor.py
@@ -40,6 +40,7 @@
"poll_freq": 15,
"remote_cache": ".cache/covalent",
"credentials_file": "",
"covalent_version": "",
}


@@ -58,13 +59,15 @@ def __init__(
poll_freq: int = 15,
remote_cache: str = "",
credentials_file: str = "",
covalent_version: str = "",
**kwargs,
) -> None:
super().__init__(**kwargs)

self.poll_freq = poll_freq
self.remote_cache = remote_cache
self.credentials_file = credentials_file
self.covalent_version = covalent_version

@abstractmethod
async def _validate_credentials(self) -> bool:
1 change: 1 addition & 0 deletions covalent/triggers/__init__.py
@@ -15,6 +15,7 @@
# limitations under the License.

from .base import BaseTrigger # nopycln: import
from .database_trigger import DatabaseTrigger # nopycln: import
from .dir_trigger import DirTrigger # nopycln: import
from .sqlite_trigger import SQLiteTrigger # nopycln: import
from .time_trigger import TimeTrigger # nopycln: import
133 changes: 133 additions & 0 deletions covalent/triggers/database_trigger.py
@@ -0,0 +1,133 @@
# Copyright 2023 Agnostiq Inc.
#
# This file is part of Covalent.
#
# Licensed under the Apache License 2.0 (the "License"). A copy of the
# License may be obtained with this software package or at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Use of this file is prohibited except in compliance with the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
from functools import partial
from threading import Event
from typing import List

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from covalent._shared_files import logger

from .base import BaseTrigger

app_log = logger.app_log
log_stack_info = logger.log_stack_info


class DatabaseTrigger(BaseTrigger):
"""
Database trigger which can poll a database for changes
and trigger workflows based on record changes.
Args:
db_path: Connection string for the database
table_name: Name of the table to observe
poll_interval: Time in seconds to wait before reading the database again
where_clauses: List of "WHERE" conditions, e.g. ["id > 2", "status = pending"], to check when
polling the database
trigger_after_n: Number of times the event must occur before the workflow is triggered,
e.g. a value of 2 means the workflow will be triggered once the event has occurred twice.
lattice_dispatch_id: Lattice dispatch id of the workflow to be triggered
dispatcher_addr: Address of the dispatcher server
triggers_server_addr: Address of the triggers server
Attributes:
self.db_path: Connection string for the database
self.table_name: Name of the table to observe
self.poll_interval: Time in seconds to wait before reading the database again
self.where_clauses: List of "WHERE" conditions, e.g. ["id > 2", "status = pending"], to check when
polling the database
self.trigger_after_n: Number of times the event must occur before the workflow is triggered,
e.g. a value of 2 means the workflow will be triggered once the event has occurred twice.
self.stop_flag: Thread-safe flag used to check whether the stop condition has been met
"""

def __init__(
self,
db_path: str,
table_name: str,
poll_interval: int = 1,
where_clauses: List[str] = None,
trigger_after_n: int = 1,
lattice_dispatch_id: str = None,
dispatcher_addr: str = None,
triggers_server_addr: str = None,
):
super().__init__(lattice_dispatch_id, dispatcher_addr, triggers_server_addr)

self.db_path = db_path
self.table_name = table_name
self.poll_interval = poll_interval

self.where_clauses = where_clauses

self.trigger_after_n = trigger_after_n

self.stop_flag = None

def observe(self) -> None:
"""
Keep performing the trigger action as long as the
where conditions are met or until stop has been called
"""

app_log.debug("Inside DatabaseTrigger's observe")
event_count = 0

try:
self.engine = create_engine(self.db_path)

with Session(self.engine) as db:
sql_poll_cmd = f"SELECT * FROM {self.table_name}"

if self.where_clauses:
sql_poll_cmd += " WHERE "
sql_poll_cmd += " AND ".join(list(self.where_clauses))
sql_poll_cmd += ";"

execute_cmd = partial(db.execute, sql_poll_cmd)
app_log.debug(f"Poll command: {sql_poll_cmd}")

self.stop_flag = Event()
while not self.stop_flag.is_set():
# Read the DB with specified command
try:
app_log.debug("About to execute...")
if rows := execute_cmd().all():
event_count += 1
if event_count == self.trigger_after_n:
app_log.debug("Invoking trigger")
self.trigger()
event_count = 0

except Exception:
pass

time.sleep(self.poll_interval)
except Exception as e:
app_log.debug("Failed to observe:")
raise

def stop(self) -> None:
"""
Stop the running `self.observe()` method by setting the `self.stop_flag` flag.
"""

self.stop_flag.set()
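For context, a minimal usage sketch of the new trigger follows; the connection string, table name, and dispatch id below are illustrative placeholders, not values from this commit.

# Sketch: attach a DatabaseTrigger to an existing dispatch and start polling.
from covalent.triggers import DatabaseTrigger

db_trigger = DatabaseTrigger(
    db_path="sqlite:///workflow.sqlite",        # any SQLAlchemy connection string (placeholder)
    table_name="jobs",                          # table to poll (placeholder)
    poll_interval=3,                            # seconds between polls
    where_clauses=["status = 'pending'"],       # optional WHERE conditions, ANDed together
    trigger_after_n=2,                          # fire after the condition is observed twice
    lattice_dispatch_id="<existing-dispatch-id>",  # placeholder dispatch id
)

db_trigger.observe()  # blocks, polling the table, until db_trigger.stop() is called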
3 changes: 2 additions & 1 deletion covalent/triggers/trigger_loader.py
@@ -15,13 +15,14 @@
# limitations under the License.


from covalent.triggers import BaseTrigger, DirTrigger, SQLiteTrigger, TimeTrigger
from covalent.triggers import BaseTrigger, DatabaseTrigger, DirTrigger, SQLiteTrigger, TimeTrigger


class TriggerLoader:
def __init__(self):
self.available_triggers = {
BaseTrigger.__name__: BaseTrigger,
DatabaseTrigger.__name__: DatabaseTrigger,
DirTrigger.__name__: DirTrigger,
TimeTrigger.__name__: TimeTrigger,
SQLiteTrigger.__name__: SQLiteTrigger,
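As a quick illustration of this registration (a sketch, assuming a local Covalent environment), the new trigger becomes discoverable by class name through the loader:

# Sketch: look up the newly registered trigger by name in the loader registry.
from covalent.triggers import DatabaseTrigger
from covalent.triggers.trigger_loader import TriggerLoader

loader = TriggerLoader()
assert loader.available_triggers["DatabaseTrigger"] is DatabaseTrigger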
4 changes: 2 additions & 2 deletions covalent_dispatcher/_cli/service.py
@@ -238,7 +238,7 @@ def _graceful_start(
try:
requests.get(dispatcher_addr, timeout=1)
up = True
except requests.exceptions.ConnectionError as err:
except requests.exceptions.ConnectionError:
time.sleep(1)

Path(get_config("dispatcher.cache_dir")).mkdir(parents=True, exist_ok=True)
@@ -643,7 +643,7 @@ def status() -> None:
try:
response = requests.get(f"http://localhost:{port}/api/triggers/status", timeout=1)
trigger_status = response.json()["status"]
except requests.exceptions.ConnectionError as err:
except requests.exceptions.ConnectionError:
trigger_status = "stopped"

if trigger_status == "running":
1 change: 1 addition & 0 deletions covalent_dispatcher/_triggers_app/__init__.py
@@ -16,3 +16,4 @@

from .app import router as triggers_router # nopycln: import
from .app import triggers_only_app # nopycln: import
from .app import trigger_only_router
7 changes: 7 additions & 0 deletions covalent_dispatcher/_triggers_app/app.py
@@ -33,6 +33,7 @@
log_stack_info = logger.log_stack_info

router = APIRouter()
trigger_only_router = APIRouter()
triggers_only_app = FastAPI()

active_triggers = {}
@@ -76,6 +77,11 @@ def get_threadpool():
return ThreadPoolExecutor()


@trigger_only_router.get("/triggers/healthcheck")
async def healthcheck(request: Request):
return {"status": "ok"}


@router.get("/triggers/status")
def trigger_server_status(request: Request):
if disable_triggers:
@@ -135,3 +141,4 @@ async def stop_observe(request: Request):


triggers_only_app.include_router(router, prefix="/api", tags=["Triggers"])
triggers_only_app.include_router(trigger_only_router, prefix="/api", tags=["Triggers"])
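For reference, the new route can be exercised in-process with FastAPI's test client; this is a sketch that assumes a local Covalent install with FastAPI's test dependencies available.

# Sketch: call the new healthcheck endpoint on the triggers-only app
# without starting a server.
from fastapi.testclient import TestClient

from covalent_dispatcher._triggers_app.app import triggers_only_app

client = TestClient(triggers_only_app)
response = client.get("/api/triggers/healthcheck")
print(response.status_code, response.json())  # expected: 200 {'status': 'ok'}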
5 changes: 3 additions & 2 deletions covalent_ui/api/v1/data_layer/summary_dal.py
@@ -58,7 +58,6 @@ def get_summary(
Return:
List of top most Lattices and count
"""
result = None

status_filters = self.get_filters(status_filter)

@@ -234,7 +233,9 @@ def get_summary_overview(self) -> Lattice:
latest_running_task_status=last_ran_job_status[0]
if last_ran_job_status is not None
else None,
total_dispatcher_duration=run_time[0] if run_time is not None else 0,
total_dispatcher_duration=int(run_time[0])
if run_time is not None and run_time[0] is not None
else 0,
total_jobs_failed=total_failed[0],
total_jobs_cancelled=total_jobs_cancelled[0],
total_jobs_new_object=total_jobs_new_object[0],
2 changes: 1 addition & 1 deletion covalent_ui/api/v1/models/dispatch_model.py
@@ -43,7 +43,7 @@ class DispatchModule(BaseModel):

dispatch_id: str
lattice_name: str
runtime: Optional[Union[int, None]]
runtime: Optional[Union[int, float, None]]
total_electrons: Optional[Union[int, None]]
total_electrons_completed: Optional[Union[int, None]]
started_at: Optional[Union[datetime, None]]
2 changes: 1 addition & 1 deletion covalent_ui/api/v1/models/electrons_model.py
@@ -60,7 +60,7 @@ class ElectronResponse(BaseModel):
status: Union[str, None] = None
started_at: Union[datetime, None] = None
ended_at: Union[datetime, None] = None
runtime: Union[int, None] = None
runtime: Union[int, float, None] = None
description: Union[str, None] = None
qelectron_data_exists: bool = False
qelectron: Union[dict, None] = None
2 changes: 1 addition & 1 deletion covalent_ui/api/v1/models/lattices_model.py
@@ -49,7 +49,7 @@ class LatticeDetailResponse(BaseModel):
ended_at: Union[datetime, None] = None
directory: Union[str, None] = None
description: Union[str, None] = None
runtime: Union[int, None] = None
runtime: Union[int, float, None] = None
updated_at: Union[datetime, None] = None


4 changes: 3 additions & 1 deletion covalent_ui/app.py
@@ -58,7 +58,6 @@ def get_home(request: Request, rest_of_path: str):
)
fastapi_app.mount("/", socketio_app)


if __name__ == "__main__":
ap = argparse.ArgumentParser()

@@ -116,7 +115,10 @@ def get_home(request: Request, rest_of_path: str):

app_name = "app:fastapi_app"
if args.triggers_only:
from covalent_dispatcher._triggers_app import trigger_only_router # nopycln: import

app_name = "app:triggers_only_app"
fastapi_app.include_router(trigger_only_router, prefix="/api", tags=["Triggers"])
elif args.no_triggers:
import covalent_dispatcher._triggers_app.app as tr_app
