Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC]: event bus implementation #300

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

ppanero
Copy link
Member

@ppanero ppanero commented Feb 3, 2022

depends on inveniosoftware/rfcs#56

The events module might have a better place in an invenio-events module. Or even invenio-events-resources if we plan to make REST-API for UI handling of events (might tie to the new invenio-requests?).

more details can be found on the RFC

@ppanero ppanero force-pushed the messagebus branch 4 times, most recently from f1335f5 to 7a3ed62 Compare February 4, 2022 12:54
@ppanero ppanero changed the title [POC]: message bus implementation [POC]: event bus implementation Feb 4, 2022
@ppanero ppanero force-pushed the messagebus branch 4 times, most recently from 60fba01 to 0dba51b Compare February 11, 2022 10:29
{"name": queue_name, "exchange": exchange}
]

app_config["CELERY_ACCEPT_CONTENT"] = ["json", "msgpack", "yaml", "pickle"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@slint the only way I found was to globally accept pickle.... which might be a problem...

event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde")

bus.publish(event)
sleep(10)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes fails, sometimes doesn't I think we are falling once again in the issues we had with ES (fixed now with .index.refresh()).

@ppanero ppanero force-pushed the messagebus branch 2 times, most recently from cbb97d8 to aaac2a3 Compare February 11, 2022 15:40
start = datetime.timestamp(datetime.now())
end = start
spawn_new = False
with bus.active_consumer() as consumer:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is to have one open consumer for the duration of the task. We could potentially consume() but it might become 1000 consumers, even if they are light objects I think it is safe to open an active one here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a specific need for this?

end = start
spawn_new = False
with bus.active_consumer() as consumer:
while max_events > 0 and (start + ttl) > end:
Copy link
Member Author

@ppanero ppanero Feb 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not find any configuration that would allow killing a consumer after a determined amount of events or time elapsed (in a similar fashion than uWSGI does). So I have implemented it a bit roughly for PoC purposes.

end = datetime.timestamp(datetime.now())
spawn_new = True

if spawn_new:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaning,

  • If the worker died while consuming events spawn a new task, since there are events to be consumed, and is better not to wait.
  • Otherwise, it died while waiting for events, in which case is better to wait until celery beat spawns it again.

def app_config(app_config):
"""Application configuration."""
# handlers
app_config["RECORDS_RESOURCES_EVENTS_HANDLERS"] = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of this configuration would go to invenio-events and be overwritten in invenio-app-rdm. It is here for demo purposes.

@max-moser
Copy link
Contributor

Summary of a brief recent discussion with Pablo about the event bus topic

The topic of the event bus is a reoccurring one.
Every now and then a new feature comes along that would benefit from a sort of event bus.
Right now, the new feature in question would be Invenio-Stats.

There already exist minimal implementations of event-bus-like constructs that
could potenitally be extended into a full-fledged event bus.
As such, coming up with a basic construct for an event bus shouldn't be too much of an issue - getting it right however would be.

Comparison with other concepts

Service components

Service components are responsible for performing certain actions during service methods (e.g. populating some metadata fields in records,
resolving referenced entities, etc.).
Probably some, but not all, hooks (like after publish) could be replaced by the event bus.

Units of work (UoW)

Units of work serve a different purpose, namely grouping together sets of actions into atomic units of work that either succeed of fail as a whole.
If anything, they could be used for registering an event emission operation on commit, as one way to generate events.

Downsides

Event-driven confusion

Invenio would become more event-driven, which it wasn't really before.
This could cause confusion among developers because it is not immediately apparent which pieces of code are executed during certain operations.

One way to counteract this downside would be to add a CLI command that parses the registered event handlers and prints them in a human-readable form, e.g.:

$ invenio events show-handlers
EVENT_NAME_1:
  Handler (async): invenio_event_bus.handlers.handle_event_1
  Handler (sync):  invenio_event_bus.handlers.handle_event_1_more

EVENT_NAME_2:
  ...

Note that it's easy to imagine funky ways of registering event handlers that wouldn't be picked up by this CLI command (e.g. the now deprecated @before_first_request).
However, if somebody writes (or uses) an Invenio extension that does something like that, we can assume that they know what they're doing anyway.

Ensuring that no events/handlers are forgotten or executed multiple times

One of the major difficulties here would be to make sure that no events are forgotten and/or that none of the registered handlers are executed less or
more often than they should be.
Celery, RabbitMQ and the other tools in use could already provide some help here, but I'm not familiar enough with them yet to say for sure.

If the tools don't provide the necessary means, I had the idea of a "journal" for the status of events/handlers that could be stored e.g. in the cache (Redis).
This could help us keep track of which event handlers have already finished their jobs and which would need to re-run in the event of a system crash.
Example entry:

{"event": NAME, "handler_status": {"handler_name_1": "started", "handler_name_2": "completed"}}

This is only a rough idea though and still needs more thought (e.g. what if the cache dies too).

Alternatives (for Invenio-Stats)

We could create yet another sort of minimal implementation (similar to the update propagation) where we add a UoW operation that
directly acts as stats event emitter on commit.

Further remarks

It's already evening and I should go home.
Here are some further remarks that I found again too late to really rework into this summary:

question: do we support multiple ways of running, or we assume handlers are run sequentially for an event?
IIRC, the current implementation might have an issue. It launches new celery tasks, which means that order might not be respected and we might have dirty reads.
However, tackling this kind of problems seems a bit overkill. What about documenting them, then choosing a compromise and adhering to it. For this, we need a list of use cases indeed.

Conclusion

The event bus would be an awesome feature to have, but needs some more investigation and proper design, as well as development time for getting it right.

I think we should create a list of "relevant dangerous scenarios" that we want to safeguard against, and come up with ideas of how to prevent them, or
recover from them.

This could also provide an opportunity to have a look at the state of (celery) tasks in Invenio in general.
AFAIK failures in (scheduled) tasks aren't really monitored or logged, but rather follow a kind of fire-and-forget philosophy.

Copy link
Contributor

@ntarocco ntarocco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job! Added my 2 cents here.
I would also move all this to flask-resources, if not to a specific module.


def publish(self, event):
"""Publish an event to the bus queue."""
return self._queue.publish([dumps(event)])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make configurable the serialization obj and not hardcode pickle. Celery does the same. We might want to use simple JSON or msgpack instead.

class RecordCreatedEvent(RecordEvent):
"""Record related events."""

action: ClassVar[str] = "PUBLISHED"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if RecordCreatedEvent is always for published, not sure that the action field is useful?

Is the handling_key necessary? Isn't it the class name/import enough? Unless there could be multiple class events with the same handling_key.

async_ = handler[1]

if async_:
func.delay(**asdict(event))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to pass more params to the delay func: wouldn't it be better to always have a sync func to handle the event, and that it eventually spawns an async task?

def hander(...):
    my_async_handler.delay(....)

else:
func(**asdict(event))

# audit logging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not really audit logging this one. I would suggest creating a dedicate logger for the events so it can be configured ad-hoc.


for name, queue in current_queues.queues.items():
if name == self._queue_name:
self._queue = queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the queue always be found, or can it exit the for loop and stay None?

start = datetime.timestamp(datetime.now())
end = start
spawn_new = False
with bus.active_consumer() as consumer:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a specific need for this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants