-
-
Notifications
You must be signed in to change notification settings - Fork 606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add message history and retransmission #3199
base: main
Are you sure you want to change the base?
Conversation
I discovered a potential edge case where the client can get out of sync. Converted to draft until I can investigate. Also, I got an email about a failed test with Python 3.9. It looks like it's from before the tests are even run. Not sure what to do about this. |
Thanks for starting this pull request, @afullerx! We're looking forward to reviewing your implementation once it's ready. Regarding the failing test. Sometimes one of the "startup tests" fails because of some caching that takes longer than expected. This can safely be ignored. Next time the test will probably pass. |
OK, I believe this pull request is good to go. The desync I was seeing was caused by two new issues I discovered in the current codebase. One is a race condition when multiple clients are connecting to an auto-index page. The other is due to a gap in time between when the webpage is generated and when updates can be received. This could actually be fixed using the new message history, but I think it's best left for a future PR. I'll submit issues and/or pull requests once this one is done. |
Regarding the pre-existing issue with missed updates due to a gap between page render and websocket connection. I realized I could fix it by just including a clients initial |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I finally had a chance to take a look into your code. Amazing work!
Just a few thoughts:
- Somehow a retransmission ID is added to every message from the message history, which is then broadcasted to all clients, where it is checked against the expected retransmission ID:
for i in range(start, len(self._history)): args = self._history[i][2] args[1]['retransmit_id'] = retransmit_id self.enqueue_message('retransmit', args, '')
This seems like a lot of overhead. Can't we pass the socket ID of the handshaking client toif ( data.message_id <= window.last_message_id || ("retransmit_id" in data && data.retransmit_id != window.retransmitId) ) { return; }
synchronize()
and send a custom "retransmit" message containing all missed messages? This way we wouldn't need to manipulate messages and filter them on the client. - What do you think about additional CPU and memory consumption? Now that we keep every message for at least 30 seconds, this can accumulate quickly when, e.g., streaming 3D data. Should we make the history length configurable?
- We should check how the new retransmission works with
ui.scene
andui.leaflet
, because they use a separate "init" message for initialization. (Maybe we can solve their initialization problem more elegantly by introducing anon_handshake
method toui.element
that is called whenever a client handshakes... But that's probably out of scope of this pull request.) - Before merging, @rodja and I should check if it works seamlessly with NiceGUI On Air.
Thanks for the feedback. Good idea about bundling the retransmissions into a single special message. However, I didn't see any way to send a message directly to a client connected via Air. We can still get almost all the benefit, as other clients will only need to filter a single infrequent message instead of checking every message. I did think the history duration deserved a config option, but decided it wasn't my place to make that decision. I'll add a I'll also do some testing with |
After being short on time for a bit, I was finally able to implement the improvements. I should be able to push the changes in the next couple days after I do some final testing. |
I decided it's probably better to allow the user to configure the maximum number of history entries ( I did some profiling of the message handling overhead, and it seemed pretty negligible. For example, on average, calls to I realized the message history isn't needed to cover the initial connection for Since As far as I can tell, As a possible enhancement, when sync fails, instead of reloading the page, we could dump the entire state of the page (as we do on page render) and send it in a message. We would then just replace the element object with the up-to-date one. This is much faster and more seamless than a full page reload and, for a |
Regarding the enhancement I mentioned in my previous post, if we can do a full state resync without a page reload when While Ultimately, I'm not sure if this would work out or not, but I think there's enough merit in the idea that I should take some time to fully explore it. |
Oh, wow, this PR keeps on growing... But it is certainly a good idea to re-evaluate our options and to think about the best path forward, before spending more time with the implementation or even merging something that hinders us later. The special initialization of |
I decided that doing a full-state resync without reloading is going to be a no-go. I was able to get it working pretty well in most cases by having Anyway, I believe this PR is ready for review again. Some other improvements I made:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @afullerx, I finally found enough time and focus to review your pull request.
I made just a few minor changes:
- I made use of the
Message
type to simplify argument lists and type annotations a bit. - I think
_message_count
should always be increased when emitting a message. - Instead of ignoring a type error we can safely assert that
self._history
is notNone
. - I restructured the
sync
method in JavaScript using early exits and destructuring.
Apart from that, I have some more thoughts I'd like to clarify:
-
As far as I understand setting
message_buffer_max
to 0 disables the deque, which behaves differently than setmaxlen=0
? Or could we assume to always work with a deque, just sometimes with zero length? -
I thought about creating the deque in the initializer with a default length of 1000, and changing it in
loop()
according tomessage_buffer_max
. Themaxlen
attribute is readonly, but we could create a copy liked = deque(d, maxlen=...)
. But what would we do it the current deque already contains more messages than the newmaxlen
? -
> The front end now keeps a list of all its past socket IDs. This is then used by
synchronize()
to filter out messages intended for other targets.We should propable prune these socket IDs...
-
Maybe there is a better parameter name than
message_buffer_max
. Maybemessage_history_length
? -
In client.js we compare
msg.target
againstwindow.socket.id
. I think we can avoid sending sync messages to the wrong clients in the first place like this:await self._emit('sync', {...}, socket_ids[-1])
. -
You're adding
message_id
todata
and removing it again on the client. Couldn't this interfer with the other payload? Maybe it's better to keep this attribute separate, even if this would complicate the data structure of a history item once again.
I made additional changes to address some of the remaining concerns. Most of them are explained by my previous post and the commit messages. One additional change is that I realized that previous socket IDs only need to be kept by the client temporarily. Once the sync operation is complete, all previous socket IDs become irrelevant. As far as maintaining the length of the history using the I didn't change it so that the "sync" message is emitted directly to the client's socket ID because this didn't work with On Air in my previous testing |
Sorry, @afullerx, your pull request is not forgotten. I just didn't find the time and focus to dive into the details of this implementation once again, especially analyzing the issue with NiceGUI On Air. But it's still on my list. I'll be traveling over the next two weeks, so I hope to continue working on it by the end of September. |
I finally had another look into this pull request. It looks like we're almost good. I'm just experimenting with sending sync messages directly to the socket ID doing the handshake, so that we can remove the
It looks like we need to |
Thanks for revisiting this, @falkoschindler. I believe the The only thing that didn't work with On Air was emitting the sync message directly to the client using it's socket ID. I assumed this was because the On Air server wasn't properly forwarding messages targeted in this way. Even if this were remedied, I believe we would still need |
# Conflicts: # nicegui/static/nicegui.js
While discussing this pull request with @rodja, we decided to simplify the whole retransmission logic by excluding the shared auto-index page. We can include it later if we really want to. But for the moment we chose simplicity over completeness. This way every message is sent to one client only and we can simply keep it in the already existing message queue. A It still needs some testing with On Air and elements like |
Local tests with import random
import time
from nicegui import ui
@ui.page('/', reconnect_timeout=10.0)
def page():
log = ui.log()
ui.timer(1.0, lambda: log.push(f'{time.time():.0f}'))
scene = ui.scene()
ui.timer(1.0, lambda: scene.sphere().scale(0.5).move(random.random() - 0.5, random.random() - 0.5, random.random())) |
I forgot to handle the case when client reconnects too late and the message history isn't long enough. I'll add that tomorrow. |
Apparently, updates based on running methods like "update_grid" are broken: grid = ui.aggrid({'columnDefs': [{'field': 'name'}], 'rowData': []})
def update():
grid.options['rowData'].append({'name': 'Alice'})
grid.update()
ui.button('Update', on_click=update) The update message might be enqueued in a wrong place. But changing self.messages.append((self.client.id, self.next_message_id, time.time(), 'update', data)) to self.messages.insert(self._message_index, (self.client.id, self.next_message_id, time.time(), 'update', data)) didn't help immediately. |
Ah, inserting the update message is basically correct, but it messes up the order of message IDs. |
@rodja Tests are green, ready for review. |
This PR attempts to resolve #3143 by adding a message history to
outbox
and providing for the retransmission of missed messages in order to resynchronize the client's state during a reconnection. If this cannot be accomplished, a reload of the page is triggered. The goal of this is to prevent a connected client's state from ever being out of sync with the server.For the auto-index page, a history duration of 30 seconds was arbitrarily chosen. Since this value only determines when the UI is updated through resending messages instead of a page reload, the UI should stay properly synchronized regardless of this value.
For a
ui.page
, the history duration is computed based on the expected lifetime of theclient
object. Currently, with the defaultreconnect_timeout = 3.0
, this is a max of 9 seconds. With this change, a re-evaluation of this default could be warranted. Now that UI state can be resynchronized indefinitely, discarding the user's page after only 5-9s of disconnection seems premature. See #3143 (comment) for more.Open tasks (October 24, 2024):
message_history_length
isn't being used