From a7dd5e04e204bfd6246e897a5c69d8376e3d341d Mon Sep 17 00:00:00 2001 From: Hennadii Chernyshchyk Date: Thu, 21 Sep 2023 23:32:29 +0300 Subject: [PATCH] Refactor replication to use a buffer cache per client --- src/client.rs | 120 ++++++++++- src/lib.rs | 4 +- src/replicon_core.rs | 131 +----------- src/server.rs | 489 +++++++++++++++++++++++++++++-------------- 4 files changed, 454 insertions(+), 290 deletions(-) diff --git a/src/client.rs b/src/client.rs index ba0c690a..4759e27f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,13 +1,15 @@ +use std::io::Cursor; + use bevy::{ ecs::world::EntityMut, prelude::*, utils::{Entry, HashMap}, }; -use bevy_renet::transport::client_connected; +use bevy_renet::{renet::Bytes, transport::client_connected}; use bevy_renet::{renet::RenetClient, transport::NetcodeClientPlugin, RenetClientPlugin}; use crate::{ - replicon_core::{Mapper, NetworkTick, WorldDiff, REPLICATION_CHANNEL_ID}, + replicon_core::{Mapper, NetworkTick, ReplicationRules, REPLICATION_CHANNEL_ID}, Replication, }; @@ -29,6 +31,7 @@ impl Plugin for ClientPlugin { .add_systems( PreUpdate, Self::diff_receiving_system + .pipe(unwrap) .in_set(ClientSet::Receive) .run_if(client_connected()), ) @@ -45,13 +48,38 @@ impl Plugin for ClientPlugin { } impl ClientPlugin { - fn diff_receiving_system(world: &mut World) { + fn diff_receiving_system(world: &mut World) -> Result<(), bincode::Error> { world.resource_scope(|world, mut client: Mut| { - while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) { - WorldDiff::deserialize_to_world(world, message) - .expect("server should send only valid world diffs"); - } - }); + world.resource_scope(|world, mut entity_map: Mut| { + world.resource_scope(|world, replication_rules: Mut| { + while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) { + let mut cursor = Cursor::new(message); + + if !deserialize_tick(&mut cursor, world)? { + continue; + } + + deserialize_component_diffs( + &mut cursor, + world, + &mut entity_map, + &replication_rules, + DiffKind::Change, + )?; + deserialize_component_diffs( + &mut cursor, + world, + &mut entity_map, + &replication_rules, + DiffKind::Removal, + )?; + deserialize_despawns(&mut cursor, world, &mut entity_map)?; + } + + Ok(()) + }) + }) + }) } fn ack_sending_system(last_tick: Res, mut client: ResMut) { @@ -66,6 +94,82 @@ impl ClientPlugin { } } +/// Deserializes server tick and applies it to [`LastTick`] if it is newer. +/// +/// Returns true if [`LastTick`] has been updated. +fn deserialize_tick(cursor: &mut Cursor, world: &mut World) -> Result { + let tick = bincode::deserialize_from(cursor)?; + + let mut last_tick = world.resource_mut::(); + if last_tick.0 < tick { + last_tick.0 = tick; + Ok(true) + } else { + Ok(false) + } +} + +/// Deserializes component [`DiffKind`] and applies them to the [`World`]. +fn deserialize_component_diffs( + cursor: &mut Cursor, + world: &mut World, + entity_map: &mut NetworkEntityMap, + replication_rules: &ReplicationRules, + diff_kind: DiffKind, +) -> Result<(), bincode::Error> { + let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?; + if entities_count == 0 { + return Ok(()); + } + + for _ in 0..entities_count { + let entity = bincode::deserialize_from(&mut *cursor)?; + let mut entity = entity_map.get_by_server_or_spawn(world, entity); + let components_count: u8 = bincode::deserialize_from(&mut *cursor)?; + for _ in 0..components_count { + let replication_id = bincode::deserialize_from(&mut *cursor)?; + let replication_info = replication_rules.get_info(replication_id); + match diff_kind { + DiffKind::Change => { + (replication_info.deserialize)(&mut entity, entity_map, cursor)? + } + DiffKind::Removal => (replication_info.remove)(&mut entity), + } + } + } + + Ok(()) +} + +enum DiffKind { + Change, + Removal, +} + +/// Deserializes despawns and applies them to the [`World`]. +fn deserialize_despawns( + cursor: &mut Cursor, + world: &mut World, + entity_map: &mut NetworkEntityMap, +) -> Result<(), bincode::Error> { + let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?; + if entities_count == 0 { + return Ok(()); + } + + for _ in 0..entities_count { + // The entity might have already been deleted with the last diff, + // but the server might not yet have received confirmation from the + // client and could include the deletion in the latest diff. + let server_entity = bincode::deserialize_from(&mut *cursor)?; + if let Some(client_entity) = entity_map.remove_by_server(server_entity) { + world.entity_mut(client_entity).despawn_recursive(); + } + } + + Ok(()) +} + /// Last received tick from server. /// /// Exists only on clients, sent to the server. diff --git a/src/lib.rs b/src/lib.rs index 60965cc4..6138463d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,7 +100,7 @@ app.replicate_with::(serialize_transform, deserialize_transform); /// Serializes only translation. fn serialize_transform( component: Ptr, - cursor: &mut Cursor<&mut Vec>, + cursor: &mut Cursor>, ) -> Result<(), bincode::Error> { // SAFETY: Function called for registered `ComponentId`. let transform: &Transform = unsafe { component.deref() }; @@ -171,7 +171,7 @@ fn player_init_system( #[derive(Component, Deserialize, Serialize)] struct Player; -# fn serialize_transform(_: Ptr, _: &mut Cursor<&mut Vec>) -> Result<(), bincode::Error> { unimplemented!() } +# fn serialize_transform(_: Ptr, _: &mut Cursor>) -> Result<(), bincode::Error> { unimplemented!() } # fn deserialize_transform(_: &mut EntityMut, _: &mut NetworkEntityMap, _: &mut Cursor) -> Result<(), bincode::Error> { unimplemented!() } ``` diff --git a/src/replicon_core.rs b/src/replicon_core.rs index 4ccf8a2c..7ee1d5d8 100644 --- a/src/replicon_core.rs +++ b/src/replicon_core.rs @@ -8,9 +8,8 @@ use bevy::{ }; use bevy_renet::renet::{Bytes, ChannelConfig, SendType}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use strum::EnumDiscriminants; -use crate::client::{ClientMapper, LastTick, NetworkEntityMap}; +use crate::client::{ClientMapper, NetworkEntityMap}; pub struct RepliconCorePlugin; @@ -191,7 +190,7 @@ impl FromWorld for ReplicationRules { } /// Signature of serialization function stored in [`ReplicationInfo`]. -pub type SerializeFn = fn(Ptr, &mut Cursor<&mut Vec>) -> Result<(), bincode::Error>; +pub type SerializeFn = fn(Ptr, &mut Cursor>) -> Result<(), bincode::Error>; /// Signature of deserialization function stored in [`ReplicationInfo`]. pub type DeserializeFn = @@ -230,7 +229,7 @@ pub struct ReplicationId(usize); /// Default serialization function. fn serialize_component( component: Ptr, - cursor: &mut Cursor<&mut Vec>, + cursor: &mut Cursor>, ) -> Result<(), bincode::Error> { // SAFETY: Function called for registered `ComponentId`. let component: &C = unsafe { component.deref() }; @@ -287,130 +286,6 @@ pub trait Mapper { #[derive(Component, Clone, Copy)] pub struct Replication; -/// Changed world data and current tick from server. -/// -/// Sent from server to clients. -pub(super) struct WorldDiff<'a> { - pub(super) tick: NetworkTick, - pub(super) entities: HashMap>>, - pub(super) despawns: Vec, -} - -impl WorldDiff<'_> { - /// Creates a new [`WorldDiff`] with a tick and empty entities. - pub(super) fn new(tick: NetworkTick) -> Self { - Self { - tick, - entities: Default::default(), - despawns: Default::default(), - } - } - - /// Serializes itself into a buffer. - /// - /// We use custom implementation because serde impls require to use generics that can't be stored in [`ReplicationInfo`]. - pub(super) fn serialize( - &self, - replication_rules: &ReplicationRules, - message: &mut Vec, - ) -> Result<(), bincode::Error> { - let mut cursor = Cursor::new(message); - - bincode::serialize_into(&mut cursor, &self.tick)?; - - bincode::serialize_into(&mut cursor, &self.entities.len())?; - for (entity, components) in &self.entities { - bincode::serialize_into(&mut cursor, entity)?; - bincode::serialize_into(&mut cursor, &components.len())?; - for &component_diff in components { - bincode::serialize_into(&mut cursor, &ComponentDiffKind::from(component_diff))?; - match component_diff { - ComponentDiff::Changed((replication_id, ptr)) => { - bincode::serialize_into(&mut cursor, &replication_id)?; - let replication_info = replication_rules.get_info(replication_id); - (replication_info.serialize)(ptr, &mut cursor)?; - } - ComponentDiff::Removed(replication_id) => { - bincode::serialize_into(&mut cursor, &replication_id)?; - } - } - } - } - - bincode::serialize_into(&mut cursor, &self.despawns)?; - - Ok(()) - } - - /// Deserializes itself from bytes directly into the world by applying all changes. - /// - /// Does nothing if world already received a more recent diff. - /// See also [`LastTick`]. - pub(super) fn deserialize_to_world( - world: &mut World, - message: Bytes, - ) -> Result<(), bincode::Error> { - let mut cursor = Cursor::new(message); - - let tick = bincode::deserialize_from(&mut cursor)?; - let mut last_tick = world.resource_mut::(); - if last_tick.0 >= tick { - return Ok(()); - } - last_tick.0 = tick; - - world.resource_scope(|world, replication_rules: Mut| { - world.resource_scope(|world, mut entity_map: Mut| { - let entities_count: usize = bincode::deserialize_from(&mut cursor)?; - for _ in 0..entities_count { - let entity = bincode::deserialize_from(&mut cursor)?; - let mut entity = entity_map.get_by_server_or_spawn(world, entity); - let components_count: usize = bincode::deserialize_from(&mut cursor)?; - for _ in 0..components_count { - let diff_kind = bincode::deserialize_from(&mut cursor)?; - let replication_id = bincode::deserialize_from(&mut cursor)?; - let replication_info = replication_rules.get_info(replication_id); - match diff_kind { - ComponentDiffKind::Changed => { - (replication_info.deserialize)( - &mut entity, - &mut entity_map, - &mut cursor, - )?; - } - ComponentDiffKind::Removed => { - (replication_info.remove)(&mut entity); - } - } - } - } - - let despawns: Vec = bincode::deserialize_from(&mut cursor)?; - for server_entity in despawns { - // The entity might have already been deleted with the last diff, - // but the server might not yet have received confirmation from the - // client and could include the deletion in the latest diff. - if let Some(client_entity) = entity_map.remove_by_server(server_entity) { - world.entity_mut(client_entity).despawn_recursive(); - } - } - - Ok(()) - }) - }) - } -} - -/// Type of component change. -#[derive(EnumDiscriminants, Clone, Copy)] -#[strum_discriminants(name(ComponentDiffKind), derive(Deserialize, Serialize))] -pub(super) enum ComponentDiff<'a> { - /// Indicates that a component was added or changed, contains its ID and pointer. - Changed((ReplicationId, Ptr<'a>)), - /// Indicates that a component was removed, contains its ID. - Removed(ReplicationId), -} - /// Corresponds to the number of server update. /// /// See also [`crate::server::TickPolicy`]. diff --git a/src/server.rs b/src/server.rs index 4c8eb038..f5bca3db 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,28 +1,28 @@ pub(super) mod despawn_tracker; pub(super) mod removal_tracker; -use std::time::Duration; +use std::{io::Cursor, mem, time::Duration}; use bevy::{ ecs::{ - archetype::{Archetype, ArchetypeId}, - component::{ComponentId, StorageType, Tick}, - storage::{SparseSets, Table}, + archetype::ArchetypeId, + component::{StorageType, Tick}, system::SystemChangeTick, }, prelude::*, + ptr::Ptr, time::common_conditions::on_timer, utils::HashMap, }; use bevy_renet::{ - renet::{RenetClient, RenetServer, ServerEvent}, + renet::{Bytes, RenetClient, RenetServer, ServerEvent}, transport::NetcodeServerPlugin, RenetServerPlugin, }; use derive_more::Constructor; use crate::replicon_core::{ - ComponentDiff, NetworkTick, ReplicationId, ReplicationRules, WorldDiff, REPLICATION_CHANNEL_ID, + NetworkTick, ReplicationId, ReplicationInfo, ReplicationRules, REPLICATION_CHANNEL_ID, }; use despawn_tracker::{DespawnTracker, DespawnTrackerPlugin}; use removal_tracker::{RemovalTracker, RemovalTrackerPlugin}; @@ -69,6 +69,7 @@ impl Plugin for ServerPlugin { PostUpdate, ( Self::diffs_sending_system + .pipe(unwrap) .in_set(ServerSet::Send) .run_if(resource_exists::()), Self::reset_system.run_if(resource_removed::()), @@ -121,50 +122,38 @@ impl ServerPlugin { } fn diffs_sending_system( + mut buffers: Local>, change_tick: SystemChangeTick, mut set: ParamSet<(&World, ResMut, ResMut)>, replication_rules: Res, despawn_tracker: Res, removal_trackers: Query<(Entity, &RemovalTracker)>, - ) { + ) -> Result<(), bincode::Error> { let mut server_ticks = set.p2(); server_ticks.increment(change_tick.this_run()); - let mut client_diffs = Vec::with_capacity(server_ticks.acked_ticks.len()); - for (&client_id, &tick) in &server_ticks.acked_ticks { - let system_tick = *server_ticks - .system_ticks - .get(&tick) - .unwrap_or(&Tick::new(0)); - client_diffs.push(ClientDiff { - client_id, - system_tick, - world_diff: WorldDiff::new(server_ticks.current_tick), - }); - } + let buffers = prepare_buffers(&mut buffers, &server_ticks)?; collect_changes( - &mut client_diffs, - change_tick.this_run(), + buffers, set.p0(), + change_tick.this_run(), &replication_rules, - ); - collect_removals(&mut client_diffs, change_tick.this_run(), &removal_trackers); - collect_despawns(&mut client_diffs, change_tick.this_run(), &despawn_tracker); - - let mut messages = Vec::with_capacity(client_diffs.len()); - for client_diff in client_diffs { - let mut message = Vec::new(); - client_diff - .world_diff - .serialize(&replication_rules, &mut message) - .expect("world diff should be serializable"); - messages.push((client_diff.client_id, message)); + )?; + collect_removals(buffers, &removal_trackers, change_tick.this_run())?; + collect_despawns(buffers, &despawn_tracker, change_tick.this_run())?; + + for buffer in buffers { + debug_assert_eq!(buffer.array_len, 0); + debug_assert_eq!(buffer.entity_map_len, 0); + + set.p1().send_message( + buffer.client_id, + REPLICATION_CHANNEL_ID, + Bytes::copy_from_slice(buffer.message.get_ref()), + ); } - for (client_id, message) in messages { - set.p1() - .send_message(client_id, REPLICATION_CHANNEL_ID, message); - } + Ok(()) } fn reset_system(mut server_ticks: ResMut) { @@ -173,12 +162,48 @@ impl ServerPlugin { } } -fn collect_changes<'a>( - client_diffs: &mut [ClientDiff<'a>], +/// Initializes buffer for each client and returns it as mutable slice. +/// +/// Reuses already allocated buffers. +/// Creates new buffers if number of clients is bigger then the number of allocated buffers. +/// If there are more buffers than the number of clients, then the extra buffers remain untouched +/// and the returned slice will not include them. +fn prepare_buffers<'a>( + buffers: &'a mut Vec, + server_ticks: &ServerTicks, +) -> Result<&'a mut [ReplicationBuffer], bincode::Error> { + buffers.reserve(server_ticks.acked_ticks.len()); + for (index, (&client_id, &tick)) in server_ticks.acked_ticks.iter().enumerate() { + let system_tick = *server_ticks + .system_ticks + .get(&tick) + .unwrap_or(&Tick::new(0)); + + if let Some(buffer) = buffers.get_mut(index) { + buffer.reset(client_id, system_tick, server_ticks.current_tick)?; + } else { + buffers.push(ReplicationBuffer::new( + client_id, + system_tick, + server_ticks.current_tick, + )?); + } + } + + Ok(&mut buffers[..server_ticks.acked_ticks.len()]) +} + +/// Collect component changes into buffers based on last acknowledged tick. +fn collect_changes( + buffers: &mut [ReplicationBuffer], + world: &World, system_tick: Tick, - world: &'a World, replication_rules: &ReplicationRules, -) { +) -> Result<(), bincode::Error> { + for buffer in &mut *buffers { + buffer.start_array(); + } + for archetype in world .archetypes() .iter() @@ -192,143 +217,132 @@ fn collect_changes<'a>( .get(archetype.table_id()) .expect("archetype should be valid"); - for component_id in archetype.components() { - let Some(replication_id) = replication_rules.get_id(component_id) else { - continue; - }; - let replication_info = replication_rules.get_info(replication_id); - if archetype.contains(replication_info.ignored_id) { - continue; + for archetype_entity in archetype.entities() { + for buffer in &mut *buffers { + buffer.start_entity_map(); } - let storage_type = archetype - .get_storage_type(component_id) - .unwrap_or_else(|| panic!("{component_id:?} be in archetype")); - - match storage_type { - StorageType::Table => { - collect_table_components( - client_diffs, - system_tick, - table, - archetype, - replication_id, - component_id, - ); + for component_id in archetype.components() { + let Some(replication_id) = replication_rules.get_id(component_id) else { + continue; + }; + let replication_info = replication_rules.get_info(replication_id); + if archetype.contains(replication_info.ignored_id) { + continue; } - StorageType::SparseSet => { - collect_sparse_set_components( - client_diffs, - system_tick, - &world.storages().sparse_sets, - archetype, - replication_id, - component_id, - ); + + let storage_type = archetype + .get_storage_type(component_id) + .unwrap_or_else(|| panic!("{component_id:?} be in archetype")); + + match storage_type { + StorageType::Table => { + let column = table + .get_column(component_id) + .unwrap_or_else(|| panic!("{component_id:?} should belong to table")); + + // SAFETY: the table row obtained from the world state. + let ticks = + unsafe { column.get_ticks_unchecked(archetype_entity.table_row()) }; + // SAFETY: component obtained from the archetype. + let component = + unsafe { column.get_data_unchecked(archetype_entity.table_row()) }; + + for buffer in &mut *buffers { + if ticks.is_changed(buffer.system_tick, system_tick) { + buffer.write_change(replication_info, replication_id, component)?; + } + } + } + StorageType::SparseSet => { + let sparse_set = world + .storages() + .sparse_sets + .get(component_id) + .unwrap_or_else(|| panic!("{component_id:?} should be in sparse set")); + + let entity = archetype_entity.entity(); + let ticks = sparse_set + .get_ticks(entity) + .unwrap_or_else(|| panic!("{entity:?} should have {component_id:?}")); + let component = sparse_set + .get(entity) + .unwrap_or_else(|| panic!("{entity:?} should have {component_id:?}")); + + for buffer in &mut *buffers { + if ticks.is_changed(buffer.system_tick, system_tick) { + buffer.write_change(replication_info, replication_id, component)?; + } + } + } } } - } - } -} -fn collect_table_components<'a>( - client_diffs: &mut [ClientDiff<'a>], - system_tick: Tick, - table: &'a Table, - archetype: &Archetype, - replication_id: ReplicationId, - component_id: ComponentId, -) { - let column = table - .get_column(component_id) - .unwrap_or_else(|| panic!("{component_id:?} should belong to table")); - - for archetype_entity in archetype.entities() { - // SAFETY: the table row obtained from the world state. - let ticks = unsafe { column.get_ticks_unchecked(archetype_entity.table_row()) }; - // SAFETY: component obtained from the archetype. - let component = unsafe { column.get_data_unchecked(archetype_entity.table_row()) }; - - for client_diff in &mut *client_diffs { - if ticks.is_changed(client_diff.system_tick, system_tick) { - client_diff - .world_diff - .entities - .entry(archetype_entity.entity()) - .or_default() - .push(ComponentDiff::Changed((replication_id, component))); + for buffer in &mut *buffers { + buffer.end_entity_map(archetype_entity.entity())?; } } } -} -fn collect_sparse_set_components<'a>( - client_diffs: &mut [ClientDiff<'a>], - system_tick: Tick, - sparse_sets: &'a SparseSets, - archetype: &Archetype, - replication_id: ReplicationId, - component_id: ComponentId, -) { - let sparse_set = sparse_sets - .get(component_id) - .unwrap_or_else(|| panic!("{component_id:?} should belong to sparse set")); - - for archetype_entity in archetype.entities() { - let entity = archetype_entity.entity(); - let ticks = sparse_set - .get_ticks(entity) - .unwrap_or_else(|| panic!("{entity:?} should have {component_id:?}")); - let component = sparse_set - .get(entity) - .unwrap_or_else(|| panic!("{entity:?} should have {component_id:?}")); - - for client_diff in &mut *client_diffs { - if ticks.is_changed(client_diff.system_tick, system_tick) { - client_diff - .world_diff - .entities - .entry(entity) - .or_default() - .push(ComponentDiff::Changed((replication_id, component))); - } - } + for buffer in &mut *buffers { + buffer.end_array()?; } + + Ok(()) } +/// Collect component removals into buffers based on last acknowledged tick. fn collect_removals( - client_diffs: &mut [ClientDiff], - system_tick: Tick, + buffers: &mut [ReplicationBuffer], removal_trackers: &Query<(Entity, &RemovalTracker)>, -) { + system_tick: Tick, +) -> Result<(), bincode::Error> { + for buffer in &mut *buffers { + buffer.start_array(); + } + for (entity, removal_tracker) in removal_trackers { - for client_diff in &mut *client_diffs { + for buffer in &mut *buffers { + buffer.start_entity_map(); for (&replication_id, &tick) in &removal_tracker.0 { - if tick.is_newer_than(client_diff.system_tick, system_tick) { - client_diff - .world_diff - .entities - .entry(entity) - .or_default() - .push(ComponentDiff::Removed(replication_id)); + if tick.is_newer_than(buffer.system_tick, system_tick) { + buffer.write_removal(replication_id)?; } } + buffer.end_entity_map(entity)?; } } + + for buffer in &mut *buffers { + buffer.end_array()?; + } + + Ok(()) } +/// Collect entity despawns into buffers based on last acknowledged tick. fn collect_despawns( - client_diffs: &mut [ClientDiff], - system_tick: Tick, + buffers: &mut [ReplicationBuffer], despawn_tracker: &DespawnTracker, -) { + system_tick: Tick, +) -> Result<(), bincode::Error> { + for buffer in &mut *buffers { + buffer.start_array(); + } + for &(entity, tick) in &despawn_tracker.despawns { - for client_diff in &mut *client_diffs { - if tick.is_newer_than(client_diff.system_tick, system_tick) { - client_diff.world_diff.despawns.push(entity); + for buffer in &mut *buffers { + if tick.is_newer_than(buffer.system_tick, system_tick) { + buffer.write_despawn(entity)?; } } } + + for buffer in &mut *buffers { + buffer.end_array()?; + } + + Ok(()) } /// Condition that returns `true` for server or in singleplayer and `false` for client. @@ -400,9 +414,180 @@ impl ServerTicks { } } -/// Intermediate struct that contains necessary information to create and send [`WorldDiff`]. -struct ClientDiff<'a> { +/// A reusable buffer for a client's replicated entities. +/// +/// Up to [`u16::MAX`] entities may be replicated. +struct ReplicationBuffer { + /// ID of a client for which this buffer is written. client_id: u64, + + /// Last system tick acknowledged by the client. + /// + /// Used for changes preparation. system_tick: Tick, - world_diff: WorldDiff<'a>, + + /// Buffer with serialized data. + message: Cursor>, + + /// Position of the array from last call of [`Self::start_array`]. + array_pos: u64, + + /// Length of the array that updated automatically after writing data. + array_len: u16, + + /// Position of the entity map from last call of [`Self::start_entity_map`]. + entity_map_pos: u64, + + /// Length of the map that updated automatically after writing data. + entity_map_len: u8, +} + +impl ReplicationBuffer { + /// Creates a new buffer with assigned client ID and acknowledged system tick + /// and writes current server tick into buffer data. + fn new( + client_id: u64, + system_tick: Tick, + current_tick: NetworkTick, + ) -> Result { + let mut message = Default::default(); + bincode::serialize_into(&mut message, ¤t_tick)?; + Ok(Self { + client_id, + system_tick, + message, + array_pos: Default::default(), + array_len: Default::default(), + entity_map_pos: Default::default(), + entity_map_len: Default::default(), + }) + } + + /// Reassigns current client ID and acknowledged system tick to the buffer + /// and replaces buffer data with current server tick. + /// + /// Keeps allocated capacity of the buffer data. + fn reset( + &mut self, + client_id: u64, + system_tick: Tick, + current_tick: NetworkTick, + ) -> Result<(), bincode::Error> { + self.client_id = client_id; + self.system_tick = system_tick; + self.message.set_position(0); + self.message.get_mut().clear(); + bincode::serialize_into(&mut self.message, ¤t_tick)?; + + Ok(()) + } + + /// Starts writing array by remembering its position to write length after. + /// + /// Length will be increased automatically after writing data. + /// See [`Self::end_array`]. + fn start_array(&mut self) { + debug_assert_eq!(self.array_len, 0); + + self.array_pos = self.message.position(); + self.message + .set_position(self.array_pos + mem::size_of_val(&self.array_len) as u64); + } + + /// Ends writing array by writing its length into the last remembered position. + /// + /// See also [`Self::start_array`]. + fn end_array(&mut self) -> Result<(), bincode::Error> { + if self.array_len != 0 { + let previous_pos = self.message.position(); + self.message.set_position(self.array_pos); + + bincode::serialize_into(&mut self.message, &self.array_len)?; + + self.message.set_position(previous_pos); + self.array_len = 0; + } else { + self.message.set_position(self.array_pos); + bincode::serialize_into(&mut self.message, &self.array_len)?; + } + + Ok(()) + } + + /// Starts writing array by remembering its position to write length and [`Entity`] after. + /// + /// Length will be increased automatically after writing data. + /// See [`Self::end_entity_map`]. + fn start_entity_map(&mut self) { + debug_assert_eq!(self.entity_map_len, 0); + + let size = mem::size_of::() + mem::size_of_val(&self.entity_map_len); + self.entity_map_pos = self.message.position(); + self.message.set_position(self.entity_map_pos + size as u64); + } + + /// Ends writing array by writing its length and associated [`Entity`] into the last remembered position. + /// + /// If map is empty, only map length will be written. + /// See also [`Self::start_array`]. + fn end_entity_map(&mut self, entity: Entity) -> Result<(), bincode::Error> { + if self.entity_map_len != 0 { + let previous_pos = self.message.position(); + self.message.set_position(self.entity_map_pos); + + bincode::serialize_into(&mut self.message, &entity)?; + bincode::serialize_into(&mut self.message, &self.entity_map_len)?; + + self.message.set_position(previous_pos); + self.entity_map_len = 0; + self.array_len = self + .array_len + .checked_add(1) + .ok_or(bincode::ErrorKind::SizeLimit)?; + } else { + self.message.set_position(self.entity_map_pos); + bincode::serialize_into(&mut self.message, &self.entity_map_len)?; + } + + Ok(()) + } + + /// Serializes [`ReplicationId`] and component into the buffer data. + /// + /// Increases map length by 1. + fn write_change( + &mut self, + replication_info: &ReplicationInfo, + replication_id: ReplicationId, + ptr: Ptr, + ) -> Result<(), bincode::Error> { + bincode::serialize_into(&mut self.message, &replication_id)?; + (replication_info.serialize)(ptr, &mut self.message)?; + self.entity_map_len += 1; + + Ok(()) + } + + /// Serializes [`ReplicationId`] of the removed component into the buffer data. + /// + /// Increases map length by 1. + fn write_removal(&mut self, replication_id: ReplicationId) -> Result<(), bincode::Error> { + bincode::serialize_into(&mut self.message, &replication_id)?; + self.entity_map_len += 1; + + Ok(()) + } + + /// Serializes despawned [`Entity`]. + /// + /// Increases array length by 1. + fn write_despawn(&mut self, entity: Entity) -> Result<(), bincode::Error> { + bincode::serialize_into(&mut self.message, &entity)?; + self.array_len = self + .array_len + .checked_add(1) + .ok_or(bincode::ErrorKind::SizeLimit)?; + + Ok(()) + } }