From 7729e744e40591cb2711cab7932938290d285a78 Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Wed, 4 Sep 2024 02:17:58 +0300 Subject: [PATCH] Fix client event buffering (#323) I rushed a little with removing generics. We need to keep separate buffer for each event, otherwise events of different types encoded in Bytes will be mixed together. This commit is partial revert of e98a940a9f746cc5c3a2fd8bbb7a10506476b125. --- CHANGELOG.md | 4 + src/client/events.rs | 101 +++++++++++---------- src/core/event_registry/server_event.rs | 116 +++++++++++++++++++----- tests/server_event.rs | 82 ++++++++++++++--- 4 files changed, 218 insertions(+), 85 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9505b7cc..8b06db77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fix client event buffering. + ## [0.28.0] - 2024-09-03 ### Added diff --git a/src/client/events.rs b/src/client/events.rs index 791571cc..0ea610b5 100644 --- a/src/client/events.rs +++ b/src/client/events.rs @@ -2,7 +2,7 @@ use super::{ClientPlugin, ClientSet, ServerInitTick}; use crate::core::{ common_conditions::*, ctx::{ClientReceiveCtx, ClientSendCtx}, - event_registry::{server_event::ServerEventQueue, EventRegistry}, + event_registry::EventRegistry, replicon_client::RepliconClient, server_entity_map::ServerEntityMap, }; @@ -16,26 +16,25 @@ pub struct ClientEventsPlugin; impl Plugin for ClientEventsPlugin { fn build(&self, app: &mut App) { - app.init_resource::() - .add_systems( - PreUpdate, - ( - Self::reset.in_set(ClientSet::ResetEvents), - Self::receive - .after(ClientPlugin::receive_replication) - .in_set(ClientSet::Receive) - .run_if(client_connected), - ), + app.add_systems( + PreUpdate, + ( + Self::reset.in_set(ClientSet::ResetEvents), + Self::receive + .after(ClientPlugin::receive_replication) + .in_set(ClientSet::Receive) + .run_if(client_connected), + ), + ) + .add_systems( + PostUpdate, + ( + Self::send.run_if(client_connected), + Self::resend_locally.run_if(has_authority), ) - .add_systems( - PostUpdate, - ( - Self::send.run_if(client_connected), - Self::resend_locally.run_if(has_authority), - ) - .chain() - .in_set(ClientSet::Send), - ); + .chain() + .in_set(ClientSet::Send), + ); } } @@ -84,31 +83,37 @@ impl ClientEventsPlugin { world.resource_scope(|world, registry: Mut| { world.resource_scope(|world, entity_map: Mut| { world.resource_scope(|world, event_registry: Mut| { - world.resource_scope(|world, mut queue: Mut| { - let init_tick = **world.resource::(); - let mut ctx = ClientReceiveCtx { - registry: ®istry.read(), - entity_map: &entity_map, - invalid_entities: Vec::new(), - }; + let init_tick = **world.resource::(); + let mut ctx = ClientReceiveCtx { + registry: ®istry.read(), + entity_map: &entity_map, + invalid_entities: Vec::new(), + }; - for event_data in event_registry.iter_server_events() { - let events = world + let world_cell = world.as_unsafe_world_cell(); + for event_data in event_registry.iter_server_events() { + // SAFETY: both resources mutably borrowed uniquely. + let (events, queue) = unsafe { + let events = world_cell .get_resource_mut_by_id(event_data.events_id()) .expect("events shouldn't be removed"); + let queue = world_cell + .get_resource_mut_by_id(event_data.queue_id()) + .expect("event queue shouldn't be removed"); + (events, queue) + }; - // SAFETY: passed pointers were obtained using this event data. - unsafe { - event_data.receive( - &mut ctx, - events.into_inner(), - &mut queue, - &mut client, - init_tick, - ) - }; - } - }); + // SAFETY: passed pointers were obtained using this event data. + unsafe { + event_data.receive( + &mut ctx, + events.into_inner(), + queue.into_inner(), + &mut client, + init_tick, + ) + }; + } }); }); }); @@ -149,13 +154,13 @@ impl ClientEventsPlugin { unsafe { event_data.reset(events.into_inner()) }; } - let mut queue = world.resource_mut::(); - if !queue.is_empty() { - warn!( - "discarding {} queued server events due to a disconnect", - queue.values_len() - ); - queue.clear(); + for event_data in event_registry.iter_server_events() { + let queue = world + .get_resource_mut_by_id(event_data.queue_id()) + .expect("event queue shouldn't be removed"); + + // SAFETY: passed pointer was obtained using this event data. + unsafe { event_data.reset(queue.into_inner()) }; } }); } diff --git a/src/core/event_registry/server_event.rs b/src/core/event_registry/server_event.rs index 9d5e44a9..d6cbeed2 100644 --- a/src/core/event_registry/server_event.rs +++ b/src/core/event_registry/server_event.rs @@ -1,6 +1,7 @@ use std::{ any::{self, TypeId}, io::Cursor, + marker::PhantomData, mem, }; @@ -153,7 +154,9 @@ impl ServerEventAppExt for App { ) -> &mut Self { debug!("registering server event `{}`", any::type_name::()); - self.add_event::().add_event::>(); + self.add_event::() + .add_event::>() + .init_resource::>(); let channel_id = self .world_mut() @@ -212,12 +215,16 @@ pub(crate) struct ServerEvent { /// ID of [`Events>`]. server_events_id: ComponentId, + /// ID of [`ServerEventQueue`]. + queue_id: ComponentId, + /// Used channel. channel_id: u8, send: SendFn, receive: ReceiveFn, resend_locally: ResendLocallyFn, + reset: ResetFn, serialize: unsafe fn(), deserialize: unsafe fn(), } @@ -243,6 +250,14 @@ impl ServerEvent { any::type_name::>() ) }); + let queue_id = components + .resource_id::>() + .unwrap_or_else(|| { + panic!( + "resource `{}` should be previously inserted", + any::type_name::>() + ) + }); // SAFETY: these functions won't be called until the type is restored. Self { @@ -251,10 +266,12 @@ impl ServerEvent { independent: false, events_id, server_events_id, + queue_id, channel_id, send: send::, receive: receive::, resend_locally: resend_locally::, + reset: reset::, serialize: unsafe { mem::transmute::, unsafe fn()>(serialize) }, deserialize: unsafe { mem::transmute::, unsafe fn()>(deserialize) }, } @@ -268,6 +285,10 @@ impl ServerEvent { self.server_events_id } + pub(crate) fn queue_id(&self) -> ComponentId { + self.queue_id + } + pub(crate) fn is_independent(&self) -> bool { self.independent } @@ -304,13 +325,13 @@ impl ServerEvent { /// /// # Safety /// - /// The caller must ensure that `events` is [`Events`] + /// The caller must ensure that `events` is [`Events`], `queue` is [`ServerEventQueue`], /// and this instance was created for `E`. pub(crate) unsafe fn receive( &self, ctx: &mut ClientReceiveCtx, events: PtrMut, - queue: &mut ServerEventQueue, + queue: PtrMut, client: &mut RepliconClient, init_tick: RepliconTick, ) { @@ -327,6 +348,18 @@ impl ServerEvent { (self.resend_locally)(server_events, events); } + /// Clears queued events. + /// + /// We clear events while waiting for a connection to ensure clean reconnects. + /// + /// # Safety + /// + /// The caller must ensure that `queue` is [`Events`] + /// and this instance was created for `E`. + pub(crate) unsafe fn reset(&self, queue: PtrMut) { + (self.reset)(queue); + } + /// Serializes an event into a cursor. /// /// # Safety @@ -401,7 +434,7 @@ type ReceiveFn = unsafe fn( &ServerEvent, &mut ClientReceiveCtx, PtrMut, - &mut ServerEventQueue, + PtrMut, &mut RepliconClient, RepliconTick, ); @@ -409,6 +442,9 @@ type ReceiveFn = unsafe fn( /// Signature of server event resending functions. type ResendLocallyFn = unsafe fn(PtrMut, PtrMut); +/// Signature of server event reset functions. +type ResetFn = unsafe fn(PtrMut); + /// Typed version of [`ServerEvent::send`]. /// /// # Safety @@ -449,11 +485,12 @@ unsafe fn receive( event_data: &ServerEvent, ctx: &mut ClientReceiveCtx, events: PtrMut, - queue: &mut ServerEventQueue, + queue: PtrMut, client: &mut RepliconClient, init_tick: RepliconTick, ) { let events: &mut Events = events.deref_mut(); + let queue: &mut ServerEventQueue = queue.deref_mut(); while let Some((tick, message)) = queue.pop_if_le(init_tick) { let mut cursor = Cursor::new(&*message); @@ -537,6 +574,22 @@ unsafe fn resend_locally(server_events: PtrMut, events: PtrMut) { } } +/// Typed version of [`ServerEvent::reset`]. +/// +/// # Safety +/// +/// The caller must ensure that `queue` is [`Events`]. +unsafe fn reset(queue: PtrMut) { + let queue: &mut ServerEventQueue = queue.deref_mut(); + if !queue.is_empty() { + warn!( + "discarding {} queued server events due to a disconnect", + queue.values_len() + ); + } + queue.clear(); +} + /// Sends event `E` based on a mode. /// /// # Safety @@ -686,26 +739,6 @@ unsafe fn serialize_with_tick( } } -/// Stores all received events from server that arrived earlier then replication message with their tick. -/// -/// Stores data sorted by ticks and maintains order of arrival. -/// Needed to ensure that when an event is triggered, all the data that it affects or references already exists. -#[derive(Resource, Deref, DerefMut, Default)] -pub(crate) struct ServerEventQueue(ListOrderedMultimap); - -impl ServerEventQueue { - /// Pops the next event that is at least as old as the specified replicon tick. - pub(crate) fn pop_if_le(&mut self, init_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> { - let (tick, _) = self.0.front()?; - if *tick > init_tick { - return None; - } - self.0 - .pop_front() - .map(|(tick, message)| (tick.into_owned(), message)) - } -} - /// Cached message for use in [`serialize_with`]. struct SerializedMessage { init_tick: RepliconTick, @@ -734,6 +767,39 @@ pub enum SendMode { Direct(ClientId), } +/// Stores all received events from server that arrived earlier then replication message with their tick. +/// +/// Stores data sorted by ticks and maintains order of arrival. +/// Needed to ensure that when an event is triggered, all the data that it affects or references already exists. +#[derive(Resource, Deref, DerefMut)] +struct ServerEventQueue { + #[deref] + list: ListOrderedMultimap, + marker: PhantomData, +} + +impl ServerEventQueue { + /// Pops the next event that is at least as old as the specified replicon tick. + fn pop_if_le(&mut self, init_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> { + let (tick, _) = self.list.front()?; + if *tick > init_tick { + return None; + } + self.list + .pop_front() + .map(|(tick, message)| (tick.into_owned(), message)) + } +} + +impl Default for ServerEventQueue { + fn default() -> Self { + Self { + list: Default::default(), + marker: PhantomData, + } + } +} + /// Default event serialization function. pub fn default_serialize( _ctx: &mut ServerSendCtx, diff --git a/tests/server_event.rs b/tests/server_event.rs index 65c691f6..757a3db2 100644 --- a/tests/server_event.rs +++ b/tests/server_event.rs @@ -67,7 +67,7 @@ fn sending_receiving_and_mapping() { ..Default::default() }), )) - .add_mapped_server_event::(ChannelKind::Ordered); + .add_mapped_server_event::(ChannelKind::Ordered); } server_app.connect_client(&mut client_app); @@ -81,7 +81,7 @@ fn sending_receiving_and_mapping() { server_app.world_mut().send_event(ToClients { mode: SendMode::Broadcast, - event: MappedEvent(server_entity), + event: EntityEvent(server_entity), }); server_app.update(); @@ -90,7 +90,7 @@ fn sending_receiving_and_mapping() { let mapped_entities: Vec<_> = client_app .world_mut() - .resource_mut::>() + .resource_mut::>() .drain() .map(|event| event.0) .collect(); @@ -255,7 +255,7 @@ fn event_queue_and_mapping() { ..Default::default() }), )) - .add_server_event::(ChannelKind::Ordered); + .add_server_event::(ChannelKind::Ordered); } server_app.connect_client(&mut client_app); @@ -279,14 +279,14 @@ fn event_queue_and_mapping() { *init_tick = Default::default(); server_app.world_mut().send_event(ToClients { mode: SendMode::Broadcast, - event: MappedEvent(server_entity), + event: EntityEvent(server_entity), }); server_app.update(); server_app.exchange_with_client(&mut client_app); client_app.update(); - let events = client_app.world().resource::>(); + let events = client_app.world().resource::>(); assert!(events.is_empty()); // Restore the init tick to receive the event. @@ -296,13 +296,74 @@ fn event_queue_and_mapping() { let mapped_entities: Vec<_> = client_app .world_mut() - .resource_mut::>() + .resource_mut::>() .drain() .map(|event| event.0) .collect(); assert_eq!(mapped_entities, [client_entity]); } +#[test] +fn multiple_event_queues() { + let mut server_app = App::new(); + let mut client_app = App::new(); + for app in [&mut server_app, &mut client_app] { + app.add_plugins(( + MinimalPlugins, + RepliconPlugins.set(ServerPlugin { + tick_policy: TickPolicy::EveryFrame, + ..Default::default() + }), + )) + .add_server_event::(ChannelKind::Ordered) + .add_server_event::(ChannelKind::Ordered); // Use as a regular event with a different serialization size. + } + + server_app.connect_client(&mut client_app); + + // Spawn entity to trigger world change. + server_app.world_mut().spawn(Replicated); + + server_app.update(); + server_app.exchange_with_client(&mut client_app); + client_app.update(); + server_app.exchange_with_client(&mut client_app); + + // Artificially reset the init tick to force the next received event to be queued. + let mut init_tick = client_app.world_mut().resource_mut::(); + let previous_tick = *init_tick; + *init_tick = Default::default(); + server_app.world_mut().send_event(ToClients { + mode: SendMode::Broadcast, + event: DummyEvent, + }); + server_app.world_mut().send_event(ToClients { + mode: SendMode::Broadcast, + event: EntityEvent(Entity::PLACEHOLDER), + }); + + server_app.update(); + server_app.exchange_with_client(&mut client_app); + client_app.update(); + + let events = client_app.world().resource::>(); + assert!(events.is_empty()); + + let mapped_events = client_app.world().resource::>(); + assert!(mapped_events.is_empty()); + + // Restore the init tick to receive the event. + *client_app.world_mut().resource_mut::() = previous_tick; + + client_app.update(); + + assert_eq!(client_app.world().resource::>().len(), 1); + assert_eq!( + client_app.world().resource::>().len(), + 1 + ); +} + #[test] fn independent() { let mut server_app = App::new(); @@ -494,12 +555,9 @@ fn different_ticks() { struct DummyEvent; #[derive(Deserialize, Event, Serialize)] -struct IndependentDummyEvent; - -#[derive(Deserialize, Event, Serialize)] -struct MappedEvent(Entity); +struct EntityEvent(Entity); -impl MapEntities for MappedEvent { +impl MapEntities for EntityEvent { fn map_entities(&mut self, entity_mapper: &mut T) { self.0 = entity_mapper.map_entity(self.0); }