-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[POC]: event bus implementation #300
base: master
Are you sure you want to change the base?
Conversation
f1335f5
to
7a3ed62
Compare
60fba01
to
0dba51b
Compare
{"name": queue_name, "exchange": exchange} | ||
] | ||
|
||
app_config["CELERY_ACCEPT_CONTENT"] = ["json", "msgpack", "yaml", "pickle"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@slint the only way I found was to globally accept pickle.... which might be a problem...
event = RecordCreatedEvent(created=datetime.now(), recid="12345-abcde") | ||
|
||
bus.publish(event) | ||
sleep(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sometimes fails, sometimes doesn't I think we are falling once again in the issues we had with ES (fixed now with .index.refresh()).
cbb97d8
to
aaac2a3
Compare
start = datetime.timestamp(datetime.now()) | ||
end = start | ||
spawn_new = False | ||
with bus.active_consumer() as consumer: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea here is to have one open consumer for the duration of the task. We could potentially consume()
but it might become 1000 consumers, even if they are light objects I think it is safe to open an active one here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a specific need for this?
end = start | ||
spawn_new = False | ||
with bus.active_consumer() as consumer: | ||
while max_events > 0 and (start + ttl) > end: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not find any configuration that would allow killing a consumer after a determined amount of events or time elapsed (in a similar fashion than uWSGI does). So I have implemented it a bit roughly for PoC purposes.
end = datetime.timestamp(datetime.now()) | ||
spawn_new = True | ||
|
||
if spawn_new: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meaning,
- If the worker died while consuming events spawn a new task, since there are events to be consumed, and is better not to wait.
- Otherwise, it died while waiting for events, in which case is better to wait until celery beat spawns it again.
def app_config(app_config): | ||
"""Application configuration.""" | ||
# handlers | ||
app_config["RECORDS_RESOURCES_EVENTS_HANDLERS"] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of this configuration would go to invenio-events
and be overwritten in invenio-app-rdm
. It is here for demo purposes.
Summary of a brief recent discussion with Pablo about the event bus topicThe topic of the event bus is a reoccurring one. There already exist minimal implementations of event-bus-like constructs that Comparison with other conceptsService componentsService components are responsible for performing certain actions during service methods (e.g. populating some metadata fields in records, Units of work (UoW)Units of work serve a different purpose, namely grouping together sets of actions into atomic units of work that either succeed of fail as a whole. DownsidesEvent-driven confusionInvenio would become more event-driven, which it wasn't really before. One way to counteract this downside would be to add a CLI command that parses the registered event handlers and prints them in a human-readable form, e.g.:
Note that it's easy to imagine funky ways of registering event handlers that wouldn't be picked up by this CLI command (e.g. the now deprecated Ensuring that no events/handlers are forgotten or executed multiple timesOne of the major difficulties here would be to make sure that no events are forgotten and/or that none of the registered handlers are executed less or If the tools don't provide the necessary means, I had the idea of a "journal" for the status of events/handlers that could be stored e.g. in the cache (Redis).
This is only a rough idea though and still needs more thought (e.g. what if the cache dies too). Alternatives (for Invenio-Stats)We could create yet another sort of minimal implementation (similar to the update propagation) where we add a UoW operation that Further remarksIt's already evening and I should go home. question: do we support multiple ways of running, or we assume handlers are run sequentially for an event? ConclusionThe event bus would be an awesome feature to have, but needs some more investigation and proper design, as well as development time for getting it right. I think we should create a list of "relevant dangerous scenarios" that we want to safeguard against, and come up with ideas of how to prevent them, or This could also provide an opportunity to have a look at the state of (celery) tasks in Invenio in general. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job! Added my 2 cents here.
I would also move all this to flask-resources
, if not to a specific module.
|
||
def publish(self, event): | ||
"""Publish an event to the bus queue.""" | ||
return self._queue.publish([dumps(event)]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would make configurable the serialization obj and not hardcode pickle
. Celery does the same. We might want to use simple JSON or msgpack
instead.
class RecordCreatedEvent(RecordEvent): | ||
"""Record related events.""" | ||
|
||
action: ClassVar[str] = "PUBLISHED" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if RecordCreatedEvent
is always for published, not sure that the action
field is useful?
Is the handling_key
necessary? Isn't it the class name/import enough? Unless there could be multiple class events with the same handling_key
.
async_ = handler[1] | ||
|
||
if async_: | ||
func.delay(**asdict(event)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may want to pass more params to the delay
func: wouldn't it be better to always have a sync func to handle the event, and that it eventually spawns an async task?
def hander(...):
my_async_handler.delay(....)
else: | ||
func(**asdict(event)) | ||
|
||
# audit logging |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not really audit logging this one. I would suggest creating a dedicate logger for the events so it can be configured ad-hoc.
|
||
for name, queue in current_queues.queues.items(): | ||
if name == self._queue_name: | ||
self._queue = queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will the queue always be found, or can it exit the for loop and stay None
?
start = datetime.timestamp(datetime.now()) | ||
end = start | ||
spawn_new = False | ||
with bus.active_consumer() as consumer: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a specific need for this?
depends on inveniosoftware/rfcs#56
The
events
module might have a better place in aninvenio-events
module. Or eveninvenio-events-resources
if we plan to make REST-API for UI handling of events (might tie to the newinvenio-requests
?).more details can be found on the RFC