Skip to content

Commit

Permalink
event bus: initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Pablo Panero committed Feb 11, 2022
1 parent 76274aa commit aaac2a3
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 2 deletions.
6 changes: 5 additions & 1 deletion invenio_records_resources/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2020 CERN.
# Copyright (C) 2020-2022 CERN.
# Copyright (C) 2020 Northwestern University.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
Expand All @@ -16,3 +16,7 @@
SITE_UI_URL = "https://127.0.0.1:5000"

SITE_API_URL = "https://127.0.0.1:5000/api"

RECORDS_RESOURCES_EVENTS_HANDLERS = {}

RECORDS_RESOURCES_EVENTS_QUEUE = "events"
17 changes: 17 additions & 0 deletions invenio_records_resources/services/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Module for event driven actions support."""

from .bus import EventBus
from .events import Event

__all__ = (
"Event",
"EventBus"
)
43 changes: 43 additions & 0 deletions invenio_records_resources/services/events/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Events bus module."""

from pickle import dumps, loads

from flask import current_app
from invenio_queues.proxies import current_queues


class EventBus:
"""Event bus."""

def __init__(self, queue_name=None):
"""Constructor."""
self._queue_name = queue_name or \
current_app.config["RECORDS_RESOURCES_EVENTS_QUEUE"]
self._queue = None

for name, queue in current_queues.queues.items():
if name == self._queue_name:
self._queue = queue
break

def publish(self, event):
"""Publish an event to the bus queue."""
return self._queue.publish([dumps(event)])

def consume(self):
"""Consume an event from the bus queue."""
for event in self._queue.consume(): # consume() returns a generator
yield loads(event)

def active_consumer(self):
"""Returns a consumer that stays open."""
# TODO: see usage in handlers.py
pass
40 changes: 40 additions & 0 deletions invenio_records_resources/services/events/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Events module."""

from dataclasses import dataclass
from datetime import datetime
from typing import ClassVar


@dataclass
class Event:
"""Base event."""

created: datetime
type: str
action: str
handling_key: str


@dataclass
class RecordEvent(Event):
"""Record related events."""

recid: str
type: ClassVar[str] = "RECORD"
handling_key: ClassVar[str] = "RECORD"


@dataclass
class RecordCreatedEvent(RecordEvent):
"""Record related events."""

action: ClassVar[str] = "PUBLISHED"
handling_key: ClassVar[str] = f"{RecordEvent.type}.{action}"
81 changes: 81 additions & 0 deletions invenio_records_resources/services/events/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event handlers module."""

from datetime import datetime
from dataclasses import asdict

from celery import shared_task
from flask import current_app

from .bus import EventBus


def _handlers_for_key(key):
"""Returns the handlers for a key."""
config_handlers = current_app.config["RECORDS_RESOURCES_EVENTS_HANDLERS"]
keys_parts = key.split(".")

event_handlers = []
curr_key = ""
for part in keys_parts:
curr_key = f"{curr_key}.{part}"
try:
event_handlers.expand(config_handlers[curr_key])
except KeyError:
current_app.logger.warning(f"No handler for key {curr_key}")

return event_handlers


def _handle_event(event, handler=None):
"""Executes the handlers configured for an event."""
handlers = _handlers_for_key(event.handling_key)

for handler in handlers:
func = handler
async_ = True
if isinstance(handler, tuple):
func = handler[0]
async_ = handler[1]

if async_:
func.delay(**asdict(event))
else:
func(**asdict(event))

# audit logging
current_app.logger.info(
f"{event.type}-{event.action} handled successfully."
)


@shared_task(ignore_result=True)
def handle_events(queue_name=None, max_events=1000, ttl=300):
"""Handle events queue.
:param max_events: maximum number of events to process by the task.
:param ttl: time to live (in seconds) for the task.
"""
bus = EventBus(queue_name)
start = datetime.timestamp(datetime.now())
end = start
spawn_new = False
with bus.active_consumer() as consumer:
while max_events > 0 and (start + ttl) > end:
spawn_new = False
event = consumer.consume() # blocking
_handle_event(event) # execute all handlers
end = datetime.timestamp(datetime.now())
spawn_new = True

if spawn_new:
handle_events.delay(
queue_name=queue_name, max_events=max_events, ttl=ttl
)
18 changes: 18 additions & 0 deletions invenio_records_resources/services/uow.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def on_commit(self, uow):

from invenio_db import db

from .events import EventBus


#
# Unit of work operations
Expand Down Expand Up @@ -199,6 +201,21 @@ def on_post_commit(self, uow):
self._celery_task.delay(*self._args, **self._kwargs)


class EventOp(Operation):
"""A task to send an event.
All events will be sent after the commit phase.
"""

def __init__(self, event, *args, **kwargs):
"""Constructor."""
self._event = event

def on_post_commit(self, uow):
"""Publish the event to the bus."""
uow._event_bus.publish(self._event)


#
# Unit of work context manager
#
Expand All @@ -215,6 +232,7 @@ def __init__(self, session=None):
"""Initialize unit of work context."""
self._session = session or db.session
self._operations = []
self._event_bus = EventBus()
self._dirty = False

def __enter__(self):
Expand Down
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trap cleanup EXIT

python -m check_manifest --ignore ".*-requirements.txt"
python -m sphinx.cmd.build -qnNW docs docs/_build/html
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --env)"
eval "$(docker-services-cli up --db ${DB:-postgresql} --search ${SEARCH:-elasticsearch} --cache ${CACHE:-redis} --mq ${MQ:-rabbitmq} --env)"
python -m pytest $@
tests_exit_code=$?
python -m sphinx.cmd.build -qnNW -b doctest docs docs/_build/doctest
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"invenio-indexer>=1.2.1",
"invenio-jsonschemas>=1.1.3",
"invenio-pidstore>=1.2.2",
"invenio-queues>=1.0.0a4",
"invenio-records-permissions>=0.13.0,<0.14.0",
"invenio-records>=1.6.0",
"luqum>=0.11.0",
Expand Down
58 changes: 58 additions & 0 deletions tests/services/events/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Pytest configuration.
See https://pytest-invenio.readthedocs.io/ for documentation on which test
fixtures are available.
"""

import pytest
from datetime import timedelta
from kombu import Exchange

from invenio_records_resources.services.events.events import RecordEvent, \
RecordCreatedEvent


@pytest.fixture(scope="module")
def app_config(app_config):
"""Application configuration."""
# handlers
app_config["RECORDS_RESOURCES_EVENTS_HANDLERS"] = {
RecordEvent.handling_key: [],
RecordCreatedEvent.handling_key: [
# (sync_handler_task, True),
# (explicit_asyn_handler_task, False),
# implicit_asyn_handler_task,
],
}

# events queue
queue_name = "test-events"
exchange = Exchange(
queue=queue_name,
type="direct",
delivery_mode="persistent", # in-memory and disk
)

app_config["RECORDS_RESOURCES_EVENT_QUEUE"] = queue_name
app_config["QUEUES_DEFINITIONS"] = [
{"name": queue_name, "exchange": exchange}
]

# celery config
app_config["CELERY_ACCEPT_CONTENT"] = ["json", "msgpack", "yaml", "pickle"]
app_config["CELERYBEAT_SCHEDULE"] = {
'event_handling': {
'task': 'invenio_records_resources.services.events.handle_events',
'schedule': timedelta(minutes=5),
},
}

return app_config
25 changes: 25 additions & 0 deletions tests/services/events/test_bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 CERN.
#
# Invenio-Records-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
# details.

"""Event bus test."""

from datetime import datetime
from time import sleep

from invenio_records_resources.services.events import EventBus
from invenio_records_resources.services.events.events import RecordCreatedEvent


def test_bus_publish_consume(app):
bus = EventBus("test-events")
event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde")

bus.publish(event)
sleep(10)
consumed_event = bus.consume()
assert event == next(consumed_event)

0 comments on commit aaac2a3

Please sign in to comment.