From 49f75ff56495e90a6cde0f65ef7ddd3049094eab Mon Sep 17 00:00:00 2001 From: "Sean M. Collins" Date: Thu, 28 Dec 2023 14:09:17 -0500 Subject: [PATCH] [WIP] cancel sources earlier This is an attempt to address #633 but it's very ham-fisted. --- ansible_rulebook/app.py | 1 + ansible_rulebook/engine.py | 25 +++++++++++++------------ ansible_rulebook/rule_set_runner.py | 6 +++++- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/ansible_rulebook/app.py b/ansible_rulebook/app.py index a98553309..45ae4b3bb 100644 --- a/ansible_rulebook/app.py +++ b/ansible_rulebook/app.py @@ -130,6 +130,7 @@ async def run(parsed_args: argparse.Namespace) -> None: should_reload = await run_rulesets( event_log, + tasks, ruleset_queues, startup_args.variables, startup_args.inventory, diff --git a/ansible_rulebook/engine.py b/ansible_rulebook/engine.py index e4a54ee55..759d9e7e4 100644 --- a/ansible_rulebook/engine.py +++ b/ansible_rulebook/engine.py @@ -18,7 +18,7 @@ import os import runpy from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from drools.dispatch import establish_async_channel, handle_async_messages from drools.ruleset import session_stats @@ -185,6 +185,16 @@ async def start_source( f"Source {source.source_name} initiated shutdown at " f"{str(datetime.now())}" ) + logger.info("Broadcast shutdown to all source plugins") + asyncio.create_task( + broadcast( + Shutdown( + message=shutdown_msg, + source_plugin=source.source_name, + delay=shutdown_delay, + ), + ) + ) except KeyboardInterrupt: shutdown_msg = ( @@ -205,17 +215,6 @@ async def start_source( ) logger.error(shutdown_msg) raise - finally: - logger.debug("Broadcast shutdown to all source plugins") - asyncio.create_task( - broadcast( - Shutdown( - message=shutdown_msg, - source_plugin=source.source_name, - delay=shutdown_delay, - ), - ) - ) class RulebookFileChangeHandler(FileSystemEventHandler): @@ -255,6 +254,7 @@ async def monitor_rulebook(rulebook_file): async def run_rulesets( event_log: asyncio.Queue, + source_tasks: Tuple[List[asyncio.Task]], ruleset_queues: List[RuleSetQueue], variables: Dict, inventory: str = "", @@ -299,6 +299,7 @@ async def run_rulesets( ruleset_runner = RuleSetRunner( event_log=event_log, ruleset_queue_plan=ruleset_queue_plan, + source_tasks=source_tasks, hosts_facts=hosts_facts, variables=variables, rule_set=rulesets[ruleset_queue_plan.ruleset.name], diff --git a/ansible_rulebook/rule_set_runner.py b/ansible_rulebook/rule_set_runner.py index 8d6c534c3..5614daa6c 100644 --- a/ansible_rulebook/rule_set_runner.py +++ b/ansible_rulebook/rule_set_runner.py @@ -18,7 +18,7 @@ import uuid from pprint import PrettyPrinter, pformat from types import MappingProxyType -from typing import Dict, List, Optional, Union, cast +from typing import Dict, List, Optional, Tuple, Union, cast import dpath from drools import ruleset as lang @@ -82,6 +82,7 @@ def __init__( self, event_log: asyncio.Queue, ruleset_queue_plan: EngineRuleSetQueuePlan, + source_tasks: Tuple[List[asyncio.Task]], hosts_facts, variables, rule_set, @@ -92,6 +93,7 @@ def __init__( self.action_loop_task = None self.event_log = event_log self.ruleset_queue_plan = ruleset_queue_plan + self.source_tasks = source_tasks self.name = ruleset_queue_plan.ruleset.name self.rule_set = rule_set self.hosts_facts = hosts_facts @@ -175,6 +177,8 @@ async def _handle_shutdown(self): self.name, str(self.shutdown), ) + for task in self.source_tasks: + task.cancel() if self.shutdown.kind == "now": logger.debug( "ruleset: %s has issued an immediate shutdown", self.name