diff --git a/ansible_rulebook/app.py b/ansible_rulebook/app.py index 346a5f29..5cb3fd3f 100644 --- a/ansible_rulebook/app.py +++ b/ansible_rulebook/app.py @@ -62,6 +62,7 @@ def qsize(self): # FIXME(cutwater): Replace parsed_args with clear interface async def run(parsed_args: argparse.ArgumentParser) -> None: + file_monitor = None if parsed_args.worker and parsed_args.websocket_address and parsed_args.id: logger.info("Starting worker mode") @@ -81,6 +82,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None: startup_args.rulesets = load_rulebook( parsed_args, startup_args.variables ) + if parsed_args.hot_reload is True and os.path.exists( + parsed_args.rulebook + ): + logger.critical( + "HOT-RELOAD: Hot-reload was requested, " + + "will monitor for rulebook file changes" + ) + file_monitor = parsed_args.rulebook if parsed_args.inventory: startup_args.inventory = parsed_args.inventory startup_args.project_data_file = parsed_args.project_tarball @@ -119,13 +128,14 @@ async def run(parsed_args: argparse.ArgumentParser) -> None: ) tasks.append(feedback_task) - await run_rulesets( + should_reload = await run_rulesets( event_log, ruleset_queues, startup_args.variables, startup_args.inventory, parsed_args, startup_args.project_data_file, + file_monitor, ) await event_log.put(dict(type="Exit")) @@ -151,6 +161,9 @@ async def run(parsed_args: argparse.ArgumentParser) -> None: await job_template_runner.close_session() if error_found: raise Exception("One of the source plugins failed") + elif should_reload is True: + logger.critical("HOT-RELOAD! rules file changed, now restarting") + await run(parsed_args) # TODO(cutwater): Maybe move to util.py diff --git a/ansible_rulebook/builtin.py b/ansible_rulebook/builtin.py index aee9ff16..cac1615d 100644 --- a/ansible_rulebook/builtin.py +++ b/ansible_rulebook/builtin.py @@ -26,6 +26,7 @@ from functools import partial from pprint import pprint from typing import Callable, Dict, List, Optional, Union +from urllib.parse import urljoin import ansible_runner import dpath @@ -835,7 +836,7 @@ async def run_job_template( rule_uuid=source_rule_uuid, status=controller_job["status"], run_at=controller_job["created"], - url=_controller_job_url(controller_job), + url=_controller_job_url(controller_job, "jobs"), matching_events=_get_events(variables), rule_run_at=rule_run_at, ) @@ -954,7 +955,7 @@ async def run_workflow_template( rule_uuid=source_rule_uuid, status=controller_job["status"], run_at=controller_job["created"], - url=_controller_job_url(controller_job), + url=_controller_job_url(controller_job, "jobs/workflow"), matching_events=_get_events(variables), rule_run_at=rule_run_at, ) @@ -1084,9 +1085,10 @@ def _embellish_internal_event(event: Dict, method_name: str) -> Dict: ) -def _controller_job_url(data: dict) -> str: +def _controller_job_url(data: dict, prefix: str) -> str: if "id" in data: - return f"{job_template_runner.host}/#/jobs/{data['id']}/details" + href_slug = f"/#/{prefix}/{data['id']}/details" + return urljoin(job_template_runner.host, href_slug) return "" diff --git a/ansible_rulebook/cli.py b/ansible_rulebook/cli.py index d3f2393b..c1ba74fb 100644 --- a/ansible_rulebook/cli.py +++ b/ansible_rulebook/cli.py @@ -173,6 +173,14 @@ def get_parser() -> argparse.ArgumentParser: "Default is sequential, actions will be run only after the " "previous one ends", ) + parser.add_argument( + "--hot-reload", + help="Will perform hot-reload on rulebook file changes " + "(when running in non-worker mode)." + "This option is ignored in worker mode.", + default="false", + action="store_true", + ) return parser diff --git a/ansible_rulebook/engine.py b/ansible_rulebook/engine.py index 5e576642..609d5883 100644 --- a/ansible_rulebook/engine.py +++ b/ansible_rulebook/engine.py @@ -22,6 +22,8 @@ from drools.dispatch import establish_async_channel, handle_async_messages from drools.ruleset import session_stats +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer import ansible_rulebook.rule_generator as rule_generator from ansible_rulebook.collection import ( @@ -47,6 +49,7 @@ ) from .exception import ( + HotReloadException, SourceFilterNotFoundException, SourcePluginMainMissingException, SourcePluginNotAsyncioCompatibleException, @@ -217,6 +220,41 @@ async def start_source( ) +class RulebookFileChangeHandler(FileSystemEventHandler): + modified = False + + def on_modified(self, event): + logger.debug(f"Rulebook file {event.src_path} has been modified") + self.modified = True + + def is_modified(self): + return self.modified + + +async def monitor_rulebook(rulebook_file): + event_handler = RulebookFileChangeHandler() + to_observe = os.path.abspath(rulebook_file) + observer = Observer() + observer.schedule(event_handler, to_observe, recursive=True) + observer.start() + try: + while not event_handler.is_modified(): + await asyncio.sleep(1) + finally: + observer.stop() + observer.join() + # we need to check if the try-clause completed because + # while-loop terminated successfully, in such case we + # follow on the hot-reload use case, or if we got into + # this finally-clause because of other errors. + if event_handler.is_modified(): + raise HotReloadException( + "Rulebook file changed, " + + "raising exception so to asyncio.FIRST_EXCEPTION " + + "in order to reload" + ) + + async def run_rulesets( event_log: asyncio.Queue, ruleset_queues: List[RuleSetQueue], @@ -224,7 +262,8 @@ async def run_rulesets( inventory: str = "", parsed_args: argparse.ArgumentParser = None, project_data_file: Optional[str] = None, -): + file_monitor: str = None, +) -> bool: logger.info("run_ruleset") rulesets_queue_plans = rule_generator.generate_rulesets( ruleset_queues, variables, inventory @@ -275,6 +314,11 @@ async def run_rulesets( ) ruleset_tasks.append(ruleset_task) + monitor_task = None + if file_monitor: + monitor_task = asyncio.create_task(monitor_rulebook(file_monitor)) + ruleset_tasks.append(monitor_task) + logger.info("Waiting for all ruleset tasks to end") await asyncio.wait(ruleset_tasks, return_when=asyncio.FIRST_EXCEPTION) async_task.cancel() @@ -284,12 +328,21 @@ async def run_rulesets( logger.info("Cancelling " + task.get_name()) task.cancel() + should_reload = False + if monitor_task and isinstance( + monitor_task.exception(), HotReloadException + ): + logger.debug("Hot-reload, setting should_reload") + should_reload = True + logger.info("Waiting on gather") - asyncio.gather(*ruleset_tasks) + asyncio.gather(*ruleset_tasks, return_exceptions=True) logger.info("Returning from run_rulesets") if send_heartbeat_task: send_heartbeat_task.cancel() + return should_reload + def meta_info_filter(source: EventSource) -> EventSourceFilter: source_filter_name = "eda.builtin.insert_meta_info" diff --git a/ansible_rulebook/exception.py b/ansible_rulebook/exception.py index 429da342..0b6e0da9 100644 --- a/ansible_rulebook/exception.py +++ b/ansible_rulebook/exception.py @@ -153,6 +153,11 @@ class UnsupportedActionException(Exception): pass +class HotReloadException(Exception): + + pass + + class InventoryNotFound(Exception): pass diff --git a/tests/e2e/files/rulebooks/test_hot_reload.yml b/tests/e2e/files/rulebooks/test_hot_reload.yml new file mode 100644 index 00000000..42ec2f8a --- /dev/null +++ b/tests/e2e/files/rulebooks/test_hot_reload.yml @@ -0,0 +1,18 @@ +--- +- name: Ruleset 1 + hosts: all + sources: + - generic: + payload: + - action: "value_a" + rules: + - name: Matching for value_a + condition: event.action == "value_a" + action: + debug: + msg: "Rule 1: I matched for value_a" + - name: Matching for value_b + condition: event.action == "value_b" + action: + debug: + msg: "Rule 2: I have now matched for value_b" \ No newline at end of file diff --git a/tests/e2e/test_runtime.py b/tests/e2e/test_runtime.py index 8fc12f62..5e2f7059 100644 --- a/tests/e2e/test_runtime.py +++ b/tests/e2e/test_runtime.py @@ -224,3 +224,59 @@ def test_terminate_process_sigint(): process.kill() assert process.returncode == 130 + + +@pytest.mark.e2e +def test_hot_reload(): + """ + Execute a rulebook with hot-reload option, + check for first action being triggered, + then modify the content of the rulebook, + check for the other action being triggered. + """ + + rulebook = utils.BASE_DATA_PATH / "rulebooks/test_hot_reload.yml" + cmd = utils.Command(rulebook=rulebook, hot_reload=True) + + LOGGER.info(f"Running command: {cmd}") + + process = subprocess.Popen( + cmd, + cwd=utils.BASE_DATA_PATH, + text=True, + stdout=subprocess.PIPE, + ) + + with open(rulebook, "rt") as file: + original_data = file.read() + found_rule_1_in_out = False + found_rule_2_in_out = False + + start = time.time() + while line := process.stdout.readline(): + if "Rule 1: I matched for value_a" in line: + found_rule_1_in_out = True + break + time.sleep(0.1) + if time.time() - start > DEFAULT_CMD_TIMEOUT: + process.kill() + + assert found_rule_1_in_out + + data = original_data.replace('- action: "value_a"', '- action: "value_b"') + with open(rulebook, "wt") as file: + file.write(data) + + start = time.time() + while line := process.stdout.readline(): + if "Rule 2: I have now matched for value_b" in line: + found_rule_2_in_out = True + break + time.sleep(0.1) + if time.time() - start > DEFAULT_CMD_TIMEOUT: + process.kill() + + with open(rulebook, "wt") as file: + file.write(original_data) + + assert found_rule_2_in_out diff --git a/tests/e2e/utils.py b/tests/e2e/utils.py index 5373f305..358d8229 100644 --- a/tests/e2e/utils.py +++ b/tests/e2e/utils.py @@ -37,6 +37,7 @@ class Command: verbosity: int = 0 heartbeat: int = 0 execution_strategy: Optional[str] = None + hot_reload: bool = False def __post_init__(self): # verbosity overrides verbose and debug @@ -83,6 +84,8 @@ def to_list(self) -> List: result.extend(["--heartbeat", str(self.heartbeat)]) if self.execution_strategy: result.extend(["--execution-strategy", self.execution_strategy]) + if self.hot_reload: + result.append("--hot-reload") return result diff --git a/tests/test_examples.py b/tests/test_examples.py index fb15927e..8e9e2c8f 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -2303,7 +2303,7 @@ async def test_79_workflow_job_template(): status="successful", id=945, created="dummy", artifacts=dict(a=1) ) job_template_runner.host = "https://examples.com" - job_url = "https://examples.com/#/jobs/945/details" + job_url = "https://examples.com/#/jobs/workflow/945/details" with SourceTask(rs.sources[0], "sources", {}, queue): with patch( "ansible_rulebook.builtin.job_template_runner."