Skip to content

Commit

Permalink
[WIP] cancel sources earlier
Browse files Browse the repository at this point in the history
This is an attempt to address #633 but it's very ham-fisted.
  • Loading branch information
sc68cal committed Dec 28, 2023
1 parent e17f1cb commit 49f75ff
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
1 change: 1 addition & 0 deletions ansible_rulebook/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ async def run(parsed_args: argparse.Namespace) -> None:

should_reload = await run_rulesets(
event_log,
tasks,
ruleset_queues,
startup_args.variables,
startup_args.inventory,
Expand Down
25 changes: 13 additions & 12 deletions ansible_rulebook/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import os
import runpy
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from drools.dispatch import establish_async_channel, handle_async_messages
from drools.ruleset import session_stats
Expand Down Expand Up @@ -185,6 +185,16 @@ async def start_source(
f"Source {source.source_name} initiated shutdown at "
f"{str(datetime.now())}"
)
logger.info("Broadcast shutdown to all source plugins")
asyncio.create_task(
broadcast(
Shutdown(
message=shutdown_msg,
source_plugin=source.source_name,
delay=shutdown_delay,
),
)
)

except KeyboardInterrupt:
shutdown_msg = (
Expand All @@ -205,17 +215,6 @@ async def start_source(
)
logger.error(shutdown_msg)
raise
finally:
logger.debug("Broadcast shutdown to all source plugins")
asyncio.create_task(
broadcast(
Shutdown(
message=shutdown_msg,
source_plugin=source.source_name,
delay=shutdown_delay,
),
)
)


class RulebookFileChangeHandler(FileSystemEventHandler):
Expand Down Expand Up @@ -255,6 +254,7 @@ async def monitor_rulebook(rulebook_file):

async def run_rulesets(
event_log: asyncio.Queue,
source_tasks: Tuple[List[asyncio.Task]],
ruleset_queues: List[RuleSetQueue],
variables: Dict,
inventory: str = "",
Expand Down Expand Up @@ -299,6 +299,7 @@ async def run_rulesets(
ruleset_runner = RuleSetRunner(
event_log=event_log,
ruleset_queue_plan=ruleset_queue_plan,
source_tasks=source_tasks,
hosts_facts=hosts_facts,
variables=variables,
rule_set=rulesets[ruleset_queue_plan.ruleset.name],
Expand Down
6 changes: 5 additions & 1 deletion ansible_rulebook/rule_set_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import uuid
from pprint import PrettyPrinter, pformat
from types import MappingProxyType
from typing import Dict, List, Optional, Union, cast
from typing import Dict, List, Optional, Tuple, Union, cast

import dpath
from drools import ruleset as lang
Expand Down Expand Up @@ -82,6 +82,7 @@ def __init__(
self,
event_log: asyncio.Queue,
ruleset_queue_plan: EngineRuleSetQueuePlan,
source_tasks: Tuple[List[asyncio.Task]],
hosts_facts,
variables,
rule_set,
Expand All @@ -92,6 +93,7 @@ def __init__(
self.action_loop_task = None
self.event_log = event_log
self.ruleset_queue_plan = ruleset_queue_plan
self.source_tasks = source_tasks
self.name = ruleset_queue_plan.ruleset.name
self.rule_set = rule_set
self.hosts_facts = hosts_facts
Expand Down Expand Up @@ -175,6 +177,8 @@ async def _handle_shutdown(self):
self.name,
str(self.shutdown),
)
for task in self.source_tasks:
task.cancel()
if self.shutdown.kind == "now":
logger.debug(
"ruleset: %s has issued an immediate shutdown", self.name
Expand Down

0 comments on commit 49f75ff

Please sign in to comment.