Skip to content

Commit

Permalink
[AAP-8646] Shutdown can optionally include message and delay (#314)
Browse files Browse the repository at this point in the history
https://issues.redhat.com/browse/AAP-8646

The Shutdown message can optionally take the following parameters
- message
- delay

When a source ends it puts out a message
```
Source range initiated shutdown at 2023-01-26 16:35:49.250861
```
  • Loading branch information
mkanoor authored Feb 3, 2023
2 parents b99b613 + 9ab665c commit 3f27d0e
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 10 deletions.
11 changes: 11 additions & 0 deletions ansible_rulebook/builtin.py
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,8 @@ async def shutdown(
source_ruleset_name: str,
source_rule_name: str,
ruleset: str,
delay: float = 0.0,
message: str = "Default shutdown message",
):
await event_log.put(
dict(
Expand All @@ -799,8 +801,17 @@ async def shutdown(
rule=source_rule_name,
run_at=str(datetime.utcnow()),
matching_events=_get_events(variables),
delay=delay,
message=message,
)
)

print(
"Ruleset: %s rule: %s has initiated shutdown. "
"Delay: %.3f seconds, Message: %s"
% (source_ruleset_name, source_rule_name, delay, message)
)
await asyncio.sleep(delay)
raise ShutdownException()


Expand Down
36 changes: 30 additions & 6 deletions ansible_rulebook/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,16 +155,32 @@ async def start_source(
raise Exception("Entrypoint is not a coroutine function.")

await entrypoint(fqueue, args)
shutdown_msg = (
f"Source {source.source_name} initiated shutdown at "
f"{str(datetime.now())}"
)

except KeyboardInterrupt:
shutdown_msg = (
f"Source {source.source_name} keyboard interrupt, "
f"initiated shutdown at {str(datetime.now())}"
)
pass
except asyncio.CancelledError:
logger.info("Task cancelled")
except BaseException:
shutdown_msg = (
f"Source {source.source_name} task cancelled, "
"initiated shutdown at {str(datetime.now())}"
)
logger.info("Task cancelled " + shutdown_msg)
except BaseException as e:
logger.exception("Source error")
shutdown_msg = (
f"Shutting down source: {source.source_name} error : {e}"
)
logger.error(shutdown_msg)
raise
finally:
await queue.put(Shutdown())
await queue.put(Shutdown(shutdown_msg, 0.0, source.source_name))


async def run_rulesets(
Expand Down Expand Up @@ -252,7 +268,7 @@ async def run_ruleset(self):
source_loop_task = asyncio.create_task(self._drain_source_queue())
await asyncio.wait([source_loop_task])

async def _stop(self):
async def _stop(self, obj):
# Wait for items in queues to be completed. Mainly for spec tests.
await asyncio.sleep(0.01)

Expand All @@ -266,7 +282,14 @@ async def _stop(self):
pass

await asyncio.wait([self.pa_runner_task, self.action_loop_task])
await self.event_log.put(dict(type="Shutdown"))
await self.event_log.put(
dict(
type="Shutdown",
message=obj.message,
delay=obj.delay,
source_plugin=obj.source_plugin,
)
)
lang.end_session(self.name)

async def _drain_source_queue(self):
Expand All @@ -279,7 +302,8 @@ async def _drain_source_queue(self):
logger.debug("Received event : " + str(data))
json_count(data)
if isinstance(data, Shutdown):
await self._stop()
logger.info("Shutdown message received: " + str(data))
await self._stop(data)
logger.info("Stopped waiting on events from %s", self.name)
return
if not data:
Expand Down
5 changes: 3 additions & 2 deletions ansible_rulebook/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@


class Shutdown(NamedTuple):

pass
message: str = "Not specified"
delay: float = 0.0
source_plugin: str = ""
7 changes: 6 additions & 1 deletion ansible_rulebook/schema/ruleset_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,12 @@
"type": "object",
"properties": {
"shutdown": {
"type": ["object","null"]
"type": ["object","null"],
"properties": {
"delay": {"type": "number" },
"message": {"type": "string" }
},
"additionalProperties": false
}
},
"required": [
Expand Down
17 changes: 16 additions & 1 deletion docs/actions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,21 @@ Example with multiple event match:
shutdown
********
.. list-table:: Shutdown ansible-rulebook
:widths: 25 150 10
:header-rows: 1

* - Name
- Description
- Required
* - delay
- A numeric value about how long to wait before shutting down, default 0.0
- No
* - message
- A message to be associated with this shutdown
- No

| Generate a shutdown event which will terminate the rulebook engine. If there are multiple
| Generate a shutdown event which will terminate the ansible-rulebook process.
| If there are multiple rule-sets running in your rule book, issuing a shutdown will cause
| all other rule-sets to end, care needs to be taken to account for running playbooks which
| can be impacted when one of the rule set decides to shutdown.
Expand All @@ -331,6 +344,8 @@ Example:
condition: event.i >= 5
action:
shutdown:
delay: 0.125
message: Shutting down after 5 events
Results
*******
Expand Down
2 changes: 2 additions & 0 deletions tests/examples/38_shutdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@
condition: event.i == 1
action:
shutdown:
delay: 1.1845
message: My rule has triggered a shutdown
4 changes: 4 additions & 0 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,10 @@ async def test_38_shutdown_action():
event = event_log.get_nowait()
assert event["type"] == "Action", "1"
assert event["action"] == "shutdown", "1"
assert event["message"] == "My rule has triggered a shutdown"
assert event["delay"] == 1.1845
assert event["ruleset"] == "Test shutdown action"
assert event["rule"] == "Host 1 rule"
event = event_log.get_nowait()
assert event["type"] == "Shutdown", "2"

Expand Down

0 comments on commit 3f27d0e

Please sign in to comment.