From 883709910277a6c0347a8e92c2cdd1063e45fa5c Mon Sep 17 00:00:00 2001 From: koe Date: Wed, 20 Sep 2023 00:00:08 -0500 Subject: [PATCH] replace WorldDiff-based replication with a buffer --- src/client.rs | 4 +- src/replicon_core.rs | 402 +++++++++++++++++++++++++++++-------------- src/server.rs | 144 ++++++++-------- 3 files changed, 347 insertions(+), 203 deletions(-) diff --git a/src/client.rs b/src/client.rs index ba0c690a..1ebfe09b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -7,7 +7,7 @@ use bevy_renet::transport::client_connected; use bevy_renet::{renet::RenetClient, transport::NetcodeClientPlugin, RenetClientPlugin}; use crate::{ - replicon_core::{Mapper, NetworkTick, WorldDiff, REPLICATION_CHANNEL_ID}, + replicon_core::{deserialize_to_world, Mapper, NetworkTick, REPLICATION_CHANNEL_ID}, Replication, }; @@ -48,7 +48,7 @@ impl ClientPlugin { fn diff_receiving_system(world: &mut World) { world.resource_scope(|world, mut client: Mut| { while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) { - WorldDiff::deserialize_to_world(world, message) + deserialize_to_world(world, message) .expect("server should send only valid world diffs"); } }); diff --git a/src/replicon_core.rs b/src/replicon_core.rs index 4ccf8a2c..4c8ca788 100644 --- a/src/replicon_core.rs +++ b/src/replicon_core.rs @@ -1,14 +1,17 @@ -use std::{cmp::Ordering, io::Cursor, marker::PhantomData}; +use std::{cmp::Ordering, io::Cursor, io::Write, marker::PhantomData}; use bevy::{ - ecs::{component::ComponentId, world::EntityMut}, + ecs::{ + component::{ComponentId, Tick}, + world::EntityMut, + }, prelude::*, ptr::Ptr, utils::HashMap, }; use bevy_renet::renet::{Bytes, ChannelConfig, SendType}; +use bincode::Options; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use strum::EnumDiscriminants; use crate::client::{ClientMapper, LastTick, NetworkEntityMap}; @@ -169,7 +172,7 @@ impl ReplicationRules { } /// Returns meta information about replicated component. - pub fn get_info(&self, replication_id: ReplicationId) -> &ReplicationInfo { + pub(super) fn get_info(&self, replication_id: ReplicationId) -> &ReplicationInfo { // SAFETY: `ReplicationId` always corresponds to a valid index. unsafe { self.info.get_unchecked(replication_id.0) } } @@ -287,130 +290,6 @@ pub trait Mapper { #[derive(Component, Clone, Copy)] pub struct Replication; -/// Changed world data and current tick from server. -/// -/// Sent from server to clients. -pub(super) struct WorldDiff<'a> { - pub(super) tick: NetworkTick, - pub(super) entities: HashMap>>, - pub(super) despawns: Vec, -} - -impl WorldDiff<'_> { - /// Creates a new [`WorldDiff`] with a tick and empty entities. - pub(super) fn new(tick: NetworkTick) -> Self { - Self { - tick, - entities: Default::default(), - despawns: Default::default(), - } - } - - /// Serializes itself into a buffer. - /// - /// We use custom implementation because serde impls require to use generics that can't be stored in [`ReplicationInfo`]. - pub(super) fn serialize( - &self, - replication_rules: &ReplicationRules, - message: &mut Vec, - ) -> Result<(), bincode::Error> { - let mut cursor = Cursor::new(message); - - bincode::serialize_into(&mut cursor, &self.tick)?; - - bincode::serialize_into(&mut cursor, &self.entities.len())?; - for (entity, components) in &self.entities { - bincode::serialize_into(&mut cursor, entity)?; - bincode::serialize_into(&mut cursor, &components.len())?; - for &component_diff in components { - bincode::serialize_into(&mut cursor, &ComponentDiffKind::from(component_diff))?; - match component_diff { - ComponentDiff::Changed((replication_id, ptr)) => { - bincode::serialize_into(&mut cursor, &replication_id)?; - let replication_info = replication_rules.get_info(replication_id); - (replication_info.serialize)(ptr, &mut cursor)?; - } - ComponentDiff::Removed(replication_id) => { - bincode::serialize_into(&mut cursor, &replication_id)?; - } - } - } - } - - bincode::serialize_into(&mut cursor, &self.despawns)?; - - Ok(()) - } - - /// Deserializes itself from bytes directly into the world by applying all changes. - /// - /// Does nothing if world already received a more recent diff. - /// See also [`LastTick`]. - pub(super) fn deserialize_to_world( - world: &mut World, - message: Bytes, - ) -> Result<(), bincode::Error> { - let mut cursor = Cursor::new(message); - - let tick = bincode::deserialize_from(&mut cursor)?; - let mut last_tick = world.resource_mut::(); - if last_tick.0 >= tick { - return Ok(()); - } - last_tick.0 = tick; - - world.resource_scope(|world, replication_rules: Mut| { - world.resource_scope(|world, mut entity_map: Mut| { - let entities_count: usize = bincode::deserialize_from(&mut cursor)?; - for _ in 0..entities_count { - let entity = bincode::deserialize_from(&mut cursor)?; - let mut entity = entity_map.get_by_server_or_spawn(world, entity); - let components_count: usize = bincode::deserialize_from(&mut cursor)?; - for _ in 0..components_count { - let diff_kind = bincode::deserialize_from(&mut cursor)?; - let replication_id = bincode::deserialize_from(&mut cursor)?; - let replication_info = replication_rules.get_info(replication_id); - match diff_kind { - ComponentDiffKind::Changed => { - (replication_info.deserialize)( - &mut entity, - &mut entity_map, - &mut cursor, - )?; - } - ComponentDiffKind::Removed => { - (replication_info.remove)(&mut entity); - } - } - } - } - - let despawns: Vec = bincode::deserialize_from(&mut cursor)?; - for server_entity in despawns { - // The entity might have already been deleted with the last diff, - // but the server might not yet have received confirmation from the - // client and could include the deletion in the latest diff. - if let Some(client_entity) = entity_map.remove_by_server(server_entity) { - world.entity_mut(client_entity).despawn_recursive(); - } - } - - Ok(()) - }) - }) - } -} - -/// Type of component change. -#[derive(EnumDiscriminants, Clone, Copy)] -#[strum_discriminants(name(ComponentDiffKind), derive(Deserialize, Serialize))] -pub(super) enum ComponentDiff<'a> { - /// Indicates that a component was added or changed, contains its ID and pointer. - Changed((ReplicationId, Ptr<'a>)), - /// Indicates that a component was removed, contains its ID. - Removed(ReplicationId), -} - /// Corresponds to the number of server update. /// /// See also [`crate::server::TickPolicy`]. @@ -446,3 +325,270 @@ impl PartialOrd for NetworkTick { } } } + +/// Buffer for an entity's replicated components. +/// - An entity may replicate up to 127 components. +#[derive(Default)] +struct ComponentData { + /// Number of updated components. + /// - Only 7 bits can be used. We reserve one bit to indicate if components were removed. + updated_count: u8, + updated_data: Vec, + + /// Number of removed components. + removed_count: u8, + removed_ids: Vec, +} + +/// Buffer for a client's replicated entities. +/// - Up to 65535 entities may be replicated. +//todo: save up to N ComponentData buffers for removed entities and re-use them for newly added entities +#[derive(Default)] +pub(super) struct ReplicationBuffer { + /// [ entity : component data ] + data: HashMap, + + /// Number of despawned entities + despawn_count: u16, + /// Despawned entities + despawned: Vec, + + /// Total buffer size + total_size: usize, + + /// The current server tick + server_tick: NetworkTick, + /// Last system tick acked by the client + acked_system_tick: Option, +} + +impl ReplicationBuffer { + /// Update ticks + pub(super) fn refresh_ticks(&mut self, server_tick: NetworkTick, acked_system_tick: Tick) { + self.server_tick = server_tick; + self.acked_system_tick = Some(acked_system_tick); + } + + /// Get the client's last acked system tick + pub(super) fn last_acked_system_tick(&self) -> Tick { + self.acked_system_tick.unwrap_or(Tick::new(0u32)) + } + + /// Add an updated component to the buffer. + pub(super) fn append_updated_component( + &mut self, + replication_rules: &ReplicationRules, + entity: Entity, + replication_id: ReplicationId, + component_diff: Ptr<'_>, + ) -> Result<(), bincode::Error> { + // get component data entry + let entry = self.data.entry(entity).or_default(); + + // write component diff into buffer + let prev_len = entry.updated_data.len(); + { + let len = entry.updated_data.len(); + let mut cursor = Cursor::new(&mut entry.updated_data); + cursor.set_position(len as u64); + bincode::config::DefaultOptions::new().serialize_into(&mut cursor, &replication_id)?; + let replication_info = replication_rules.get_info(replication_id); + (replication_info.serialize)(component_diff, &mut cursor)?; + } + let post_len = entry.updated_data.len(); + + // update trackers + entry.updated_count += 1; + if entry.updated_count > 127 { + error!("entity has too many components being updated"); + } + self.total_size += post_len.saturating_sub(prev_len); + + Ok(()) + } + + /// Add a removed component to the buffer. + pub(super) fn append_removed_component( + &mut self, + entity: Entity, + replication_id: ReplicationId, + ) -> Result<(), bincode::Error> { + // get component data entry + let entry = self.data.entry(entity).or_default(); + + // write removed component's id into buffer + let prev_len = entry.removed_ids.len(); + bincode::config::DefaultOptions::new() + .serialize_into(&mut entry.removed_ids, &replication_id)?; + let post_len = entry.removed_ids.len(); + + // update trackers + entry.removed_count = entry + .removed_count + .checked_add(1) + .expect("entity has removed too many components"); + self.total_size += post_len.saturating_sub(prev_len); + + Ok(()) + } + + /// Add a despawned entity to the buffer. + /// - Will clean up any internal entries for the despawned entity. + pub(super) fn despawn_entity(&mut self, entity: Entity) -> Result<(), bincode::Error> { + // write despawned entity into buffer + let prev_len = self.despawned.len(); + bincode::config::DefaultOptions::new().serialize_into(&mut self.despawned, &entity)?; + let post_len = self.despawned.len(); + + // update trackers + self.despawn_count = self + .despawn_count + .checked_add(1) + .expect("entity has despawned too many entities"); + self.total_size += post_len.saturating_sub(prev_len); + + // clean up entity entry + let _ = self.data.remove(&entity); + + Ok(()) + } + + /// Build a replication message and reset the buffer's internal state (without deallocating internal buffers). + pub(super) fn consume(&mut self) -> Result, bincode::Error> { + let mut message = Vec::with_capacity(self.estimate_len()); + + // current server tick + bincode::serialize_into(&mut message, &self.server_tick)?; + + // number of entities + if self.data.len() > u16::MAX as usize { + error!("replication buffer has too many entities"); + } + bincode::serialize_into(&mut message, &(self.data.len() as u16))?; + + // entities + for (entity, component_data) in self.data.iter_mut() { + // check if any components were removed from the entity + // - set most significant bit of updated count to indicate there are removed components + if component_data.updated_count > 127u8 { + error!("entity has too many components to replicate"); + } + if component_data.removed_count > 0 { + component_data.updated_count += 128u8 + } + + // write entity + bincode::config::DefaultOptions::new().serialize_into(&mut message, &entity)?; + bincode::serialize_into(&mut message, &component_data.updated_count)?; + message + .write(&component_data.updated_data[..]) + .map_err(|e| Box::new(bincode::ErrorKind::Io(e)))?; + + if component_data.removed_count > 0 { + bincode::serialize_into(&mut message, &component_data.removed_count)?; + message + .write(&component_data.removed_ids[..]) + .map_err(|e| Box::new(bincode::ErrorKind::Io(e)))?; + } + + // reset entity + component_data.updated_count = 0; + component_data.updated_data.clear(); + component_data.removed_count = 0; + component_data.removed_ids.clear(); + } + + // despawned entities + bincode::serialize_into(&mut message, &self.despawn_count)?; + message + .write(&self.despawned[..]) + .map_err(|e| Box::new(bincode::ErrorKind::Io(e)))?; + + // final resets + self.despawn_count = 0; + self.despawned.clear(); + self.total_size = 0; + + Ok(message) + } + + fn estimate_len(&self) -> usize { + // buffered data + entity ids + entity count + despawn count + network tick + self.total_size + self.data.len() * 8 + 2 + 2 + 4 + 20 + } +} + +/// Deserializes a replication buffer package from bytes directly into the world by applying all changes. +/// +/// Does nothing if world already received a more recent diff. +/// See also [`LastTick`]. +pub(super) fn deserialize_to_world( + world: &mut World, + message: Bytes, +) -> Result<(), bincode::Error> { + let mut cursor = Cursor::new(message); + + // tick + let tick = bincode::deserialize_from(&mut cursor)?; + let mut last_tick = world.resource_mut::(); + if last_tick.0 >= tick { + return Ok(()); + } + last_tick.0 = tick; + + // prep + let replication_rules = world.remove_resource::().unwrap(); + let mut entity_map = world.remove_resource::().unwrap(); + + // entities + let entities_count: u16 = bincode::deserialize_from(&mut cursor)?; + for _ in 0..entities_count { + // entity + let entity = bincode::config::DefaultOptions::new().deserialize_from(&mut cursor)?; + let mut entity = entity_map.get_by_server_or_spawn(world, entity); + + // updated components + let mut components_count: u8 = bincode::deserialize_from(&mut cursor)?; + let has_removed = components_count > 127; + components_count %= 128u8; + + for _ in 0..components_count { + let replication_id = + bincode::config::DefaultOptions::new().deserialize_from(&mut cursor)?; + let replication_info = replication_rules.get_info(replication_id); + (replication_info.deserialize)(&mut entity, &mut entity_map, &mut cursor)?; + } + + // removed components + if !has_removed { + continue; + } + let removed_count: u8 = bincode::deserialize_from(&mut cursor)?; + + for _ in 0..removed_count { + let replication_id = + bincode::config::DefaultOptions::new().deserialize_from(&mut cursor)?; + let replication_info = replication_rules.get_info(replication_id); + (replication_info.remove)(&mut entity); + } + } + + // despawns + let despawn_count: u16 = bincode::deserialize_from(&mut cursor)?; + for _ in 0..despawn_count { + let server_entity: Entity = + bincode::config::DefaultOptions::new().deserialize_from(&mut cursor)?; + // The entity might have already been deleted with the last diff, + // but the server might not yet have received confirmation from the + // client and could include the deletion in the latest diff. + if let Some(client_entity) = entity_map.remove_by_server(server_entity) { + world.entity_mut(client_entity).despawn_recursive(); + } + } + + // cleanup + world.insert_resource(replication_rules); + world.insert_resource(entity_map); + + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index 4c8eb038..58113cb2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,7 +8,7 @@ use bevy::{ archetype::{Archetype, ArchetypeId}, component::{ComponentId, StorageType, Tick}, storage::{SparseSets, Table}, - system::SystemChangeTick, + system::{Local, SystemChangeTick}, }, prelude::*, time::common_conditions::on_timer, @@ -22,7 +22,7 @@ use bevy_renet::{ use derive_more::Constructor; use crate::replicon_core::{ - ComponentDiff, NetworkTick, ReplicationId, ReplicationRules, WorldDiff, REPLICATION_CHANNEL_ID, + NetworkTick, ReplicationBuffer, ReplicationId, ReplicationRules, REPLICATION_CHANNEL_ID, }; use despawn_tracker::{DespawnTracker, DespawnTrackerPlugin}; use removal_tracker::{RemovalTracker, RemovalTrackerPlugin}; @@ -121,49 +121,55 @@ impl ServerPlugin { } fn diffs_sending_system( + mut replication_buffers: Local>, change_tick: SystemChangeTick, mut set: ParamSet<(&World, ResMut, ResMut)>, replication_rules: Res, despawn_tracker: Res, removal_trackers: Query<(Entity, &RemovalTracker)>, ) { + // remove disconnected clients from replication buffer cache + { + let renet_server = set.p1(); + replication_buffers.retain(|client_id, _| renet_server.is_connected(*client_id)); + } + let mut server_ticks = set.p2(); server_ticks.increment(change_tick.this_run()); - let mut client_diffs = Vec::with_capacity(server_ticks.acked_ticks.len()); - for (&client_id, &tick) in &server_ticks.acked_ticks { - let system_tick = *server_ticks + for (&client_id, &acked_tick) in &server_ticks.acked_ticks { + let acked_system_tick = *server_ticks .system_ticks - .get(&tick) + .get(&acked_tick) .unwrap_or(&Tick::new(0)); - client_diffs.push(ClientDiff { - client_id, - system_tick, - world_diff: WorldDiff::new(server_ticks.current_tick), - }); + replication_buffers + .entry(client_id) + .or_default() + .refresh_ticks(server_ticks.current_tick, acked_system_tick); } collect_changes( - &mut client_diffs, + &mut replication_buffers, change_tick.this_run(), set.p0(), &replication_rules, ); - collect_removals(&mut client_diffs, change_tick.this_run(), &removal_trackers); - collect_despawns(&mut client_diffs, change_tick.this_run(), &despawn_tracker); - - let mut messages = Vec::with_capacity(client_diffs.len()); - for client_diff in client_diffs { - let mut message = Vec::new(); - client_diff - .world_diff - .serialize(&replication_rules, &mut message) - .expect("world diff should be serializable"); - messages.push((client_diff.client_id, message)); - } + collect_removals( + &mut replication_buffers, + change_tick.this_run(), + &removal_trackers, + ); + collect_despawns( + &mut replication_buffers, + change_tick.this_run(), + &despawn_tracker, + ); - for (client_id, message) in messages { + for (client_id, replication_buffer) in replication_buffers.iter_mut() { + let Ok(message) = replication_buffer.consume() else { + continue; + }; set.p1() - .send_message(client_id, REPLICATION_CHANNEL_ID, message); + .send_message(*client_id, REPLICATION_CHANNEL_ID, message); } } @@ -173,10 +179,10 @@ impl ServerPlugin { } } -fn collect_changes<'a>( - client_diffs: &mut [ClientDiff<'a>], +fn collect_changes( + replication_buffers: &mut HashMap, system_tick: Tick, - world: &'a World, + world: &World, replication_rules: &ReplicationRules, ) { for archetype in world @@ -208,7 +214,8 @@ fn collect_changes<'a>( match storage_type { StorageType::Table => { collect_table_components( - client_diffs, + replication_buffers, + replication_rules, system_tick, table, archetype, @@ -218,7 +225,8 @@ fn collect_changes<'a>( } StorageType::SparseSet => { collect_sparse_set_components( - client_diffs, + replication_buffers, + replication_rules, system_tick, &world.storages().sparse_sets, archetype, @@ -231,10 +239,11 @@ fn collect_changes<'a>( } } -fn collect_table_components<'a>( - client_diffs: &mut [ClientDiff<'a>], +fn collect_table_components( + replication_buffers: &mut HashMap, + replication_rules: &ReplicationRules, system_tick: Tick, - table: &'a Table, + table: &Table, archetype: &Archetype, replication_id: ReplicationId, component_id: ComponentId, @@ -249,23 +258,24 @@ fn collect_table_components<'a>( // SAFETY: component obtained from the archetype. let component = unsafe { column.get_data_unchecked(archetype_entity.table_row()) }; - for client_diff in &mut *client_diffs { - if ticks.is_changed(client_diff.system_tick, system_tick) { - client_diff - .world_diff - .entities - .entry(archetype_entity.entity()) - .or_default() - .push(ComponentDiff::Changed((replication_id, component))); + for (_, replication_buffer) in replication_buffers.iter_mut() { + if ticks.is_changed(replication_buffer.last_acked_system_tick(), system_tick) { + let _ = replication_buffer.append_updated_component( + replication_rules, + archetype_entity.entity(), + replication_id, + component, + ); } } } } -fn collect_sparse_set_components<'a>( - client_diffs: &mut [ClientDiff<'a>], +fn collect_sparse_set_components( + replication_buffers: &mut HashMap, + replication_rules: &ReplicationRules, system_tick: Tick, - sparse_sets: &'a SparseSets, + sparse_sets: &SparseSets, archetype: &Archetype, replication_id: ReplicationId, component_id: ComponentId, @@ -283,34 +293,29 @@ fn collect_sparse_set_components<'a>( .get(entity) .unwrap_or_else(|| panic!("{entity:?} should have {component_id:?}")); - for client_diff in &mut *client_diffs { - if ticks.is_changed(client_diff.system_tick, system_tick) { - client_diff - .world_diff - .entities - .entry(entity) - .or_default() - .push(ComponentDiff::Changed((replication_id, component))); + for (_, replication_buffer) in replication_buffers.iter_mut() { + if ticks.is_changed(replication_buffer.last_acked_system_tick(), system_tick) { + let _ = replication_buffer.append_updated_component( + replication_rules, + entity, + replication_id, + component, + ); } } } } fn collect_removals( - client_diffs: &mut [ClientDiff], + replication_buffers: &mut HashMap, system_tick: Tick, removal_trackers: &Query<(Entity, &RemovalTracker)>, ) { for (entity, removal_tracker) in removal_trackers { - for client_diff in &mut *client_diffs { + for (_, replication_buffer) in replication_buffers.iter_mut() { for (&replication_id, &tick) in &removal_tracker.0 { - if tick.is_newer_than(client_diff.system_tick, system_tick) { - client_diff - .world_diff - .entities - .entry(entity) - .or_default() - .push(ComponentDiff::Removed(replication_id)); + if tick.is_newer_than(replication_buffer.last_acked_system_tick(), system_tick) { + let _ = replication_buffer.append_removed_component(entity, replication_id); } } } @@ -318,14 +323,14 @@ fn collect_removals( } fn collect_despawns( - client_diffs: &mut [ClientDiff], + replication_buffers: &mut HashMap, system_tick: Tick, despawn_tracker: &DespawnTracker, ) { for &(entity, tick) in &despawn_tracker.despawns { - for client_diff in &mut *client_diffs { - if tick.is_newer_than(client_diff.system_tick, system_tick) { - client_diff.world_diff.despawns.push(entity); + for (_, replication_buffer) in replication_buffers.iter_mut() { + if tick.is_newer_than(replication_buffer.last_acked_system_tick(), system_tick) { + let _ = replication_buffer.despawn_entity(entity); } } } @@ -399,10 +404,3 @@ impl ServerTicks { &self.acked_ticks } } - -/// Intermediate struct that contains necessary information to create and send [`WorldDiff`]. -struct ClientDiff<'a> { - client_id: u64, - system_tick: Tick, - world_diff: WorldDiff<'a>, -}