Skip to content

Commit

Permalink
Merge pull request #164 from bento-platform/feat/events/event-timestamp
Browse files Browse the repository at this point in the history
feat(events): add timestamp to event data + modernize code
  • Loading branch information
davidlougheed authored Dec 6, 2023
2 parents 8b51054 + 3dc8c93 commit 7be0efe
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
26 changes: 15 additions & 11 deletions bento_lib/events/_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import redis
import uuid

from typing import Callable, Dict, Optional, Union
from datetime import datetime, timezone
from typing import Callable

__all__ = ["EventBus"]

Expand All @@ -13,7 +14,7 @@
DATA_TYPE_CHANNEL_TPL = "bento.data_type.{}"

# Types
Serializable = Union[bool, float, int, str, dict, list, tuple, None]
Serializable = bool | float | int | str | dict | list | tuple | None

# Redis
default_connection_data = {"host": "localhost", "port": 6379}
Expand All @@ -38,7 +39,7 @@ def __init__(self, allow_fake: bool = False, **kwargs):
:param allow_fake: Whether to allow for "fake" connections, i.e. no true connection to Redis.
"""

self._rc: Optional[redis.Redis] = None
self._rc: redis.Redis | None = None

logger = kwargs.pop("logger", None) or logging.getLogger(__name__)
self._logger: logging.Logger = logger
Expand All @@ -54,13 +55,13 @@ def __init__(self, allow_fake: bool = False, **kwargs):
raise e
logger.warning(f"Starting event bus in 'fake' mode (tried connection data: {connection_data})")

self._ps: Optional[redis.client.PubSub] = None
self._ps: redis.client.PubSub | None = None

self._ps_handlers: dict[str, Callable[[dict], None]] = {}
self._event_thread: Optional[redis.client.PubSubWorkerThread] = None
self._event_thread: redis.client.PubSubWorkerThread | None = None

self._service_event_types: Dict[str, dict] = {}
self._data_type_event_types: Dict[str, dict] = {}
self._service_event_types: dict[str, dict] = {}
self._data_type_event_types: dict[str, dict] = {}

@staticmethod
def _callback_deserialize(callback: Callable[[dict], None]) -> Callable[[dict], None]:
Expand Down Expand Up @@ -124,7 +125,10 @@ def stop_event_loop(self) -> None:
@staticmethod
def _make_event(event_type: str, event_data, attrs: dict) -> str:
return json.dumps({
"id": str(uuid.uuid4()), # So other services can decide how to claim specific events or whatever
# Generate a random ID, so other services can decide how to claim specific events or whatever:
"id": str(uuid.uuid4()),
# Events can arrive out-of-order; we can put them back in order using this generation-time timestamp:
"ts": datetime.now(timezone.utc).isoformat(),
"type": event_type.lower(),
"data": event_data,
**attrs
Expand Down Expand Up @@ -163,21 +167,21 @@ def register_data_type_event_type(self, event_type: str, event_schema: dict) ->
"""
return self._add_schema(self._data_type_event_types, event_type, event_schema)

def get_service_event_types(self) -> Dict[str, dict]:
def get_service_event_types(self) -> dict[str, dict]:
"""
:return: A dictionary of registered service event types and their associated schemas.
"""
return {**self._service_event_types}

def get_data_type_event_types(self) -> Dict[str, dict]:
def get_data_type_event_types(self) -> dict[str, dict]:
"""
:return: A dictionary of registered data type event types and their associated schemas.
"""
return {**self._data_type_event_types}

def _publish_event(
self,
event_types: Dict[str, dict],
event_types: dict[str, dict],
channel: str,
event_type: str,
event_data: Serializable,
Expand Down
2 changes: 1 addition & 1 deletion bento_lib/package.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = bento_lib
version = 11.0.0a3
version = 11.0.0a4
authors = David Lougheed, Paul Pillot
author_emails = [email protected], [email protected]
2 changes: 2 additions & 0 deletions tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def handle_service_event(message):
assert event["service_artifact"] == TEST_SERVICE
assert event["type"] == TEST_SERVICE_EVENT
assert event["data"] == TEST_EVENT_BODY
assert event["id"]
assert event["ts"]

event_bus.add_handler(bento_lib.events.ALL_SERVICE_EVENTS, handle_service_event)
event_bus.start_event_loop()
Expand Down

0 comments on commit 7be0efe

Please sign in to comment.