Skip to content

Commit

Permalink
Fix client event buffering (#323)
Browse files Browse the repository at this point in the history
I rushed a little with removing generics.
We need to keep separate buffer for each event, otherwise events of
different types encoded in Bytes will be mixed together.

This commit is partial revert of e98a940.
  • Loading branch information
Shatur authored Sep 3, 2024
1 parent 52a92fa commit 7729e74
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 85 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fix client event buffering.

## [0.28.0] - 2024-09-03

### Added
Expand Down
101 changes: 53 additions & 48 deletions src/client/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{ClientPlugin, ClientSet, ServerInitTick};
use crate::core::{
common_conditions::*,
ctx::{ClientReceiveCtx, ClientSendCtx},
event_registry::{server_event::ServerEventQueue, EventRegistry},
event_registry::EventRegistry,
replicon_client::RepliconClient,
server_entity_map::ServerEntityMap,
};
Expand All @@ -16,26 +16,25 @@ pub struct ClientEventsPlugin;

impl Plugin for ClientEventsPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<ServerEventQueue>()
.add_systems(
PreUpdate,
(
Self::reset.in_set(ClientSet::ResetEvents),
Self::receive
.after(ClientPlugin::receive_replication)
.in_set(ClientSet::Receive)
.run_if(client_connected),
),
app.add_systems(
PreUpdate,
(
Self::reset.in_set(ClientSet::ResetEvents),
Self::receive
.after(ClientPlugin::receive_replication)
.in_set(ClientSet::Receive)
.run_if(client_connected),
),
)
.add_systems(
PostUpdate,
(
Self::send.run_if(client_connected),
Self::resend_locally.run_if(has_authority),
)
.add_systems(
PostUpdate,
(
Self::send.run_if(client_connected),
Self::resend_locally.run_if(has_authority),
)
.chain()
.in_set(ClientSet::Send),
);
.chain()
.in_set(ClientSet::Send),
);
}
}

Expand Down Expand Up @@ -84,31 +83,37 @@ impl ClientEventsPlugin {
world.resource_scope(|world, registry: Mut<AppTypeRegistry>| {
world.resource_scope(|world, entity_map: Mut<ServerEntityMap>| {
world.resource_scope(|world, event_registry: Mut<EventRegistry>| {
world.resource_scope(|world, mut queue: Mut<ServerEventQueue>| {
let init_tick = **world.resource::<ServerInitTick>();
let mut ctx = ClientReceiveCtx {
registry: &registry.read(),
entity_map: &entity_map,
invalid_entities: Vec::new(),
};
let init_tick = **world.resource::<ServerInitTick>();
let mut ctx = ClientReceiveCtx {
registry: &registry.read(),
entity_map: &entity_map,
invalid_entities: Vec::new(),
};

for event_data in event_registry.iter_server_events() {
let events = world
let world_cell = world.as_unsafe_world_cell();
for event_data in event_registry.iter_server_events() {
// SAFETY: both resources mutably borrowed uniquely.
let (events, queue) = unsafe {
let events = world_cell
.get_resource_mut_by_id(event_data.events_id())
.expect("events shouldn't be removed");
let queue = world_cell
.get_resource_mut_by_id(event_data.queue_id())
.expect("event queue shouldn't be removed");
(events, queue)
};

// SAFETY: passed pointers were obtained using this event data.
unsafe {
event_data.receive(
&mut ctx,
events.into_inner(),
&mut queue,
&mut client,
init_tick,
)
};
}
});
// SAFETY: passed pointers were obtained using this event data.
unsafe {
event_data.receive(
&mut ctx,
events.into_inner(),
queue.into_inner(),
&mut client,
init_tick,
)
};
}
});
});
});
Expand Down Expand Up @@ -149,13 +154,13 @@ impl ClientEventsPlugin {
unsafe { event_data.reset(events.into_inner()) };
}

let mut queue = world.resource_mut::<ServerEventQueue>();
if !queue.is_empty() {
warn!(
"discarding {} queued server events due to a disconnect",
queue.values_len()
);
queue.clear();
for event_data in event_registry.iter_server_events() {
let queue = world
.get_resource_mut_by_id(event_data.queue_id())
.expect("event queue shouldn't be removed");

// SAFETY: passed pointer was obtained using this event data.
unsafe { event_data.reset(queue.into_inner()) };
}
});
}
Expand Down
116 changes: 91 additions & 25 deletions src/core/event_registry/server_event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
any::{self, TypeId},
io::Cursor,
marker::PhantomData,
mem,
};

Expand Down Expand Up @@ -153,7 +154,9 @@ impl ServerEventAppExt for App {
) -> &mut Self {
debug!("registering server event `{}`", any::type_name::<E>());

self.add_event::<E>().add_event::<ToClients<E>>();
self.add_event::<E>()
.add_event::<ToClients<E>>()
.init_resource::<ServerEventQueue<E>>();

let channel_id = self
.world_mut()
Expand Down Expand Up @@ -212,12 +215,16 @@ pub(crate) struct ServerEvent {
/// ID of [`Events<ToClients<E>>`].
server_events_id: ComponentId,

/// ID of [`ServerEventQueue<T>`].
queue_id: ComponentId,

/// Used channel.
channel_id: u8,

send: SendFn,
receive: ReceiveFn,
resend_locally: ResendLocallyFn,
reset: ResetFn,
serialize: unsafe fn(),
deserialize: unsafe fn(),
}
Expand All @@ -243,6 +250,14 @@ impl ServerEvent {
any::type_name::<ToClients<E>>()
)
});
let queue_id = components
.resource_id::<ServerEventQueue<E>>()
.unwrap_or_else(|| {
panic!(
"resource `{}` should be previously inserted",
any::type_name::<ServerEventQueue<E>>()
)
});

// SAFETY: these functions won't be called until the type is restored.
Self {
Expand All @@ -251,10 +266,12 @@ impl ServerEvent {
independent: false,
events_id,
server_events_id,
queue_id,
channel_id,
send: send::<E>,
receive: receive::<E>,
resend_locally: resend_locally::<E>,
reset: reset::<E>,
serialize: unsafe { mem::transmute::<SerializeFn<E>, unsafe fn()>(serialize) },
deserialize: unsafe { mem::transmute::<DeserializeFn<E>, unsafe fn()>(deserialize) },
}
Expand All @@ -268,6 +285,10 @@ impl ServerEvent {
self.server_events_id
}

pub(crate) fn queue_id(&self) -> ComponentId {
self.queue_id
}

pub(crate) fn is_independent(&self) -> bool {
self.independent
}
Expand Down Expand Up @@ -304,13 +325,13 @@ impl ServerEvent {
///
/// # Safety
///
/// The caller must ensure that `events` is [`Events<E>`]
/// The caller must ensure that `events` is [`Events<E>`], `queue` is [`ServerEventQueue<E>`],
/// and this instance was created for `E`.
pub(crate) unsafe fn receive(
&self,
ctx: &mut ClientReceiveCtx,
events: PtrMut,
queue: &mut ServerEventQueue,
queue: PtrMut,
client: &mut RepliconClient,
init_tick: RepliconTick,
) {
Expand All @@ -327,6 +348,18 @@ impl ServerEvent {
(self.resend_locally)(server_events, events);
}

/// Clears queued events.
///
/// We clear events while waiting for a connection to ensure clean reconnects.
///
/// # Safety
///
/// The caller must ensure that `queue` is [`Events<E>`]
/// and this instance was created for `E`.
pub(crate) unsafe fn reset(&self, queue: PtrMut) {
(self.reset)(queue);
}

/// Serializes an event into a cursor.
///
/// # Safety
Expand Down Expand Up @@ -401,14 +434,17 @@ type ReceiveFn = unsafe fn(
&ServerEvent,
&mut ClientReceiveCtx,
PtrMut,
&mut ServerEventQueue,
PtrMut,
&mut RepliconClient,
RepliconTick,
);

/// Signature of server event resending functions.
type ResendLocallyFn = unsafe fn(PtrMut, PtrMut);

/// Signature of server event reset functions.
type ResetFn = unsafe fn(PtrMut);

/// Typed version of [`ServerEvent::send`].
///
/// # Safety
Expand Down Expand Up @@ -449,11 +485,12 @@ unsafe fn receive<E: Event>(
event_data: &ServerEvent,
ctx: &mut ClientReceiveCtx,
events: PtrMut,
queue: &mut ServerEventQueue,
queue: PtrMut,
client: &mut RepliconClient,
init_tick: RepliconTick,
) {
let events: &mut Events<E> = events.deref_mut();
let queue: &mut ServerEventQueue<E> = queue.deref_mut();

while let Some((tick, message)) = queue.pop_if_le(init_tick) {
let mut cursor = Cursor::new(&*message);
Expand Down Expand Up @@ -537,6 +574,22 @@ unsafe fn resend_locally<E: Event>(server_events: PtrMut, events: PtrMut) {
}
}

/// Typed version of [`ServerEvent::reset`].
///
/// # Safety
///
/// The caller must ensure that `queue` is [`Events<E>`].
unsafe fn reset<E: Event>(queue: PtrMut) {
let queue: &mut ServerEventQueue<E> = queue.deref_mut();
if !queue.is_empty() {
warn!(
"discarding {} queued server events due to a disconnect",
queue.values_len()
);
}
queue.clear();
}

/// Sends event `E` based on a mode.
///
/// # Safety
Expand Down Expand Up @@ -686,26 +739,6 @@ unsafe fn serialize_with_tick<E: Event>(
}
}

/// Stores all received events from server that arrived earlier then replication message with their tick.
///
/// Stores data sorted by ticks and maintains order of arrival.
/// Needed to ensure that when an event is triggered, all the data that it affects or references already exists.
#[derive(Resource, Deref, DerefMut, Default)]
pub(crate) struct ServerEventQueue(ListOrderedMultimap<RepliconTick, Bytes>);

impl ServerEventQueue {
/// Pops the next event that is at least as old as the specified replicon tick.
pub(crate) fn pop_if_le(&mut self, init_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> {
let (tick, _) = self.0.front()?;
if *tick > init_tick {
return None;
}
self.0
.pop_front()
.map(|(tick, message)| (tick.into_owned(), message))
}
}

/// Cached message for use in [`serialize_with`].
struct SerializedMessage {
init_tick: RepliconTick,
Expand Down Expand Up @@ -734,6 +767,39 @@ pub enum SendMode {
Direct(ClientId),
}

/// Stores all received events from server that arrived earlier then replication message with their tick.
///
/// Stores data sorted by ticks and maintains order of arrival.
/// Needed to ensure that when an event is triggered, all the data that it affects or references already exists.
#[derive(Resource, Deref, DerefMut)]
struct ServerEventQueue<E> {
#[deref]
list: ListOrderedMultimap<RepliconTick, Bytes>,
marker: PhantomData<E>,
}

impl<E> ServerEventQueue<E> {
/// Pops the next event that is at least as old as the specified replicon tick.
fn pop_if_le(&mut self, init_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> {
let (tick, _) = self.list.front()?;
if *tick > init_tick {
return None;
}
self.list
.pop_front()
.map(|(tick, message)| (tick.into_owned(), message))
}
}

impl<E> Default for ServerEventQueue<E> {
fn default() -> Self {
Self {
list: Default::default(),
marker: PhantomData,
}
}
}

/// Default event serialization function.
pub fn default_serialize<E: Event + Serialize>(
_ctx: &mut ServerSendCtx,
Expand Down
Loading

0 comments on commit 7729e74

Please sign in to comment.