From 6019958b55b227af574f6265da53e810e60a3255 Mon Sep 17 00:00:00 2001 From: John DeAngelis Date: Tue, 10 Dec 2024 17:38:56 -0500 Subject: [PATCH 1/6] feat: add initial MessageBus class Signed-off-by: John DeAngelis --- backend/ops_api/ops/__init__.py | 5 +++ backend/ops_api/ops/messagebus/__init__.py | 0 backend/ops_api/ops/services/message_bus.py | 23 +++++++++++ backend/ops_api/ops/utils/events.py | 3 ++ .../ops_api/tests/ops/messagebus/__init__.py | 0 .../tests/ops/messagebus/test_message_bus.py | 41 +++++++++++++++++++ 6 files changed, 72 insertions(+) create mode 100644 backend/ops_api/ops/messagebus/__init__.py create mode 100644 backend/ops_api/ops/services/message_bus.py create mode 100644 backend/ops_api/tests/ops/messagebus/__init__.py create mode 100644 backend/ops_api/tests/ops/messagebus/test_message_bus.py diff --git a/backend/ops_api/ops/__init__.py b/backend/ops_api/ops/__init__.py index 89cc7e2839..d7fce4d0e0 100644 --- a/backend/ops_api/ops/__init__.py +++ b/backend/ops_api/ops/__init__.py @@ -16,6 +16,8 @@ from ops_api.ops.db import handle_create_update_by_attrs, init_db from ops_api.ops.error_handlers import register_error_handlers from ops_api.ops.home_page.views import home +from ops_api.ops.messagebus.cans import can_history_trigger +from ops_api.ops.services.message_bus import MessageBus from ops_api.ops.urls import register_api from ops_api.ops.utils.core import is_fake_user, is_unit_test @@ -84,6 +86,9 @@ def create_app() -> Flask: app.db_session = db_session app.engine = engine + app.message_bus = MessageBus() + app.message_bus.subscribe("can_history_signal", can_history_trigger) + @app.teardown_appcontext def shutdown_session(exception=None): app.db_session.remove() diff --git a/backend/ops_api/ops/messagebus/__init__.py b/backend/ops_api/ops/messagebus/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/ops_api/ops/services/message_bus.py b/backend/ops_api/ops/services/message_bus.py new file mode 100644 index 0000000000..79f81a5bb3 --- /dev/null +++ b/backend/ops_api/ops/services/message_bus.py @@ -0,0 +1,23 @@ +from blinker import signal +from flask import current_app + +from models import OpsEvent + + +class MessageBus: + + def subscribe(self, topic: str, callback: callable): + ops_signal = signal(topic) + ops_signal.connect(callback) + + def publish(self, topic: str, event: OpsEvent): + ops_signal = signal(topic) + ops_signal.send(event, session=current_app.db_session) + + def unsubscribe(self, topic, callback): + ops_signal = signal(topic) + ops_signal.disconnect(callback) + + def get_subscribers(self, topic) -> dict: + ops_signal = signal(topic) + return ops_signal.receivers diff --git a/backend/ops_api/ops/utils/events.py b/backend/ops_api/ops/utils/events.py index 3cfca03b69..d0658e13ff 100644 --- a/backend/ops_api/ops/utils/events.py +++ b/backend/ops_api/ops/utils/events.py @@ -62,3 +62,6 @@ def __exit__( if not current_app.db_session.is_active: current_app.logger.error("Session is not active. It has likely been rolled back.") + + if current_app.message_bus: + current_app.message_bus.publish(self.event_type.name, event) diff --git a/backend/ops_api/tests/ops/messagebus/__init__.py b/backend/ops_api/tests/ops/messagebus/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/ops_api/tests/ops/messagebus/test_message_bus.py b/backend/ops_api/tests/ops/messagebus/test_message_bus.py new file mode 100644 index 0000000000..10f2d9b851 --- /dev/null +++ b/backend/ops_api/tests/ops/messagebus/test_message_bus.py @@ -0,0 +1,41 @@ +from sqlalchemy.orm import Session + +from models import OpsEvent, OpsEventType +from ops_api.ops.services.message_bus import MessageBus +from ops_api.ops.utils.events import OpsEventHandler + + +def test_message_bus_subscriptions(app, loaded_db, mocker): + mock_callback_1 = mocker.MagicMock() + mock_callback_2 = mocker.MagicMock() + mock_callback_3 = mocker.MagicMock() + + message_bus = MessageBus() + message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_1) + message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_2) + message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_3) + + with app.app_context(): + app.message_bus = message_bus + + oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) + oeh.__exit__(None, None, None) + + assert mock_callback_1.call_count == 1 + assert mock_callback_2.call_count == 1 + assert mock_callback_3.call_count == 1 + + +def test_message_bus_db_session(app, loaded_db, mocker): + def callback_1(event: OpsEvent, session: Session): + assert event.event_type == OpsEventType.CREATE_NEW_CAN + assert session is not None + + message_bus = MessageBus() + message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, callback_1) + + with app.app_context(): + app.message_bus = message_bus + + oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) + oeh.__exit__(None, None, None) From 76a035e6de56cd6457f96e2f158abc31e9dbc823 Mon Sep 17 00:00:00 2001 From: John DeAngelis Date: Tue, 10 Dec 2024 17:41:53 -0500 Subject: [PATCH 2/6] chore: add __init__.py to package Signed-off-by: John DeAngelis --- backend/ops_api/ops/services/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 backend/ops_api/ops/services/__init__.py diff --git a/backend/ops_api/ops/services/__init__.py b/backend/ops_api/ops/services/__init__.py new file mode 100644 index 0000000000..e69de29bb2 From 88b491d2e480a56fca4396fe9207dc4abdd28e0b Mon Sep 17 00:00:00 2001 From: John DeAngelis Date: Wed, 11 Dec 2024 11:17:45 -0500 Subject: [PATCH 3/6] refactor: move MessageBus to request lifecycle Signed-off-by: John DeAngelis --- backend/ops_api/ops/__init__.py | 7 +++--- backend/ops_api/ops/utils/events.py | 4 +-- .../tests/ops/messagebus/test_message_bus.py | 25 +++++++++++-------- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/backend/ops_api/ops/__init__.py b/backend/ops_api/ops/__init__.py index d7fce4d0e0..06b1f1d84b 100644 --- a/backend/ops_api/ops/__init__.py +++ b/backend/ops_api/ops/__init__.py @@ -86,9 +86,6 @@ def create_app() -> Flask: app.db_session = db_session app.engine = engine - app.message_bus = MessageBus() - app.message_bus.subscribe("can_history_signal", can_history_trigger) - @app.teardown_appcontext def shutdown_session(exception=None): app.db_session.remove() @@ -167,3 +164,7 @@ def before_request_function(app: Flask, request: request): if not is_unit_test() and not is_fake_user(app, current_user): current_app.logger.info(f"Checking user session for {current_user.oidc_id}") check_user_session_function(current_user) + + # initialize MessageBus + request.message_bus = MessageBus() + request.message_bus.subscribe("can_history_signal", can_history_trigger) diff --git a/backend/ops_api/ops/utils/events.py b/backend/ops_api/ops/utils/events.py index d0658e13ff..7b03b1584e 100644 --- a/backend/ops_api/ops/utils/events.py +++ b/backend/ops_api/ops/utils/events.py @@ -63,5 +63,5 @@ def __exit__( if not current_app.db_session.is_active: current_app.logger.error("Session is not active. It has likely been rolled back.") - if current_app.message_bus: - current_app.message_bus.publish(self.event_type.name, event) + if request.message_bus: + request.message_bus.publish(self.event_type.name, event) diff --git a/backend/ops_api/tests/ops/messagebus/test_message_bus.py b/backend/ops_api/tests/ops/messagebus/test_message_bus.py index 10f2d9b851..d98941cb52 100644 --- a/backend/ops_api/tests/ops/messagebus/test_message_bus.py +++ b/backend/ops_api/tests/ops/messagebus/test_message_bus.py @@ -1,3 +1,4 @@ +import pytest from sqlalchemy.orm import Session from models import OpsEvent, OpsEventType @@ -5,7 +6,8 @@ from ops_api.ops.utils.events import OpsEventHandler -def test_message_bus_subscriptions(app, loaded_db, mocker): +@pytest.mark.usefixtures("app_ctx") +def test_message_bus_subscriptions(loaded_db, mocker): mock_callback_1 = mocker.MagicMock() mock_callback_2 = mocker.MagicMock() mock_callback_3 = mocker.MagicMock() @@ -15,18 +17,20 @@ def test_message_bus_subscriptions(app, loaded_db, mocker): message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_2) message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_3) - with app.app_context(): - app.message_bus = message_bus + # patch the request object + r_patch = mocker.patch("ops_api.ops.utils.events.request") + r_patch.message_bus = message_bus - oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) - oeh.__exit__(None, None, None) + oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) + oeh.__exit__(None, None, None) assert mock_callback_1.call_count == 1 assert mock_callback_2.call_count == 1 assert mock_callback_3.call_count == 1 -def test_message_bus_db_session(app, loaded_db, mocker): +@pytest.mark.usefixtures("app_ctx") +def test_message_bus_db_session(loaded_db, mocker): def callback_1(event: OpsEvent, session: Session): assert event.event_type == OpsEventType.CREATE_NEW_CAN assert session is not None @@ -34,8 +38,9 @@ def callback_1(event: OpsEvent, session: Session): message_bus = MessageBus() message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, callback_1) - with app.app_context(): - app.message_bus = message_bus + # patch the request object + r_patch = mocker.patch("ops_api.ops.utils.events.request") + r_patch.message_bus = message_bus - oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) - oeh.__exit__(None, None, None) + oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) + oeh.__exit__(None, None, None) From 4d639a0ab4a7cbb3f52159309315cf678a298dd6 Mon Sep 17 00:00:00 2001 From: John DeAngelis Date: Wed, 11 Dec 2024 16:30:33 -0500 Subject: [PATCH 4/6] feat: add initial can_messages.py Signed-off-by: John DeAngelis --- backend/ops_api/ops/__init__.py | 13 +++-- backend/ops_api/ops/messagebus/__init__.py | 0 backend/ops_api/ops/services/can_messages.py | 12 +++++ backend/ops_api/ops/services/message_bus.py | 54 ++++++++++++++----- backend/ops_api/ops/utils/events.py | 8 +-- .../tests/ops/messagebus/test_message_bus.py | 40 +++++++------- 6 files changed, 90 insertions(+), 37 deletions(-) delete mode 100644 backend/ops_api/ops/messagebus/__init__.py create mode 100644 backend/ops_api/ops/services/can_messages.py diff --git a/backend/ops_api/ops/__init__.py b/backend/ops_api/ops/__init__.py index 06b1f1d84b..d1ee0f5631 100644 --- a/backend/ops_api/ops/__init__.py +++ b/backend/ops_api/ops/__init__.py @@ -10,13 +10,14 @@ from sqlalchemy import event from sqlalchemy.orm import Session +from models import OpsEventType from models.utils import track_db_history_after, track_db_history_before, track_db_history_catch_errors from ops_api.ops.auth.decorators import check_user_session_function from ops_api.ops.auth.extension_config import jwtMgr from ops_api.ops.db import handle_create_update_by_attrs, init_db from ops_api.ops.error_handlers import register_error_handlers from ops_api.ops.home_page.views import home -from ops_api.ops.messagebus.cans import can_history_trigger +from ops_api.ops.services.can_messages import can_history_trigger from ops_api.ops.services.message_bus import MessageBus from ops_api.ops.urls import register_api from ops_api.ops.utils.core import is_fake_user, is_unit_test @@ -26,7 +27,7 @@ time.tzset() -def create_app() -> Flask: +def create_app() -> Flask: # noqa: C901 from ops_api.ops.utils.core import is_unit_test log_level = "INFO" if not is_unit_test() else "DEBUG" @@ -90,6 +91,11 @@ def create_app() -> Flask: def shutdown_session(exception=None): app.db_session.remove() + @app.teardown_request + def teardown_request(exception=None): + if hasattr(request, "message_bus"): + request.message_bus.handle() + @event.listens_for(db_session, "before_commit") def receive_before_commit(session: Session): track_db_history_before(session, current_user) @@ -165,6 +171,5 @@ def before_request_function(app: Flask, request: request): current_app.logger.info(f"Checking user session for {current_user.oidc_id}") check_user_session_function(current_user) - # initialize MessageBus request.message_bus = MessageBus() - request.message_bus.subscribe("can_history_signal", can_history_trigger) + request.message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, can_history_trigger) diff --git a/backend/ops_api/ops/messagebus/__init__.py b/backend/ops_api/ops/messagebus/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/backend/ops_api/ops/services/can_messages.py b/backend/ops_api/ops/services/can_messages.py new file mode 100644 index 0000000000..805983cae2 --- /dev/null +++ b/backend/ops_api/ops/services/can_messages.py @@ -0,0 +1,12 @@ +from loguru import logger +from sqlalchemy.orm import Session + +from models import OpsEvent + + +def can_history_trigger( + event: OpsEvent, + session: Session, +): + logger.debug(f"Handling event {event}") + assert session is not None diff --git a/backend/ops_api/ops/services/message_bus.py b/backend/ops_api/ops/services/message_bus.py index 79f81a5bb3..0a6a11f0f5 100644 --- a/backend/ops_api/ops/services/message_bus.py +++ b/backend/ops_api/ops/services/message_bus.py @@ -1,23 +1,53 @@ +from typing import List + from blinker import signal from flask import current_app +from loguru import logger -from models import OpsEvent +from models import OpsEvent, OpsEventType class MessageBus: + """ + A simple message bus implementation that allows for publishing and subscribing to events. + + This message bus implementation uses the Blinker library to handle event signals. + + This message bus assumes it exists in a single thread and is meant to be used within the context of a single request. + + Published events are stored in a list and are handled when the handle method is called (usually at the end of a request). + """ + + published_events: List[OpsEvent] = [] + + def handle(self): + """ + Handle all published events by calling the appropriate handlers for each event type. + """ + for event in self.published_events: + ops_signal = signal(event.event_type.name) + ops_signal.send(event, session=current_app.db_session) + logger.info(f"Handling event {event}") + + def subscribe(self, event_type: OpsEventType, callback: callable): + """ + Subscribe to an event type with a callback function. - def subscribe(self, topic: str, callback: callable): - ops_signal = signal(topic) + :param event_type: The event type to subscribe to. + :param callback: The callback function to call when the event is published. + """ + logger.debug(f"Subscribing to {event_type} with callback {callback}") + ops_signal = signal(event_type.name) ops_signal.connect(callback) - def publish(self, topic: str, event: OpsEvent): - ops_signal = signal(topic) - ops_signal.send(event, session=current_app.db_session) + def publish(self, event_type: OpsEventType, event: OpsEvent): + """ + Publish an event with the given event type and details. - def unsubscribe(self, topic, callback): - ops_signal = signal(topic) - ops_signal.disconnect(callback) + N.B. This method does not handle the event immediately. The event will be handled when the handle method is called. - def get_subscribers(self, topic) -> dict: - ops_signal = signal(topic) - return ops_signal.receivers + :param event_type: The event type to publish. + :param event: The event details to publish. + """ + logger.debug(f"Publishing event {event_type} with details {event}") + self.published_events.append(event) diff --git a/backend/ops_api/ops/utils/events.py b/backend/ops_api/ops/utils/events.py index 7b03b1584e..55b63e2ca0 100644 --- a/backend/ops_api/ops/utils/events.py +++ b/backend/ops_api/ops/utils/events.py @@ -3,6 +3,7 @@ from flask import current_app, request from flask_jwt_extended import current_user +from loguru import logger from sqlalchemy.orm import Session from werkzeug.exceptions import UnsupportedMediaType @@ -58,10 +59,11 @@ def __exit__( current_app.logger.info(f"EVENT: {event.to_dict()}") if isinstance(exc_val, Exception): - current_app.logger.error(f"EVENT ({exc_type}): {exc_val}") + logger.error(f"EVENT ({exc_type}): {exc_val}") if not current_app.db_session.is_active: - current_app.logger.error("Session is not active. It has likely been rolled back.") + logger.error("Session is not active. It has likely been rolled back.") - if request.message_bus: + if hasattr(request, "message_bus"): + logger.info(f"Publishing event {self.event_type.name}") request.message_bus.publish(self.event_type.name, event) diff --git a/backend/ops_api/tests/ops/messagebus/test_message_bus.py b/backend/ops_api/tests/ops/messagebus/test_message_bus.py index d98941cb52..5c30b99090 100644 --- a/backend/ops_api/tests/ops/messagebus/test_message_bus.py +++ b/backend/ops_api/tests/ops/messagebus/test_message_bus.py @@ -1,5 +1,4 @@ import pytest -from sqlalchemy.orm import Session from models import OpsEvent, OpsEventType from ops_api.ops.services.message_bus import MessageBus @@ -7,40 +6,45 @@ @pytest.mark.usefixtures("app_ctx") -def test_message_bus_subscriptions(loaded_db, mocker): +def test_message_bus_handle(loaded_db, mocker): mock_callback_1 = mocker.MagicMock() mock_callback_2 = mocker.MagicMock() mock_callback_3 = mocker.MagicMock() message_bus = MessageBus() - message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_1) - message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_2) - message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, mock_callback_3) + message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_1) + message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_2) + message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_3) - # patch the request object - r_patch = mocker.patch("ops_api.ops.utils.events.request") - r_patch.message_bus = message_bus + message_bus.publish(OpsEventType.CREATE_NEW_CAN, OpsEvent(event_type=OpsEventType.CREATE_NEW_CAN)) - oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) - oeh.__exit__(None, None, None) + message_bus.handle() - assert mock_callback_1.call_count == 1 - assert mock_callback_2.call_count == 1 - assert mock_callback_3.call_count == 1 + mock_callback_1.assert_called() + mock_callback_2.assert_called() + mock_callback_3.assert_called() @pytest.mark.usefixtures("app_ctx") -def test_message_bus_db_session(loaded_db, mocker): - def callback_1(event: OpsEvent, session: Session): - assert event.event_type == OpsEventType.CREATE_NEW_CAN - assert session is not None +def test_message_bus_create_cans(loaded_db, mocker): + mock_callback_1 = mocker.MagicMock() + mock_callback_2 = mocker.MagicMock() + mock_callback_3 = mocker.MagicMock() message_bus = MessageBus() - message_bus.subscribe(OpsEventType.CREATE_NEW_CAN.name, callback_1) # patch the request object r_patch = mocker.patch("ops_api.ops.utils.events.request") r_patch.message_bus = message_bus + r_patch.message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_1) + r_patch.message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_2) + r_patch.message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_3) oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) oeh.__exit__(None, None, None) + + message_bus.handle() + + mock_callback_1.assert_called() + mock_callback_2.assert_called() + mock_callback_3.assert_called() From 4089a957a0151daaabf409033b392316763c04f3 Mon Sep 17 00:00:00 2001 From: John DeAngelis Date: Wed, 11 Dec 2024 16:43:09 -0500 Subject: [PATCH 5/6] test: change log to debug Signed-off-by: John DeAngelis --- backend/ops_api/ops/services/message_bus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/ops_api/ops/services/message_bus.py b/backend/ops_api/ops/services/message_bus.py index 0a6a11f0f5..d580a89857 100644 --- a/backend/ops_api/ops/services/message_bus.py +++ b/backend/ops_api/ops/services/message_bus.py @@ -27,7 +27,7 @@ def handle(self): for event in self.published_events: ops_signal = signal(event.event_type.name) ops_signal.send(event, session=current_app.db_session) - logger.info(f"Handling event {event}") + logger.debug(f"Handling event {event}") def subscribe(self, event_type: OpsEventType, callback: callable): """ From 19da16eaca8b4c62fe9e1d36ced73a454a15a037 Mon Sep 17 00:00:00 2001 From: John DeAngelis Date: Thu, 12 Dec 2024 10:01:33 -0500 Subject: [PATCH 6/6] feat: add some cleanup methods to the MessageBus Signed-off-by: John DeAngelis --- backend/ops_api/ops/__init__.py | 1 + backend/ops_api/ops/services/can_messages.py | 2 +- backend/ops_api/ops/services/message_bus.py | 12 ++++++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/backend/ops_api/ops/__init__.py b/backend/ops_api/ops/__init__.py index d1ee0f5631..0f1746a632 100644 --- a/backend/ops_api/ops/__init__.py +++ b/backend/ops_api/ops/__init__.py @@ -95,6 +95,7 @@ def shutdown_session(exception=None): def teardown_request(exception=None): if hasattr(request, "message_bus"): request.message_bus.handle() + request.message_bus.cleanup() @event.listens_for(db_session, "before_commit") def receive_before_commit(session: Session): diff --git a/backend/ops_api/ops/services/can_messages.py b/backend/ops_api/ops/services/can_messages.py index 805983cae2..bb6e324ca2 100644 --- a/backend/ops_api/ops/services/can_messages.py +++ b/backend/ops_api/ops/services/can_messages.py @@ -8,5 +8,5 @@ def can_history_trigger( event: OpsEvent, session: Session, ): - logger.debug(f"Handling event {event}") + logger.debug(f"Handling event {event.event_type} with details: {event.event_details}") assert session is not None diff --git a/backend/ops_api/ops/services/message_bus.py b/backend/ops_api/ops/services/message_bus.py index d580a89857..edc61267aa 100644 --- a/backend/ops_api/ops/services/message_bus.py +++ b/backend/ops_api/ops/services/message_bus.py @@ -19,6 +19,7 @@ class MessageBus: """ published_events: List[OpsEvent] = [] + known_callbacks = [] def handle(self): """ @@ -28,6 +29,7 @@ def handle(self): ops_signal = signal(event.event_type.name) ops_signal.send(event, session=current_app.db_session) logger.debug(f"Handling event {event}") + self.published_events.clear() def subscribe(self, event_type: OpsEventType, callback: callable): """ @@ -51,3 +53,13 @@ def publish(self, event_type: OpsEventType, event: OpsEvent): """ logger.debug(f"Publishing event {event_type} with details {event}") self.published_events.append(event) + + def cleanup(self): + """ + Clean up all subscriptions and published events. + """ + for callback in self.known_callbacks: + ops_signal = signal(callback) + ops_signal.disconnect(callback) + self.published_events.clear() + self.known_callbacks.clear()