Skip to content

Commit

Permalink
Merge branch 'main' into conventional_commit
Browse files Browse the repository at this point in the history
  • Loading branch information
mkanoor authored Sep 12, 2023
2 parents 7c3b1b7 + 8a1c812 commit 416966f
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 8 deletions.
15 changes: 14 additions & 1 deletion ansible_rulebook/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def qsize(self):

# FIXME(cutwater): Replace parsed_args with clear interface
async def run(parsed_args: argparse.ArgumentParser) -> None:
file_monitor = None

if parsed_args.worker and parsed_args.websocket_address and parsed_args.id:
logger.info("Starting worker mode")
Expand All @@ -81,6 +82,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
startup_args.rulesets = load_rulebook(
parsed_args, startup_args.variables
)
if parsed_args.hot_reload is True and os.path.exists(
parsed_args.rulebook
):
logger.critical(
"HOT-RELOAD: Hot-reload was requested, "
+ "will monitor for rulebook file changes"
)
file_monitor = parsed_args.rulebook
if parsed_args.inventory:
startup_args.inventory = parsed_args.inventory
startup_args.project_data_file = parsed_args.project_tarball
Expand Down Expand Up @@ -119,13 +128,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
)
tasks.append(feedback_task)

await run_rulesets(
should_reload = await run_rulesets(
event_log,
ruleset_queues,
startup_args.variables,
startup_args.inventory,
parsed_args,
startup_args.project_data_file,
file_monitor,
)

await event_log.put(dict(type="Exit"))
Expand All @@ -151,6 +161,9 @@ async def run(parsed_args: argparse.ArgumentParser) -> None:
await job_template_runner.close_session()
if error_found:
raise Exception("One of the source plugins failed")
elif should_reload is True:
logger.critical("HOT-RELOAD! rules file changed, now restarting")
await run(parsed_args)


# TODO(cutwater): Maybe move to util.py
Expand Down
10 changes: 6 additions & 4 deletions ansible_rulebook/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from functools import partial
from pprint import pprint
from typing import Callable, Dict, List, Optional, Union
from urllib.parse import urljoin

import ansible_runner
import dpath
Expand Down Expand Up @@ -835,7 +836,7 @@ async def run_job_template(
rule_uuid=source_rule_uuid,
status=controller_job["status"],
run_at=controller_job["created"],
url=_controller_job_url(controller_job),
url=_controller_job_url(controller_job, "jobs"),
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
Expand Down Expand Up @@ -954,7 +955,7 @@ async def run_workflow_template(
rule_uuid=source_rule_uuid,
status=controller_job["status"],
run_at=controller_job["created"],
url=_controller_job_url(controller_job),
url=_controller_job_url(controller_job, "jobs/workflow"),
matching_events=_get_events(variables),
rule_run_at=rule_run_at,
)
Expand Down Expand Up @@ -1084,9 +1085,10 @@ def _embellish_internal_event(event: Dict, method_name: str) -> Dict:
)


def _controller_job_url(data: dict) -> str:
def _controller_job_url(data: dict, prefix: str) -> str:
if "id" in data:
return f"{job_template_runner.host}/#/jobs/{data['id']}/details"
href_slug = f"/#/{prefix}/{data['id']}/details"
return urljoin(job_template_runner.host, href_slug)
return ""


Expand Down
8 changes: 8 additions & 0 deletions ansible_rulebook/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ def get_parser() -> argparse.ArgumentParser:
"Default is sequential, actions will be run only after the "
"previous one ends",
)
parser.add_argument(
"--hot-reload",
help="Will perform hot-reload on rulebook file changes "
"(when running in non-worker mode)."
"This option is ignored in worker mode.",
default="false",
action="store_true",
)
return parser


Expand Down
57 changes: 55 additions & 2 deletions ansible_rulebook/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

from drools.dispatch import establish_async_channel, handle_async_messages
from drools.ruleset import session_stats
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

import ansible_rulebook.rule_generator as rule_generator
from ansible_rulebook.collection import (
Expand All @@ -47,6 +49,7 @@
)

from .exception import (
HotReloadException,
SourceFilterNotFoundException,
SourcePluginMainMissingException,
SourcePluginNotAsyncioCompatibleException,
Expand Down Expand Up @@ -217,14 +220,50 @@ async def start_source(
)


class RulebookFileChangeHandler(FileSystemEventHandler):
modified = False

def on_modified(self, event):
logger.debug(f"Rulebook file {event.src_path} has been modified")
self.modified = True

def is_modified(self):
return self.modified


async def monitor_rulebook(rulebook_file):
event_handler = RulebookFileChangeHandler()
to_observe = os.path.abspath(rulebook_file)
observer = Observer()
observer.schedule(event_handler, to_observe, recursive=True)
observer.start()
try:
while not event_handler.is_modified():
await asyncio.sleep(1)
finally:
observer.stop()
observer.join()
# we need to check if the try-clause completed because
# while-loop terminated successfully, in such case we
# follow on the hot-reload use case, or if we got into
# this finally-clause because of other errors.
if event_handler.is_modified():
raise HotReloadException(
"Rulebook file changed, "
+ "raising exception so to asyncio.FIRST_EXCEPTION "
+ "in order to reload"
)


async def run_rulesets(
event_log: asyncio.Queue,
ruleset_queues: List[RuleSetQueue],
variables: Dict,
inventory: str = "",
parsed_args: argparse.ArgumentParser = None,
project_data_file: Optional[str] = None,
):
file_monitor: str = None,
) -> bool:
logger.info("run_ruleset")
rulesets_queue_plans = rule_generator.generate_rulesets(
ruleset_queues, variables, inventory
Expand Down Expand Up @@ -275,6 +314,11 @@ async def run_rulesets(
)
ruleset_tasks.append(ruleset_task)

monitor_task = None
if file_monitor:
monitor_task = asyncio.create_task(monitor_rulebook(file_monitor))
ruleset_tasks.append(monitor_task)

logger.info("Waiting for all ruleset tasks to end")
await asyncio.wait(ruleset_tasks, return_when=asyncio.FIRST_EXCEPTION)
async_task.cancel()
Expand All @@ -284,12 +328,21 @@ async def run_rulesets(
logger.info("Cancelling " + task.get_name())
task.cancel()

should_reload = False
if monitor_task and isinstance(
monitor_task.exception(), HotReloadException
):
logger.debug("Hot-reload, setting should_reload")
should_reload = True

logger.info("Waiting on gather")
asyncio.gather(*ruleset_tasks)
asyncio.gather(*ruleset_tasks, return_exceptions=True)
logger.info("Returning from run_rulesets")
if send_heartbeat_task:
send_heartbeat_task.cancel()

return should_reload


def meta_info_filter(source: EventSource) -> EventSourceFilter:
source_filter_name = "eda.builtin.insert_meta_info"
Expand Down
5 changes: 5 additions & 0 deletions ansible_rulebook/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class UnsupportedActionException(Exception):
pass


class HotReloadException(Exception):

pass


class InventoryNotFound(Exception):

pass
Expand Down
18 changes: 18 additions & 0 deletions tests/e2e/files/rulebooks/test_hot_reload.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
- name: Ruleset 1
hosts: all
sources:
- generic:
payload:
- action: "value_a"
rules:
- name: Matching for value_a
condition: event.action == "value_a"
action:
debug:
msg: "Rule 1: I matched for value_a"
- name: Matching for value_b
condition: event.action == "value_b"
action:
debug:
msg: "Rule 2: I have now matched for value_b"
56 changes: 56 additions & 0 deletions tests/e2e/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,59 @@ def test_terminate_process_sigint():
process.kill()

assert process.returncode == 130


@pytest.mark.e2e
def test_hot_reload():
"""
Execute a rulebook with hot-reload option,
check for first action being triggered,
then modify the content of the rulebook,
check for the other action being triggered.
"""

rulebook = utils.BASE_DATA_PATH / "rulebooks/test_hot_reload.yml"
cmd = utils.Command(rulebook=rulebook, hot_reload=True)

LOGGER.info(f"Running command: {cmd}")

process = subprocess.Popen(
cmd,
cwd=utils.BASE_DATA_PATH,
text=True,
stdout=subprocess.PIPE,
)

with open(rulebook, "rt") as file:
original_data = file.read()
found_rule_1_in_out = False
found_rule_2_in_out = False

start = time.time()
while line := process.stdout.readline():
if "Rule 1: I matched for value_a" in line:
found_rule_1_in_out = True
break
time.sleep(0.1)
if time.time() - start > DEFAULT_CMD_TIMEOUT:
process.kill()

assert found_rule_1_in_out

data = original_data.replace('- action: "value_a"', '- action: "value_b"')
with open(rulebook, "wt") as file:
file.write(data)

start = time.time()
while line := process.stdout.readline():
if "Rule 2: I have now matched for value_b" in line:
found_rule_2_in_out = True
break
time.sleep(0.1)
if time.time() - start > DEFAULT_CMD_TIMEOUT:
process.kill()

with open(rulebook, "wt") as file:
file.write(original_data)

assert found_rule_2_in_out
3 changes: 3 additions & 0 deletions tests/e2e/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Command:
verbosity: int = 0
heartbeat: int = 0
execution_strategy: Optional[str] = None
hot_reload: bool = False

def __post_init__(self):
# verbosity overrides verbose and debug
Expand Down Expand Up @@ -83,6 +84,8 @@ def to_list(self) -> List:
result.extend(["--heartbeat", str(self.heartbeat)])
if self.execution_strategy:
result.extend(["--execution-strategy", self.execution_strategy])
if self.hot_reload:
result.append("--hot-reload")

return result

Expand Down
2 changes: 1 addition & 1 deletion tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -2303,7 +2303,7 @@ async def test_79_workflow_job_template():
status="successful", id=945, created="dummy", artifacts=dict(a=1)
)
job_template_runner.host = "https://examples.com"
job_url = "https://examples.com/#/jobs/945/details"
job_url = "https://examples.com/#/jobs/workflow/945/details"
with SourceTask(rs.sources[0], "sources", {}, queue):
with patch(
"ansible_rulebook.builtin.job_template_runner."
Expand Down

0 comments on commit 416966f

Please sign in to comment.