-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3211 from HHS/OPS-3199/initial-message-bus
Ops 3199/initial message bus
- Loading branch information
Showing
7 changed files
with
147 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from loguru import logger | ||
from sqlalchemy.orm import Session | ||
|
||
from models import OpsEvent | ||
|
||
|
||
def can_history_trigger( | ||
event: OpsEvent, | ||
session: Session, | ||
): | ||
logger.debug(f"Handling event {event.event_type} with details: {event.event_details}") | ||
assert session is not None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
from typing import List | ||
|
||
from blinker import signal | ||
from flask import current_app | ||
from loguru import logger | ||
|
||
from models import OpsEvent, OpsEventType | ||
|
||
|
||
class MessageBus: | ||
""" | ||
A simple message bus implementation that allows for publishing and subscribing to events. | ||
This message bus implementation uses the Blinker library to handle event signals. | ||
This message bus assumes it exists in a single thread and is meant to be used within the context of a single request. | ||
Published events are stored in a list and are handled when the handle method is called (usually at the end of a request). | ||
""" | ||
|
||
published_events: List[OpsEvent] = [] | ||
known_callbacks = [] | ||
|
||
def handle(self): | ||
""" | ||
Handle all published events by calling the appropriate handlers for each event type. | ||
""" | ||
for event in self.published_events: | ||
ops_signal = signal(event.event_type.name) | ||
ops_signal.send(event, session=current_app.db_session) | ||
logger.debug(f"Handling event {event}") | ||
self.published_events.clear() | ||
|
||
def subscribe(self, event_type: OpsEventType, callback: callable): | ||
""" | ||
Subscribe to an event type with a callback function. | ||
:param event_type: The event type to subscribe to. | ||
:param callback: The callback function to call when the event is published. | ||
""" | ||
logger.debug(f"Subscribing to {event_type} with callback {callback}") | ||
ops_signal = signal(event_type.name) | ||
ops_signal.connect(callback) | ||
|
||
def publish(self, event_type: OpsEventType, event: OpsEvent): | ||
""" | ||
Publish an event with the given event type and details. | ||
N.B. This method does not handle the event immediately. The event will be handled when the handle method is called. | ||
:param event_type: The event type to publish. | ||
:param event: The event details to publish. | ||
""" | ||
logger.debug(f"Publishing event {event_type} with details {event}") | ||
self.published_events.append(event) | ||
|
||
def cleanup(self): | ||
""" | ||
Clean up all subscriptions and published events. | ||
""" | ||
for callback in self.known_callbacks: | ||
ops_signal = signal(callback) | ||
ops_signal.disconnect(callback) | ||
self.published_events.clear() | ||
self.known_callbacks.clear() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import pytest | ||
|
||
from models import OpsEvent, OpsEventType | ||
from ops_api.ops.services.message_bus import MessageBus | ||
from ops_api.ops.utils.events import OpsEventHandler | ||
|
||
|
||
@pytest.mark.usefixtures("app_ctx") | ||
def test_message_bus_handle(loaded_db, mocker): | ||
mock_callback_1 = mocker.MagicMock() | ||
mock_callback_2 = mocker.MagicMock() | ||
mock_callback_3 = mocker.MagicMock() | ||
|
||
message_bus = MessageBus() | ||
message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_1) | ||
message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_2) | ||
message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_3) | ||
|
||
message_bus.publish(OpsEventType.CREATE_NEW_CAN, OpsEvent(event_type=OpsEventType.CREATE_NEW_CAN)) | ||
|
||
message_bus.handle() | ||
|
||
mock_callback_1.assert_called() | ||
mock_callback_2.assert_called() | ||
mock_callback_3.assert_called() | ||
|
||
|
||
@pytest.mark.usefixtures("app_ctx") | ||
def test_message_bus_create_cans(loaded_db, mocker): | ||
mock_callback_1 = mocker.MagicMock() | ||
mock_callback_2 = mocker.MagicMock() | ||
mock_callback_3 = mocker.MagicMock() | ||
|
||
message_bus = MessageBus() | ||
|
||
# patch the request object | ||
r_patch = mocker.patch("ops_api.ops.utils.events.request") | ||
r_patch.message_bus = message_bus | ||
r_patch.message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_1) | ||
r_patch.message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_2) | ||
r_patch.message_bus.subscribe(OpsEventType.CREATE_NEW_CAN, mock_callback_3) | ||
|
||
oeh = OpsEventHandler(OpsEventType.CREATE_NEW_CAN) | ||
oeh.__exit__(None, None, None) | ||
|
||
message_bus.handle() | ||
|
||
mock_callback_1.assert_called() | ||
mock_callback_2.assert_called() | ||
mock_callback_3.assert_called() |