diff --git a/ansible_rulebook/builtin.py b/ansible_rulebook/builtin.py index 78742e2f..c49731df 100644 --- a/ansible_rulebook/builtin.py +++ b/ansible_rulebook/builtin.py @@ -789,6 +789,8 @@ async def shutdown( source_ruleset_name: str, source_rule_name: str, ruleset: str, + delay: float = 0.0, + message: str = "Default shutdown message", ): await event_log.put( dict( @@ -799,8 +801,17 @@ async def shutdown( rule=source_rule_name, run_at=str(datetime.utcnow()), matching_events=_get_events(variables), + delay=delay, + message=message, ) ) + + print( + "Ruleset: %s rule: %s has initiated shutdown. " + "Delay: %.3f seconds, Message: %s" + % (source_ruleset_name, source_rule_name, delay, message) + ) + await asyncio.sleep(delay) raise ShutdownException() diff --git a/ansible_rulebook/engine.py b/ansible_rulebook/engine.py index a7c048ff..ee807b47 100644 --- a/ansible_rulebook/engine.py +++ b/ansible_rulebook/engine.py @@ -155,16 +155,32 @@ async def start_source( raise Exception("Entrypoint is not a coroutine function.") await entrypoint(fqueue, args) + shutdown_msg = ( + f"Source {source.source_name} initiated shutdown at " + f"{str(datetime.now())}" + ) except KeyboardInterrupt: + shutdown_msg = ( + f"Source {source.source_name} keyboard interrupt, " + f"initiated shutdown at {str(datetime.now())}" + ) pass except asyncio.CancelledError: - logger.info("Task cancelled") - except BaseException: + shutdown_msg = ( + f"Source {source.source_name} task cancelled, " + "initiated shutdown at {str(datetime.now())}" + ) + logger.info("Task cancelled " + shutdown_msg) + except BaseException as e: logger.exception("Source error") + shutdown_msg = ( + f"Shutting down source: {source.source_name} error : {e}" + ) + logger.error(shutdown_msg) raise finally: - await queue.put(Shutdown()) + await queue.put(Shutdown(shutdown_msg, 0.0, source.source_name)) async def run_rulesets( @@ -252,7 +268,7 @@ async def run_ruleset(self): source_loop_task = asyncio.create_task(self._drain_source_queue()) await asyncio.wait([source_loop_task]) - async def _stop(self): + async def _stop(self, obj): # Wait for items in queues to be completed. Mainly for spec tests. await asyncio.sleep(0.01) @@ -266,7 +282,14 @@ async def _stop(self): pass await asyncio.wait([self.pa_runner_task, self.action_loop_task]) - await self.event_log.put(dict(type="Shutdown")) + await self.event_log.put( + dict( + type="Shutdown", + message=obj.message, + delay=obj.delay, + source_plugin=obj.source_plugin, + ) + ) lang.end_session(self.name) async def _drain_source_queue(self): @@ -279,7 +302,8 @@ async def _drain_source_queue(self): logger.debug("Received event : " + str(data)) json_count(data) if isinstance(data, Shutdown): - await self._stop() + logger.info("Shutdown message received: " + str(data)) + await self._stop(data) logger.info("Stopped waiting on events from %s", self.name) return if not data: diff --git a/ansible_rulebook/messages.py b/ansible_rulebook/messages.py index c7bc817e..04803c51 100644 --- a/ansible_rulebook/messages.py +++ b/ansible_rulebook/messages.py @@ -16,5 +16,6 @@ class Shutdown(NamedTuple): - - pass + message: str = "Not specified" + delay: float = 0.0 + source_plugin: str = "" diff --git a/ansible_rulebook/schema/ruleset_schema.json b/ansible_rulebook/schema/ruleset_schema.json index 740cafdf..c92c9c58 100644 --- a/ansible_rulebook/schema/ruleset_schema.json +++ b/ansible_rulebook/schema/ruleset_schema.json @@ -385,7 +385,12 @@ "type": "object", "properties": { "shutdown": { - "type": ["object","null"] + "type": ["object","null"], + "properties": { + "delay": {"type": "number" }, + "message": {"type": "string" } + }, + "additionalProperties": false } }, "required": [ diff --git a/docs/actions.rst b/docs/actions.rst index d1e0c68c..7f76b6c2 100644 --- a/docs/actions.rst +++ b/docs/actions.rst @@ -317,8 +317,21 @@ Example with multiple event match: shutdown ******** +.. list-table:: Shutdown ansible-rulebook + :widths: 25 150 10 + :header-rows: 1 + + * - Name + - Description + - Required + * - delay + - A numeric value about how long to wait before shutting down, default 0.0 + - No + * - message + - A message to be associated with this shutdown + - No -| Generate a shutdown event which will terminate the rulebook engine. If there are multiple +| Generate a shutdown event which will terminate the ansible-rulebook process. | If there are multiple rule-sets running in your rule book, issuing a shutdown will cause | all other rule-sets to end, care needs to be taken to account for running playbooks which | can be impacted when one of the rule set decides to shutdown. @@ -331,6 +344,8 @@ Example: condition: event.i >= 5 action: shutdown: + delay: 0.125 + message: Shutting down after 5 events Results ******* diff --git a/tests/examples/38_shutdown.yml b/tests/examples/38_shutdown.yml index 49709927..e8daeff1 100644 --- a/tests/examples/38_shutdown.yml +++ b/tests/examples/38_shutdown.yml @@ -10,3 +10,5 @@ condition: event.i == 1 action: shutdown: + delay: 1.1845 + message: My rule has triggered a shutdown diff --git a/tests/test_examples.py b/tests/test_examples.py index 2d48ebc0..1dba0464 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -997,6 +997,10 @@ async def test_38_shutdown_action(): event = event_log.get_nowait() assert event["type"] == "Action", "1" assert event["action"] == "shutdown", "1" + assert event["message"] == "My rule has triggered a shutdown" + assert event["delay"] == 1.1845 + assert event["ruleset"] == "Test shutdown action" + assert event["rule"] == "Host 1 rule" event = event_log.get_nowait() assert event["type"] == "Shutdown", "2"