[BLOCKED] datastreams: implement asynchronous writer #140
63e2d92 to 4e168c5
"""Write an entry. | ||

:param writer: writer configuration as accepted by the WriterFactory.
:param entry: dictionary, StreamEntry is not serializable.
Note this: StreamEntry is not serializable, so a bit of juggling is needed.
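A minimal sketch of the juggling involved, assuming the task serializer is JSON. The `StreamEntry` class here is a stand-in (only its `entry` attribute appears in the diff), and `to_task_payload` / `from_task_payload` are hypothetical helper names:

```python
import json


class StreamEntry:
    """Stand-in for the real class; only `entry` is taken from the diff."""

    def __init__(self, entry, errors=None):
        self.entry = entry
        self.errors = errors or []


def to_task_payload(stream_entry):
    """Hypothetical helper: keep only the plain dict, which is JSON-safe."""
    return stream_entry.entry


def from_task_payload(payload):
    """Hypothetical helper: rebuild a StreamEntry on the task side."""
    return StreamEntry(payload)


original = StreamEntry({"id": "eng", "title": {"en": "English"}})

# The object itself cannot be serialized to JSON...
try:
    json.dumps(original)
    not_serializable = False
except TypeError:
    not_serializable = True

# ...but its dict can make the round trip a JSON task serializer would do.
wire = json.dumps(to_task_payload(original))
rebuilt = from_task_payload(json.loads(wire))
```

Anything attached to the object besides the dict (errors, for example) is lost in this sketch and would need its own handling.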

:param writer: writer to use.
"""
self._writer = writer
This accepts a writer configuration. We cannot accept the writer object itself, because that would make it difficult to pass via a config file/definition; it would only work for a programmatic API.
This config will be sent to the WriterFactory later on. I'm not convinced about the name of this parameter. Alternatives, none of which I fully like either, are: writer_conf, writer_config, w_conf... config, conf...
Everything is too generic.
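To illustrate why a configuration is easier to ship than an object, here is a rough sketch. The `{"type": ..., "args": ...}` shape, the `WRITERS` registry, and `ListWriter` are assumptions for illustration; only `WriterFactory.create(config=...)` and `write_entry` are named in this thread. The Celery decorator is left out so the snippet runs without a broker:

```python
SINK = []  # stands in for wherever a real writer would persist entries


class ListWriter:
    """Hypothetical writer that records entries in SINK."""

    def __init__(self, tag):
        self.tag = tag

    def write(self, entry):
        SINK.append((self.tag, entry))
        return entry


# Assumed registry mapping a config "type" to a writer class.
WRITERS = {"list": ListWriter}


class WriterFactory:
    """Stand-in factory; the real one may resolve writers differently."""

    @classmethod
    def create(cls, config):
        writer_cls = WRITERS[config["type"]]
        return writer_cls(**config.get("args", {}))


def write_entry(writer_config, entry):
    """Task body: build the writer from plain data, then write."""
    writer = WriterFactory.create(config=writer_config)
    writer.write(entry)


# Both arguments are plain dicts, so they can come from a config file.
write_entry({"type": "list", "args": {"tag": "async"}}, {"id": "eng"})
```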


@shared_task(ignore_result=True)
def write_entry(writer, entry):
Suggested change:
-def write_entry(writer, entry):
+def write_entry(writer_config, entry):
Since it's passed to WriterFactory.create(config=...)
@@ -110,3 +111,21 @@ def write(self, stream_entry, *args, **kwargs):
            yaml.safe_dump([stream_entry.entry], file)

        return stream_entry


class AsyncWriter(BaseWriter):
From what I understand, this approach uses this writer as a container for other writer configs.
I was wondering whether it would be possible to have a delay kwarg in BaseWriter which, if True, would achieve the same result. But from what I see, that's pretty difficult to handle, given __init__ overrides, etc.
My fear is that we end up with a very complex datastream config structure where, especially in YAML, it's easy to make indentation mistakes, which would eventually lead to a worse UX/DX.
Other approaches to discuss:
- Move this up to the DataStream, i.e. in BaseDataStream.write(...), do something like if writer.delay: ...call task...
- Preserve the original writer config/init-params when initializing writers?
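The first alternative could look roughly like the sketch below. `Writer`, `DataStream`, `enqueue_write`, and `QUEUE` are all hypothetical stand-ins (a list replaces the task broker), and identifying the writer by `name` is an assumption made only to keep the example short:

```python
QUEUE = []  # stand-in for a task broker


def enqueue_write(writer_name, entry):
    """Hypothetical replacement for calling the task asynchronously."""
    QUEUE.append((writer_name, entry))


class Writer:
    """Hypothetical writer carrying a `delay` flag."""

    def __init__(self, name, delay=False):
        self.name = name
        self.delay = delay
        self.written = []

    def write(self, entry):
        self.written.append(entry)


class DataStream:
    """The stream, not the writer, decides how and when to write."""

    def __init__(self, writers):
        self.writers = writers

    def write(self, entry):
        for writer in self.writers:
            if writer.delay:
                enqueue_write(writer.name, entry)  # deferred
            else:
                writer.write(entry)  # synchronous


sync_writer = Writer("sync")
slow_writer = Writer("slow", delay=True)
DataStream([sync_writer, slow_writer]).write({"id": "eng"})
```

With this layout the writer config stays flat: `delay` is one more key on a writer instead of a nested writer definition.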
I agree with you. However, the changes are not trivial.
In both cases we would need to implement a way to either save the configuration (type + args) or make writers JSON serializable. Otherwise, we won't be able to send them to the task. IMO this fits better at the data stream level, i.e. the writer writes, but it is the data stream that decides how/when.
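One possible way to save the configuration (type + args) despite `__init__` overrides is to capture the keyword arguments in `__new__`, which runs before any subclass `__init__`. This is only a sketch: this `BaseWriter`, the `type_name` attribute, and `to_config` are hypothetical, and it assumes writers are constructed with keyword arguments only:

```python
import json


class BaseWriter:
    """Hypothetical base class that remembers how it was constructed."""

    type_name = None  # assumed registry key, set by each subclass

    def __new__(cls, **kwargs):
        instance = super().__new__(cls)
        # Captured here so subclasses can override __init__ freely.
        instance._init_kwargs = dict(kwargs)
        return instance

    def to_config(self):
        """Return a plain dict that can be sent to a task."""
        return {"type": self.type_name, "args": self._init_kwargs}


class YamlWriter(BaseWriter):
    type_name = "yaml"

    def __init__(self, filepath):
        self.filepath = filepath


config = YamlWriter(filepath="out.yaml").to_config()
wire = json.dumps(config)  # JSON-safe as long as the args are
```

The saved arguments must themselves be JSON serializable for this to work, so the constraint moves from the writer to its init params.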
Rebased and included in #357
closes #139