From b321b47a7f36846acff525444652de8f1c66233a Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Sat, 15 Apr 2023 14:58:21 +0200 Subject: [PATCH 01/16] [AAP-11261] Add MQTT event source Adding an MQTT event source for rulebooks - Tested --- extensions/eda/plugins/event_sources/mqtt.py | 82 ++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 extensions/eda/plugins/event_sources/mqtt.py diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py new file mode 100644 index 00000000..f5895290 --- /dev/null +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -0,0 +1,82 @@ +""" +mqtt.py +An ansible-rulebook event source plugin for receiving events via a mqtt topic. +Arguments: + host: The host where the mqtt topic is hosted + port: The port where the mqtt server is listening + username: The username to connect to the broker + password: The password to connect to the broker + ca_certs The optional certificate authority file path containing certificate + used to sign mqtt broker certificates + certfile The optional client certificate file path containing the client certificate, + as well as CA certificates needed to establish the certificate's authenticity + keyfile The optional client key file path containing the client private key + keyfile_password The optional password to be used when loading the certificate chain + topic: The mqtt topic to subscribe to + +""" + +import logging +import json +from typing import Any, Dict +import ssl + +import asyncio +import asyncio_mqtt as aiomqtt + +async def main(queue: asyncio.Queue, args: Dict[str, Any]): + logger = logging.getLogger() + + topic = args.get("topic") + + host = args.get("host") + port = args.get("port") + username = args.get("username") + password = args.get("password") + + ca_certs = args.get("ca_certs") + certfile = args.get("certfile") + keyfile = args.get("keyfile") + keyfile_password = args.get("keyfile_password") + + if ca_certs: + tls_params = aiomqtt.TLSParameters( + ca_certs=ca_certs, + certfile=certfile, + keyfile=keyfile, + keyfile_password=keyfile_password + ) + + mqtt_consumer=aiomqtt.Client( + hostname=host, + port=port, + username=username, + password=password, + tls_params=tls_params if ca_certs else None + ) + + async with mqtt_consumer: + async with mqtt_consumer.messages() as messages: + await mqtt_consumer.subscribe(topic) + async for message in messages: + try: + data = json.loads(message.payload.decode()) + await queue.put(data) + except json.decoder.JSONDecodeError as e: + logger.error(e) + finally: + logger.info("Disconnecting from MQTT broker") + mqtt_consumer.disconnect() + +if __name__ == "__main__": + + class MockQueue: + async def put(self, event): + print(event) + + asyncio.run( + main( + MockQueue(), + {"topic": "eda", "host": "localhost", "port": "1883"}, + ) + ) From ec8198a7349526c6be45df91a8f8855034418f92 Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Sun, 16 Apr 2023 12:52:58 +0200 Subject: [PATCH 02/16] Update mqtt.py --- extensions/eda/plugins/event_sources/mqtt.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index f5895290..5ab40917 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -55,7 +55,9 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): tls_params=tls_params if ca_certs else None ) - async with mqtt_consumer: + await mqtt_consumer.connect() + + try: async with mqtt_consumer.messages() as messages: await mqtt_consumer.subscribe(topic) async for message in messages: @@ -64,9 +66,9 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): await queue.put(data) except json.decoder.JSONDecodeError as e: logger.error(e) - finally: - logger.info("Disconnecting from MQTT broker") - mqtt_consumer.disconnect() + finally: + logger.info("Disconneccting from broker") + mqtt_consumer.disconnect() if __name__ == "__main__": From d78d8a5d1434385afdace09d17bf91fc1e2a7886 Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Sun, 16 Apr 2023 13:30:07 +0200 Subject: [PATCH 03/16] Add sample rulebook --- extensions/eda/rulebooks/mqtt-test.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 extensions/eda/rulebooks/mqtt-test.yml diff --git a/extensions/eda/rulebooks/mqtt-test.yml b/extensions/eda/rulebooks/mqtt-test.yml new file mode 100644 index 00000000..cf9d34d6 --- /dev/null +++ b/extensions/eda/rulebooks/mqtt-test.yml @@ -0,0 +1,14 @@ +--- +- name: Hello Events + hosts: all + sources: + - ansible.eda.mqtt: + host: localhost + port: 1883 + topic: test-topic + + rules: + - name: Debug connection + condition: event.test is defined + action: + debug: From 9103642004683a4a7dd12f60a6bb6e14b1ae71af Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 17 Apr 2023 12:58:20 +0200 Subject: [PATCH 04/16] Update mqtt.py Linting --- extensions/eda/plugins/event_sources/mqtt.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index 5ab40917..c80a5261 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -24,6 +24,7 @@ import asyncio import asyncio_mqtt as aiomqtt + async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger = logging.getLogger() @@ -44,15 +45,15 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): ca_certs=ca_certs, certfile=certfile, keyfile=keyfile, - keyfile_password=keyfile_password + keyfile_password=keyfile_password, ) - mqtt_consumer=aiomqtt.Client( + mqtt_consumer = aiomqtt.Client( hostname=host, port=port, username=username, password=password, - tls_params=tls_params if ca_certs else None + tls_params=tls_params if ca_certs else None, ) await mqtt_consumer.connect() @@ -70,6 +71,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger.info("Disconneccting from broker") mqtt_consumer.disconnect() + if __name__ == "__main__": class MockQueue: From d839d689fe8f80f8149b1d1e053c4e27b200c7ba Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 17 Apr 2023 13:34:38 +0200 Subject: [PATCH 05/16] Linting --- extensions/eda/plugins/event_sources/mqtt.py | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index c80a5261..8bb24dac 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -6,25 +6,26 @@ port: The port where the mqtt server is listening username: The username to connect to the broker password: The password to connect to the broker - ca_certs The optional certificate authority file path containing certificate - used to sign mqtt broker certificates - certfile The optional client certificate file path containing the client certificate, - as well as CA certificates needed to establish the certificate's authenticity - keyfile The optional client key file path containing the client private key - keyfile_password The optional password to be used when loading the certificate chain + ca_certs The optional certificate authority file path containing + certificate used to sign mqtt broker certificates + certfile The optional client certificate file path containing + the client certificate, as well as CA certificates needed + to establish the certificate's authenticity + keyfile The optional client key file path containing the client + private key + keyfile_password The optional password to be used when loading the + certificate chain topic: The mqtt topic to subscribe to """ -import logging +import asyncio import json +import logging from typing import Any, Dict -import ssl -import asyncio import asyncio_mqtt as aiomqtt - async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger = logging.getLogger() @@ -45,15 +46,15 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): ca_certs=ca_certs, certfile=certfile, keyfile=keyfile, - keyfile_password=keyfile_password, + keyfile_password=keyfile_password ) - mqtt_consumer = aiomqtt.Client( + mqtt_consumer=aiomqtt.Client( hostname=host, port=port, username=username, password=password, - tls_params=tls_params if ca_certs else None, + tls_params=tls_params if ca_certs else None ) await mqtt_consumer.connect() @@ -71,7 +72,6 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger.info("Disconneccting from broker") mqtt_consumer.disconnect() - if __name__ == "__main__": class MockQueue: From 919ce404d96a10f342c4226bc1cb50e7160852e7 Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 17 Apr 2023 14:57:17 +0200 Subject: [PATCH 06/16] final linting --- extensions/eda/plugins/event_sources/mqtt.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index 8bb24dac..2b838e37 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -26,6 +26,7 @@ import asyncio_mqtt as aiomqtt + async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger = logging.getLogger() @@ -49,7 +50,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): keyfile_password=keyfile_password ) - mqtt_consumer=aiomqtt.Client( + mqtt_consumer = aiomqtt.Client( hostname=host, port=port, username=username, @@ -72,6 +73,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger.info("Disconneccting from broker") mqtt_consumer.disconnect() + if __name__ == "__main__": class MockQueue: From 8f7e49cd1fcdf43a5f6d373219b3a1ec0c9d7dab Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 17 Apr 2023 15:31:47 +0200 Subject: [PATCH 07/16] Linting --- extensions/eda/plugins/event_sources/mqtt.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index 2b838e37..158b2df5 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -47,7 +47,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): ca_certs=ca_certs, certfile=certfile, keyfile=keyfile, - keyfile_password=keyfile_password + keyfile_password=keyfile_password, ) mqtt_consumer = aiomqtt.Client( @@ -55,7 +55,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): port=port, username=username, password=password, - tls_params=tls_params if ca_certs else None + tls_params=tls_params if ca_certs else None, ) await mqtt_consumer.connect() From 7a98eeafa5dd3bc752ef759171db920af2000aae Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 29 May 2023 09:27:51 +0200 Subject: [PATCH 08/16] Update mqtt.py --- extensions/eda/plugins/event_sources/mqtt.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index 158b2df5..74192b29 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -8,6 +8,7 @@ password: The password to connect to the broker ca_certs The optional certificate authority file path containing certificate used to sign mqtt broker certificates + validate_certs Disable certificate validation - true/false certfile The optional client certificate file path containing the client certificate, as well as CA certificates needed to establish the certificate's authenticity @@ -38,6 +39,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): password = args.get("password") ca_certs = args.get("ca_certs") + validate_certs = bool(args.get("validate_certs")) certfile = args.get("certfile") keyfile = args.get("keyfile") keyfile_password = args.get("keyfile_password") @@ -48,6 +50,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): certfile=certfile, keyfile=keyfile, keyfile_password=keyfile_password, + cert_reqs=validate_certs if validate_certs is not None else True ) mqtt_consumer = aiomqtt.Client( @@ -55,7 +58,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): port=port, username=username, password=password, - tls_params=tls_params if ca_certs else None, + tls_params=tls_params if ca_certs else None ) await mqtt_consumer.connect() @@ -73,7 +76,6 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): logger.info("Disconneccting from broker") mqtt_consumer.disconnect() - if __name__ == "__main__": class MockQueue: From 90cd2d91b2cb43473c82bde9737835d07c233903 Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 10 Jul 2023 07:58:08 +0200 Subject: [PATCH 09/16] Linting and formatting --- extensions/eda/plugins/event_sources/mqtt.py | 26 +++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index 74192b29..0b9b64dd 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -1,6 +1,7 @@ -""" -mqtt.py +"""mqtt.py. + An ansible-rulebook event source plugin for receiving events via a mqtt topic. + Arguments: host: The host where the mqtt topic is hosted port: The port where the mqtt server is listening @@ -23,12 +24,13 @@ import asyncio import json import logging -from typing import Any, Dict +from typing import Any, dict import asyncio_mqtt as aiomqtt -async def main(queue: asyncio.Queue, args: Dict[str, Any]): +async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: + """Receive events via a MQTT topic.""" logger = logging.getLogger() topic = args.get("topic") @@ -50,7 +52,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): certfile=certfile, keyfile=keyfile, keyfile_password=keyfile_password, - cert_reqs=validate_certs if validate_certs is not None else True + cert_reqs=validate_certs if validate_certs is not None else True, ) mqtt_consumer = aiomqtt.Client( @@ -58,7 +60,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): port=port, username=username, password=password, - tls_params=tls_params if ca_certs else None + tls_params=tls_params if ca_certs else None, ) await mqtt_consumer.connect() @@ -71,16 +73,21 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): data = json.loads(message.payload.decode()) await queue.put(data) except json.decoder.JSONDecodeError as e: - logger.error(e) + logger.exception(e) finally: logger.info("Disconneccting from broker") mqtt_consumer.disconnect() + if __name__ == "__main__": + """MockQueue if running directly.""" class MockQueue: - async def put(self, event): - print(event) + """A fake queue.""" + + async def put(self: "MockQueue", event: dict) -> None: + """Print the event.""" + print(event) # noqa: T201 asyncio.run( main( @@ -88,3 +95,4 @@ async def put(self, event): {"topic": "eda", "host": "localhost", "port": "1883"}, ) ) + From e2d779d98703b6f05ad94e5a82159fa5f67e659b Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 10 Jul 2023 09:55:39 +0200 Subject: [PATCH 10/16] Adapted to new linting rules --- extensions/eda/plugins/event_sources/mqtt.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index 0b9b64dd..fb5637aa 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -3,6 +3,7 @@ An ansible-rulebook event source plugin for receiving events via a mqtt topic. Arguments: +--------- host: The host where the mqtt topic is hosted port: The port where the mqtt server is listening username: The username to connect to the broker @@ -73,7 +74,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: data = json.loads(message.payload.decode()) await queue.put(data) except json.decoder.JSONDecodeError as e: - logger.exception(e) + logger.exception("Decoding exception for incoming message") finally: logger.info("Disconneccting from broker") mqtt_consumer.disconnect() @@ -93,6 +94,5 @@ async def put(self: "MockQueue", event: dict) -> None: main( MockQueue(), {"topic": "eda", "host": "localhost", "port": "1883"}, - ) + ), ) - From 283c2b224795e9b95f0466b8ebaabd8f5f091a92 Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Mon, 10 Jul 2023 10:02:00 +0200 Subject: [PATCH 11/16] Fixing exception handling --- extensions/eda/plugins/event_sources/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_sources/mqtt.py index fb5637aa..0ea1d474 100644 --- a/extensions/eda/plugins/event_sources/mqtt.py +++ b/extensions/eda/plugins/event_sources/mqtt.py @@ -73,7 +73,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: try: data = json.loads(message.payload.decode()) await queue.put(data) - except json.decoder.JSONDecodeError as e: + except json.decoder.JSONDecodeError: logger.exception("Decoding exception for incoming message") finally: logger.info("Disconneccting from broker") From 594ec17ffd27d8d95674f48c943c85f98685059e Mon Sep 17 00:00:00 2001 From: kurokobo <2920259+kurokobo@users.noreply.github.com> Date: Sat, 12 Aug 2023 14:00:40 +0900 Subject: [PATCH 12/16] refactor: move mqtt.py from event_sources to event_source --- extensions/eda/plugins/{event_sources => event_source}/mqtt.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename extensions/eda/plugins/{event_sources => event_source}/mqtt.py (100%) diff --git a/extensions/eda/plugins/event_sources/mqtt.py b/extensions/eda/plugins/event_source/mqtt.py similarity index 100% rename from extensions/eda/plugins/event_sources/mqtt.py rename to extensions/eda/plugins/event_source/mqtt.py From a45fdb4eb0c65b6fc52f9812e817f5d17e2bc7d7 Mon Sep 17 00:00:00 2001 From: kurokobo <2920259+kurokobo@users.noreply.github.com> Date: Sat, 12 Aug 2023 14:03:53 +0900 Subject: [PATCH 13/16] fix: correct invalid import --- extensions/eda/plugins/event_source/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_source/mqtt.py b/extensions/eda/plugins/event_source/mqtt.py index 0ea1d474..80da5b30 100644 --- a/extensions/eda/plugins/event_source/mqtt.py +++ b/extensions/eda/plugins/event_source/mqtt.py @@ -25,7 +25,7 @@ import asyncio import json import logging -from typing import Any, dict +from typing import Any import asyncio_mqtt as aiomqtt From 847040a1c560967e49ec010ddc2b5439400f8f7f Mon Sep 17 00:00:00 2001 From: kurokobo <2920259+kurokobo@users.noreply.github.com> Date: Sat, 12 Aug 2023 14:08:14 +0900 Subject: [PATCH 14/16] feat: add aiomqtt to requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 50a9219a..55011070 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ systemd-python dpath pyyaml dpath +aiomqtt From 0b5cc6eae659d468b0145a20f479abb503839ccc Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Sat, 26 Aug 2023 16:33:28 +0200 Subject: [PATCH 15/16] Align to recent aiomqtt changes React to decommissioning of asyncio_mqtt --- extensions/eda/plugins/event_source/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_source/mqtt.py b/extensions/eda/plugins/event_source/mqtt.py index 80da5b30..03d4051d 100644 --- a/extensions/eda/plugins/event_source/mqtt.py +++ b/extensions/eda/plugins/event_source/mqtt.py @@ -27,7 +27,7 @@ import logging from typing import Any -import asyncio_mqtt as aiomqtt +import aiomqtt as aiomqtt async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: From 733f584247b46010b8cc008aab9dc2804d6b32a6 Mon Sep 17 00:00:00 2001 From: Alessandro Rossi <4215912+kubealex@users.noreply.github.com> Date: Sat, 26 Aug 2023 16:48:05 +0200 Subject: [PATCH 16/16] Update mqtt.py --- extensions/eda/plugins/event_source/mqtt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_source/mqtt.py b/extensions/eda/plugins/event_source/mqtt.py index 03d4051d..005a1702 100644 --- a/extensions/eda/plugins/event_source/mqtt.py +++ b/extensions/eda/plugins/event_source/mqtt.py @@ -37,7 +37,7 @@ async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: topic = args.get("topic") host = args.get("host") - port = args.get("port") + port = int(args.get("port")) username = args.get("username") password = args.get("password")