From c7d35ce808c78d4cc969abc30df7e9de298ffad1 Mon Sep 17 00:00:00 2001 From: Madhu Kanoor Date: Thu, 2 Nov 2023 17:24:33 -0400 Subject: [PATCH] feat: added pg_notify Added an action to send notifications via postgres LISTEN/NOTIFY --- Dockerfile | 3 +- ansible_rulebook/action/pg_notify.py | 99 +++++++++++++++++++++ ansible_rulebook/rule_set_runner.py | 2 + ansible_rulebook/schema/ruleset_schema.json | 47 ++++++++++ ansible_rulebook/util.py | 22 +++-- setup.cfg | 3 +- 6 files changed, 169 insertions(+), 7 deletions(-) create mode 100644 ansible_rulebook/action/pg_notify.py diff --git a/Dockerfile b/Dockerfile index 97babe0ce..80ce71823 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,10 +16,11 @@ RUN pip install -U pip \ aiokafka \ watchdog \ azure-servicebus \ + psycopg \ && ansible-galaxy collection install ansible.eda RUN bash -c "if [ $DEVEL_COLLECTION_LIBRARY -ne 0 ]; then \ - ansible-galaxy collection install git+https://github.com/ansible/event-driven-ansible.git --force; fi" + ansible-galaxy collection install git+https://github.com/mkanoor/event-driven-ansible.git,pg_listener --force; fi" COPY . $WORKDIR RUN chown -R $USER_ID ./ diff --git a/ansible_rulebook/action/pg_notify.py b/ansible_rulebook/action/pg_notify.py new file mode 100644 index 000000000..6bcdc0138 --- /dev/null +++ b/ansible_rulebook/action/pg_notify.py @@ -0,0 +1,99 @@ +# Copyright 2023 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import json +import logging +import uuid + +import psycopg +from psycopg import OperationalError + +from .control import Control +from .helper import Helper +from .metadata import Metadata + +logger = logging.getLogger(__name__) +MAX_MESSAGE_LENGTH = 7 * 1024 + + +class PGNotify: + """The PGNotify action sends an event to a PG Pub Sub Channel + Needs + dbname + host + user + password + event + """ + + def __init__(self, metadata: Metadata, control: Control, **action_args): + self.helper = Helper(metadata, control, "pg_notify") + self.action_args = action_args + + async def __call__(self): + try: + conn = psycopg.connect( + host=self.action_args["host"], + dbname=self.action_args["dbname"], + user=self.action_args["user"], + password=self.action_args["password"], + autocommit=True, + ) + + cursor = conn.cursor() + event = self.action_args["event"].copy() + if "meta" in event: + event.pop("meta") + + payload = json.dumps(event) + message_length = len(payload) + if message_length >= MAX_MESSAGE_LENGTH: + md5_hash = hashlib.md5(payload.encode("utf-8")).hexdigest() + logger.info("Message length exceeds, will chunk") + message_uuid = str(uuid.uuid4()) + number_of_chunks = int(message_length / MAX_MESSAGE_LENGTH) + 1 + chunked = { + "chunked_msg_uuid": message_uuid, + "number_of_chunks": number_of_chunks, + "message_length": message_length, + "md5_hash": md5_hash, + } + logger.info(f"Chunk info {message_uuid}") + logger.info(f"Number of chunks {number_of_chunks}") + logger.info(f"Total data size {message_length}") + logger.info(f"MD5 Hash {md5_hash}") + + sequence = 1 + for i in range(0, message_length, MAX_MESSAGE_LENGTH): + chunked["chunk"] = payload[i : i + MAX_MESSAGE_LENGTH] + chunked["sequence"] = sequence + sequence += 1 + cursor.execute( + f"NOTIFY {self.action_args['channel']}, " + f"'{json.dumps(chunked)}';" + ) + else: + cursor.execute( + f"NOTIFY {self.action_args['channel']}, '{payload}';" + ) + except OperationalError as e: + logger.error(f"PG Notify operational error {e}") + finally: + if cursor: + cursor.close() + if conn: + conn.close() + + await self.helper.send_default_status() diff --git a/ansible_rulebook/rule_set_runner.py b/ansible_rulebook/rule_set_runner.py index e220d8b99..add7140c6 100644 --- a/ansible_rulebook/rule_set_runner.py +++ b/ansible_rulebook/rule_set_runner.py @@ -32,6 +32,7 @@ from ansible_rulebook.action.debug import Debug from ansible_rulebook.action.metadata import Metadata from ansible_rulebook.action.noop import Noop +from ansible_rulebook.action.pg_notify import PGNotify from ansible_rulebook.action.post_event import PostEvent from ansible_rulebook.action.print_event import PrintEvent from ansible_rulebook.action.retract_fact import RetractFact @@ -74,6 +75,7 @@ "run_module": RunModule, "run_job_template": RunJobTemplate, "run_workflow_template": RunWorkflowTemplate, + "pg_notify": PGNotify, } diff --git a/ansible_rulebook/schema/ruleset_schema.json b/ansible_rulebook/schema/ruleset_schema.json index e312b959f..05ce54798 100644 --- a/ansible_rulebook/schema/ruleset_schema.json +++ b/ansible_rulebook/schema/ruleset_schema.json @@ -203,6 +203,9 @@ }, { "$ref": "#/$defs/shutdown-action" + }, + { + "$ref": "#/$defs/pg-notify-action" } ] } @@ -241,6 +244,9 @@ }, { "$ref": "#/$defs/shutdown-action" + }, + { + "$ref": "#/$defs/pg-notify-action" } ] } @@ -507,6 +513,47 @@ ], "additionalProperties": false }, + "pg-notify-action": { + "type": "object", + "properties": { + "pg_notify": { + "type": "object", + "properties": { + "dbname": { + "type": "string" + }, + "host": { + "type": "string" + }, + "user": { + "type": "string" + }, + "password": { + "type": "string" + }, + "channel": { + "type": "string" + }, + "event": { + "type": "string" + } + }, + "required": [ + "dbname", + "host", + "user", + "password", + "channel", + "event" + ], + "additionalProperties": false + } + }, + "required": [ + "pg_notify" + ], + "additionalProperties": false + }, "post-event-action": { "type": "object", "properties": { diff --git a/ansible_rulebook/util.py b/ansible_rulebook/util.py index 6160f8e0b..7e3ba17ff 100644 --- a/ansible_rulebook/util.py +++ b/ansible_rulebook/util.py @@ -27,8 +27,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union import ansible_runner -import jinja2 -from jinja2.nativetypes import NativeTemplate +from jinja2.nativetypes import NativeEnvironment from packaging import version from packaging.version import InvalidVersion @@ -43,6 +42,17 @@ EDA_BUILTIN_FILTER_PREFIX = "eda.builtin." +def j2_getenv(name, default=None): + """ + Returns the value of a given environment variable name. + + Usage: "{{ getenv(name) }}" + Output: the value of the environment variable + + """ + return os.environ[name] + + def get_horizontal_rule(character): try: return character * int(os.get_terminal_size()[0]) @@ -52,9 +62,11 @@ def get_horizontal_rule(character): def render_string(value: str, context: Dict) -> str: if "{{" in value and "}}" in value: - return NativeTemplate(value, undefined=jinja2.StrictUndefined).render( - context - ) + env = NativeEnvironment() + env.filters["getenv"] = j2_getenv + + tmpl = env.from_string(value) + return tmpl.render(context) return value diff --git a/setup.cfg b/setup.cfg index cca38f83e..14e74d31d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,7 +32,8 @@ install_requires = ansible-runner websockets drools_jpy == 0.3.8 - watchdog + watchdog + psycopg [options.packages.find] include =