diff --git a/src/server/replication_messages.rs b/src/server/replication_messages.rs index 80b54cce..468ecadf 100644 --- a/src/server/replication_messages.rs +++ b/src/server/replication_messages.rs @@ -1,23 +1,22 @@ +pub(super) mod init_message; +pub(super) mod update_message; + use std::{ io::{Cursor, Write}, mem, time::Duration, }; -use bevy::{ecs::component::Tick, prelude::*, ptr::Ptr}; -use bincode::{DefaultOptions, Options}; -use bytes::Bytes; +use bevy::{ecs::component::Tick, prelude::*}; use varint_rs::VarintWriter; -use super::client_entity_map::ClientMapping; use crate::core::{ - channels::ReplicationChannel, - ctx::SerializeCtx, replicated_clients::{ClientBuffers, ReplicatedClient, ReplicatedClients}, - replication_registry::{component_fns::ComponentFns, rule_fns::UntypedRuleFns, FnsId}, replicon_server::RepliconServer, replicon_tick::RepliconTick, }; +use init_message::InitMessage; +use update_message::UpdateMessage; /// Accumulates replication messages and sends them to clients. /// @@ -96,539 +95,6 @@ impl ReplicationMessages { } } -/// A reusable message with replicated data. -/// -/// Contains tick and mappings, insertions, removals and despawns that -/// happened on this tick. -/// Sent over [`ReplicationChannel::Init`] channel. -/// -/// See also [Limits](../index.html#limits) -pub(super) struct InitMessage { - /// Serialized data. - cursor: Cursor>, - - /// Length of the array that updated automatically after writing data. - array_len: u16, - - /// Position of the array from last call of [`Self::start_array`]. - array_pos: u64, - - /// The number of empty arrays at the end. - trailing_empty_arrays: usize, - - /// Entity from last call of [`Self::start_entity_data`]. - data_entity: Entity, - - /// Size in bytes of the component data stored for the currently-being-written entity. - entity_data_size: u16, - - /// Position of entity from last call of [`Self::start_entity_data`]. - entity_data_pos: u64, - - /// Position of entity data length from last call of [`Self::write_data_entity`]. - entity_data_size_pos: u64, -} - -impl InitMessage { - /// Clears the message. - /// - /// Keeps allocated capacity for reuse. - fn reset(&mut self) { - self.cursor.set_position(0); - self.trailing_empty_arrays = 0; - } - - /// Returns size in bytes of the current entity data. - /// - /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. - pub(super) fn entity_data_size(&self) -> u16 { - self.entity_data_size - } - - /// Starts writing array by remembering its position to write length after. - /// - /// Arrays can contain entity data or despawns inside. - /// See also [`Self::end_array`], [`Self::write_client_mapping`], [`Self::write_entity`] and [`Self::start_entity_data`]. - pub(super) fn start_array(&mut self) { - debug_assert_eq!(self.array_len, 0); - - self.array_pos = self.cursor.position(); - self.cursor - .set_position(self.array_pos + mem::size_of_val(&self.array_len) as u64); - } - - /// Ends writing array by writing its length into the last remembered position. - /// - /// See also [`Self::start_array`]. - pub(super) fn end_array(&mut self) -> bincode::Result<()> { - if self.array_len != 0 { - let previous_pos = self.cursor.position(); - self.cursor.set_position(self.array_pos); - - bincode::serialize_into(&mut self.cursor, &self.array_len)?; - - self.cursor.set_position(previous_pos); - self.array_len = 0; - self.trailing_empty_arrays = 0; - } else { - self.trailing_empty_arrays += 1; - self.cursor.set_position(self.array_pos); - bincode::serialize_into(&mut self.cursor, &self.array_len)?; - } - - Ok(()) - } - - /// Serializes entity to entity mapping as an array element. - /// - /// Should be called only inside an array and increases its length by 1. - /// See also [`Self::start_array`]. - pub(super) fn write_client_mapping(&mut self, mapping: &ClientMapping) -> bincode::Result<()> { - serialize_entity(&mut self.cursor, mapping.server_entity)?; - serialize_entity(&mut self.cursor, mapping.client_entity)?; - self.array_len = self - .array_len - .checked_add(1) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } - - /// Serializes entity as an array element. - /// - /// Reuses previously shared bytes if they exist, or updates them. - /// Should be called only inside an array and increases its length by 1. - /// See also [`Self::start_array`]. - pub(super) fn write_entity<'a>( - &'a mut self, - shared_bytes: &mut Option<&'a [u8]>, - entity: Entity, - ) -> bincode::Result<()> { - write_with(shared_bytes, &mut self.cursor, |cursor| { - serialize_entity(cursor, entity) - })?; - - self.array_len = self - .array_len - .checked_add(1) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } - - /// Starts writing entity and its data as an array element. - /// - /// Should be called only inside an array and increases its length by 1. - /// Data can contain components with their IDs or IDs only. - /// Entity will be written lazily after first data write. - /// See also [`Self::end_entity_data`] and [`Self::write_component`]. - pub(super) fn start_entity_data(&mut self, entity: Entity) { - debug_assert_eq!(self.entity_data_size, 0); - - self.data_entity = entity; - self.entity_data_pos = self.cursor.position(); - } - - /// Writes entity for the current data and remembers the position after it to write length later. - /// - /// Should be called only after first data write. - fn write_data_entity(&mut self) -> bincode::Result<()> { - serialize_entity(&mut self.cursor, self.data_entity)?; - self.entity_data_size_pos = self.cursor.position(); - self.cursor.set_position( - self.entity_data_size_pos + mem::size_of_val(&self.entity_data_size) as u64, - ); - - Ok(()) - } - - /// Ends writing entity data by writing its length into the last remembered position. - /// - /// If the entity data is empty, nothing will be written unless `save_empty` is set to true. - /// Should be called only inside an array and increases its length by 1. - /// See also [`Self::start_array`], [`Self::write_component`] and - /// [`Self::write_component_id`]. - pub(super) fn end_entity_data(&mut self, save_empty: bool) -> bincode::Result<()> { - if self.entity_data_size == 0 && !save_empty { - self.cursor.set_position(self.entity_data_pos); - return Ok(()); - } - - if self.entity_data_size == 0 { - self.write_data_entity()?; - } - - let previous_pos = self.cursor.position(); - self.cursor.set_position(self.entity_data_size_pos); - - bincode::serialize_into(&mut self.cursor, &self.entity_data_size)?; - - self.cursor.set_position(previous_pos); - self.entity_data_size = 0; - self.array_len = self - .array_len - .checked_add(1) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } - - /// Serializes component and its replication functions ID as an element of entity data. - /// - /// Reuses previously shared bytes if they exist, or updates them. - /// Should be called only inside an entity data and increases its size. - /// See also [`Self::start_entity_data`]. - pub(super) fn write_component<'a>( - &'a mut self, - shared_bytes: &mut Option<&'a [u8]>, - rule_fns: &UntypedRuleFns, - component_fns: &ComponentFns, - ctx: &SerializeCtx, - fns_id: FnsId, - ptr: Ptr, - ) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.write_data_entity()?; - } - - let size = write_with(shared_bytes, &mut self.cursor, |cursor| { - DefaultOptions::new().serialize_into(&mut *cursor, &fns_id)?; - // SAFETY: `component_fns`, `ptr` and `rule_fns` were created for the same component type. - unsafe { component_fns.serialize(ctx, rule_fns, ptr, cursor) } - })?; - - self.entity_data_size = self - .entity_data_size - .checked_add(size) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } - - /// Serializes replication functions ID as an element of entity data. - /// - /// Should be called only inside an entity data and increases its size. - /// See also [`Self::start_entity_data`]. - pub(super) fn write_fns_id(&mut self, fns_id: FnsId) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.write_data_entity()?; - } - - let previous_pos = self.cursor.position(); - DefaultOptions::new().serialize_into(&mut self.cursor, &fns_id)?; - - let id_size = self.cursor.position() - previous_pos; - self.entity_data_size = self - .entity_data_size - .checked_add(id_size as u16) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } - - /// Removes entity data elements from update message and copies it. - /// - /// Ends entity data for the update message. - /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. - pub(super) fn take_entity_data( - &mut self, - update_message: &mut UpdateMessage, - ) -> bincode::Result<()> { - if update_message.entity_data_size != 0 { - if self.entity_data_size == 0 { - self.write_data_entity()?; - } - - let slice = update_message.as_slice(); - let offset = update_message.entity_data_size_pos as usize - + mem::size_of_val(&update_message.entity_data_size); - self.cursor.write_all(&slice[offset..]).unwrap(); - - self.entity_data_size = self - .entity_data_size - .checked_add(update_message.entity_data_size) - .ok_or(bincode::ErrorKind::SizeLimit)?; - update_message.entity_data_size = 0; - } - - update_message - .cursor - .set_position(update_message.entity_data_pos); - - Ok(()) - } - - /// Returns the serialized data, excluding trailing empty arrays, as a byte array. - fn as_slice(&self) -> &[u8] { - let slice = self.cursor.get_ref(); - let position = self.cursor.position() as usize; - let extra_len = self.trailing_empty_arrays * mem::size_of_val(&self.array_len); - &slice[..position - extra_len] - } - - /// Sends the message, excluding trailing empty arrays, to the specified client. - /// - /// Updates change tick for the client if there are data to send. - /// Does nothing if there is no data to send. - fn send( - &self, - server: &mut RepliconServer, - client: &mut ReplicatedClient, - server_tick: RepliconTick, - ) -> bincode::Result<()> { - debug_assert_eq!(self.array_len, 0); - debug_assert_eq!(self.entity_data_size, 0); - - let slice = self.as_slice(); - if slice.is_empty() { - trace!("no init data to send for {:?}", client.id()); - return Ok(()); - } - - client.set_init_tick(server_tick); - - let mut header = [0; mem::size_of::()]; - bincode::serialize_into(&mut header[..], &server_tick)?; - - trace!("sending init message to {:?}", client.id()); - server.send( - client.id(), - ReplicationChannel::Init, - Bytes::from([&header, slice].concat()), - ); - - Ok(()) - } -} - -impl Default for InitMessage { - fn default() -> Self { - Self { - cursor: Default::default(), - array_len: Default::default(), - array_pos: Default::default(), - trailing_empty_arrays: Default::default(), - entity_data_size: Default::default(), - entity_data_pos: Default::default(), - entity_data_size_pos: Default::default(), - data_entity: Entity::PLACEHOLDER, - } - } -} - -/// A reusable message with replicated component updates. -/// -/// Contains change tick, current tick and component updates since the last acknowledged tick for each entity. -/// Cannot be applied on the client until the init message matching this update message's change tick -/// has been applied to the client world. -/// The message will be manually split into packets up to max size, and each packet will be applied -/// independently on the client. -/// Message splits only happen per-entity to avoid weird behavior from partial entity updates. -/// Sent over the [`ReplicationChannel::Update`] channel. -/// -/// See also [Limits](../index.html#limits) -pub(super) struct UpdateMessage { - /// Serialized data. - cursor: Cursor>, - - /// Entities and their sizes in the message with data. - entities: Vec<(Entity, usize)>, - - /// Entity from last call of [`Self::start_entity_data`]. - data_entity: Entity, - - /// Size in bytes of the component data stored for the currently-being-written entity. - entity_data_size: u16, - - /// Position of entity from last call of [`Self::start_entity_data`]. - entity_data_pos: u64, - - /// Position of entity data length from last call of [`Self::write_data_entity`]. - entity_data_size_pos: u64, -} - -impl UpdateMessage { - /// Clears the message. - /// - /// Keeps allocated capacity for reuse. - fn reset(&mut self) { - self.cursor.set_position(0); - self.entities.clear(); - } - - /// Starts writing entity and its data. - /// - /// Data can contain components with their IDs. - /// Entity will be written lazily after first data write. - /// See also [`Self::end_entity_data`] and [`Self::write_component`]. - pub(super) fn start_entity_data(&mut self, entity: Entity) { - debug_assert_eq!(self.entity_data_size, 0); - - self.data_entity = entity; - self.entity_data_pos = self.cursor.position(); - } - - /// Writes entity for the current data and remembers the position after it to write length later. - /// - /// Should be called only after first data write. - fn write_data_entity(&mut self) -> bincode::Result<()> { - serialize_entity(&mut self.cursor, self.data_entity)?; - self.entity_data_size_pos = self.cursor.position(); - self.cursor.set_position( - self.entity_data_size_pos + mem::size_of_val(&self.entity_data_size) as u64, - ); - - Ok(()) - } - - /// Ends writing entity data by writing its length into the last remembered position. - /// - /// If the entity data is empty, nothing will be written and the cursor will reset. - /// See also [`Self::start_array`] and [`Self::write_component`]. - pub(super) fn end_entity_data(&mut self) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.cursor.set_position(self.entity_data_pos); - return Ok(()); - } - - let previous_pos = self.cursor.position(); - self.cursor.set_position(self.entity_data_size_pos); - - bincode::serialize_into(&mut self.cursor, &self.entity_data_size)?; - - self.cursor.set_position(previous_pos); - - let data_size = self.cursor.position() - self.entity_data_pos; - self.entities.push((self.data_entity, data_size as usize)); - - self.entity_data_size = 0; - - Ok(()) - } - - /// Serializes component and its replication functions ID as an element of entity data. - /// - /// Reuses previously shared bytes if they exist, or updates them. - /// Should be called only inside an entity data and increases its size. - /// See also [`Self::start_entity_data`]. - pub(super) fn write_component<'a>( - &'a mut self, - shared_bytes: &mut Option<&'a [u8]>, - rule_fns: &UntypedRuleFns, - component_fns: &ComponentFns, - ctx: &SerializeCtx, - fns_id: FnsId, - ptr: Ptr, - ) -> bincode::Result<()> { - if self.entity_data_size == 0 { - self.write_data_entity()?; - } - - let size = write_with(shared_bytes, &mut self.cursor, |cursor| { - DefaultOptions::new().serialize_into(&mut *cursor, &fns_id)?; - // SAFETY: `component_fns`, `ptr` and `rule_fns` were created for the same component type. - unsafe { component_fns.serialize(ctx, rule_fns, ptr, cursor) } - })?; - - self.entity_data_size = self - .entity_data_size - .checked_add(size) - .ok_or(bincode::ErrorKind::SizeLimit)?; - - Ok(()) - } - - /// Returns the serialized data as a byte array. - fn as_slice(&self) -> &[u8] { - let slice = self.cursor.get_ref(); - let position = self.cursor.position() as usize; - &slice[..position] - } - - /// Splits message according to entities inside it and sends it to the specified client. - /// - /// Does nothing if there is no data to send. - fn send( - &mut self, - server: &mut RepliconServer, - client_buffers: &mut ClientBuffers, - client: &mut ReplicatedClient, - server_tick: RepliconTick, - tick: Tick, - timestamp: Duration, - ) -> bincode::Result<()> { - debug_assert_eq!(self.entity_data_size, 0); - - let mut slice = self.as_slice(); - if slice.is_empty() { - trace!("no updates to send for {:?}", client.id()); - return Ok(()); - } - - trace!("sending update message(s) to {:?}", client.id()); - const TICKS_SIZE: usize = 2 * mem::size_of::(); - let mut header = [0; TICKS_SIZE + mem::size_of::()]; - bincode::serialize_into(&mut header[..], &(client.init_tick(), server_tick))?; - - let mut message_size = 0; - let client_id = client.id(); - let (mut update_index, mut entities) = - client.register_update(client_buffers, tick, timestamp); - for &(entity, data_size) in &self.entities { - // Try to pack back first, then try to pack forward. - if message_size == 0 - || can_pack(header.len(), message_size, data_size) - || can_pack(header.len(), data_size, message_size) - { - entities.push(entity); - message_size += data_size; - } else { - let (message, remaining) = slice.split_at(message_size); - slice = remaining; - message_size = data_size; - - bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; - - server.send( - client_id, - ReplicationChannel::Update, - Bytes::from([&header, message].concat()), - ); - - if !slice.is_empty() { - (update_index, entities) = - client.register_update(client_buffers, tick, timestamp); - } - } - } - - if !slice.is_empty() { - bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; - - server.send( - client_id, - ReplicationChannel::Update, - Bytes::from([&header, slice].concat()), - ); - } - - Ok(()) - } -} - -impl Default for UpdateMessage { - fn default() -> Self { - Self { - cursor: Default::default(), - entities: Default::default(), - entity_data_size: Default::default(), - entity_data_pos: Default::default(), - entity_data_size_pos: Default::default(), - data_entity: Entity::PLACEHOLDER, - } - } -} - /// Writes new data into a cursor and returns the serialized size. /// /// Reuses previously shared bytes if they exist, or updates them. @@ -661,13 +127,6 @@ fn write_with<'a>( Ok(size) } -fn can_pack(header_size: usize, base: usize, add: usize) -> bool { - const MAX_PACKET_SIZE: usize = 1200; // TODO: make it configurable by the messaging backend. - - let dangling = (base + header_size) % MAX_PACKET_SIZE; - (dangling > 0) && ((dangling + add) <= MAX_PACKET_SIZE) -} - /// Serializes `entity` by writing its index and generation as separate varints. /// /// The index is first prepended with a bit flag to indicate if the generation @@ -686,21 +145,3 @@ fn serialize_entity(cursor: &mut Cursor>, entity: Entity) -> bincode::Re Ok(()) } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn packing() { - assert!(can_pack(10, 0, 5)); - assert!(can_pack(10, 0, 1190)); - assert!(!can_pack(10, 0, 1191)); - assert!(!can_pack(10, 0, 3000)); - - assert!(can_pack(10, 1189, 1)); - assert!(!can_pack(10, 1190, 0)); - assert!(!can_pack(10, 1190, 1)); - assert!(!can_pack(10, 1190, 3000)); - } -} diff --git a/src/server/replication_messages/init_message.rs b/src/server/replication_messages/init_message.rs new file mode 100644 index 00000000..6ff8eccf --- /dev/null +++ b/src/server/replication_messages/init_message.rs @@ -0,0 +1,341 @@ +use std::{ + io::{Cursor, Write}, + mem, +}; + +use bevy::{prelude::*, ptr::Ptr}; +use bincode::{DefaultOptions, Options}; +use bytes::Bytes; + +use super::update_message::UpdateMessage; +use crate::{ + core::{ + channels::ReplicationChannel, + ctx::SerializeCtx, + replicated_clients::ReplicatedClient, + replication_registry::{component_fns::ComponentFns, rule_fns::UntypedRuleFns, FnsId}, + replicon_server::RepliconServer, + replicon_tick::RepliconTick, + }, + server::client_entity_map::ClientMapping, +}; + +/// A reusable message with replicated data. +/// +/// Contains tick and mappings, insertions, removals and despawns that +/// happened on this tick. +/// Sent over [`ReplicationChannel::Init`] channel. +/// +/// See also [Limits](../index.html#limits) +pub(crate) struct InitMessage { + /// Serialized data. + cursor: Cursor>, + + /// Length of the array that updated automatically after writing data. + array_len: u16, + + /// Position of the array from last call of [`Self::start_array`]. + array_pos: u64, + + /// The number of empty arrays at the end. + trailing_empty_arrays: usize, + + /// Entity from last call of [`Self::start_entity_data`]. + data_entity: Entity, + + /// Size in bytes of the component data stored for the currently-being-written entity. + entity_data_size: u16, + + /// Position of entity from last call of [`Self::start_entity_data`]. + entity_data_pos: u64, + + /// Position of entity data length from last call of [`Self::write_data_entity`]. + entity_data_size_pos: u64, +} + +impl InitMessage { + /// Clears the message. + /// + /// Keeps allocated capacity for reuse. + pub(super) fn reset(&mut self) { + self.cursor.set_position(0); + self.trailing_empty_arrays = 0; + } + + /// Returns size in bytes of the current entity data. + /// + /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. + pub(crate) fn entity_data_size(&self) -> u16 { + self.entity_data_size + } + + /// Starts writing array by remembering its position to write length after. + /// + /// Arrays can contain entity data or despawns inside. + /// See also [`Self::end_array`], [`Self::write_client_mapping`], [`Self::write_entity`] and [`Self::start_entity_data`]. + pub(crate) fn start_array(&mut self) { + debug_assert_eq!(self.array_len, 0); + + self.array_pos = self.cursor.position(); + self.cursor + .set_position(self.array_pos + mem::size_of_val(&self.array_len) as u64); + } + + /// Ends writing array by writing its length into the last remembered position. + /// + /// See also [`Self::start_array`]. + pub(crate) fn end_array(&mut self) -> bincode::Result<()> { + if self.array_len != 0 { + let previous_pos = self.cursor.position(); + self.cursor.set_position(self.array_pos); + + bincode::serialize_into(&mut self.cursor, &self.array_len)?; + + self.cursor.set_position(previous_pos); + self.array_len = 0; + self.trailing_empty_arrays = 0; + } else { + self.trailing_empty_arrays += 1; + self.cursor.set_position(self.array_pos); + bincode::serialize_into(&mut self.cursor, &self.array_len)?; + } + + Ok(()) + } + + /// Serializes entity to entity mapping as an array element. + /// + /// Should be called only inside an array and increases its length by 1. + /// See also [`Self::start_array`]. + pub(crate) fn write_client_mapping(&mut self, mapping: &ClientMapping) -> bincode::Result<()> { + super::serialize_entity(&mut self.cursor, mapping.server_entity)?; + super::serialize_entity(&mut self.cursor, mapping.client_entity)?; + self.array_len = self + .array_len + .checked_add(1) + .ok_or(bincode::ErrorKind::SizeLimit)?; + + Ok(()) + } + + /// Serializes entity as an array element. + /// + /// Reuses previously shared bytes if they exist, or updates them. + /// Should be called only inside an array and increases its length by 1. + /// See also [`Self::start_array`]. + pub(crate) fn write_entity<'a>( + &'a mut self, + shared_bytes: &mut Option<&'a [u8]>, + entity: Entity, + ) -> bincode::Result<()> { + super::write_with(shared_bytes, &mut self.cursor, |cursor| { + super::serialize_entity(cursor, entity) + })?; + + self.array_len = self + .array_len + .checked_add(1) + .ok_or(bincode::ErrorKind::SizeLimit)?; + + Ok(()) + } + + /// Starts writing entity and its data as an array element. + /// + /// Should be called only inside an array and increases its length by 1. + /// Data can contain components with their IDs or IDs only. + /// Entity will be written lazily after first data write. + /// See also [`Self::end_entity_data`] and [`Self::write_component`]. + pub(crate) fn start_entity_data(&mut self, entity: Entity) { + debug_assert_eq!(self.entity_data_size, 0); + + self.data_entity = entity; + self.entity_data_pos = self.cursor.position(); + } + + /// Writes entity for the current data and remembers the position after it to write length later. + /// + /// Should be called only after first data write. + fn write_data_entity(&mut self) -> bincode::Result<()> { + super::serialize_entity(&mut self.cursor, self.data_entity)?; + self.entity_data_size_pos = self.cursor.position(); + self.cursor.set_position( + self.entity_data_size_pos + mem::size_of_val(&self.entity_data_size) as u64, + ); + + Ok(()) + } + + /// Ends writing entity data by writing its length into the last remembered position. + /// + /// If the entity data is empty, nothing will be written unless `save_empty` is set to true. + /// Should be called only inside an array and increases its length by 1. + /// See also [`Self::start_array`], [`Self::write_component`] and + /// [`Self::write_component_id`]. + pub(crate) fn end_entity_data(&mut self, save_empty: bool) -> bincode::Result<()> { + if self.entity_data_size == 0 && !save_empty { + self.cursor.set_position(self.entity_data_pos); + return Ok(()); + } + + if self.entity_data_size == 0 { + self.write_data_entity()?; + } + + let previous_pos = self.cursor.position(); + self.cursor.set_position(self.entity_data_size_pos); + + bincode::serialize_into(&mut self.cursor, &self.entity_data_size)?; + + self.cursor.set_position(previous_pos); + self.entity_data_size = 0; + self.array_len = self + .array_len + .checked_add(1) + .ok_or(bincode::ErrorKind::SizeLimit)?; + + Ok(()) + } + + /// Serializes component and its replication functions ID as an element of entity data. + /// + /// Reuses previously shared bytes if they exist, or updates them. + /// Should be called only inside an entity data and increases its size. + /// See also [`Self::start_entity_data`]. + pub(crate) fn write_component<'a>( + &'a mut self, + shared_bytes: &mut Option<&'a [u8]>, + rule_fns: &UntypedRuleFns, + component_fns: &ComponentFns, + ctx: &SerializeCtx, + fns_id: FnsId, + ptr: Ptr, + ) -> bincode::Result<()> { + if self.entity_data_size == 0 { + self.write_data_entity()?; + } + + let size = super::write_with(shared_bytes, &mut self.cursor, |cursor| { + DefaultOptions::new().serialize_into(&mut *cursor, &fns_id)?; + // SAFETY: `component_fns`, `ptr` and `rule_fns` were created for the same component type. + unsafe { component_fns.serialize(ctx, rule_fns, ptr, cursor) } + })?; + + self.entity_data_size = self + .entity_data_size + .checked_add(size) + .ok_or(bincode::ErrorKind::SizeLimit)?; + + Ok(()) + } + + /// Serializes replication functions ID as an element of entity data. + /// + /// Should be called only inside an entity data and increases its size. + /// See also [`Self::start_entity_data`]. + pub(crate) fn write_fns_id(&mut self, fns_id: FnsId) -> bincode::Result<()> { + if self.entity_data_size == 0 { + self.write_data_entity()?; + } + + let previous_pos = self.cursor.position(); + DefaultOptions::new().serialize_into(&mut self.cursor, &fns_id)?; + + let id_size = self.cursor.position() - previous_pos; + self.entity_data_size = self + .entity_data_size + .checked_add(id_size as u16) + .ok_or(bincode::ErrorKind::SizeLimit)?; + + Ok(()) + } + + /// Removes entity data elements from update message and copies it. + /// + /// Ends entity data for the update message. + /// See also [`Self::start_entity_data`] and [`Self::end_entity_data`]. + pub(crate) fn take_entity_data( + &mut self, + update_message: &mut UpdateMessage, + ) -> bincode::Result<()> { + if update_message.entity_data_size != 0 { + if self.entity_data_size == 0 { + self.write_data_entity()?; + } + + let slice = update_message.as_slice(); + let offset = update_message.entity_data_size_pos as usize + + mem::size_of_val(&update_message.entity_data_size); + self.cursor.write_all(&slice[offset..]).unwrap(); + + self.entity_data_size = self + .entity_data_size + .checked_add(update_message.entity_data_size) + .ok_or(bincode::ErrorKind::SizeLimit)?; + update_message.entity_data_size = 0; + } + + update_message + .cursor + .set_position(update_message.entity_data_pos); + + Ok(()) + } + + /// Returns the serialized data, excluding trailing empty arrays, as a byte array. + fn as_slice(&self) -> &[u8] { + let slice = self.cursor.get_ref(); + let position = self.cursor.position() as usize; + let extra_len = self.trailing_empty_arrays * mem::size_of_val(&self.array_len); + &slice[..position - extra_len] + } + + /// Sends the message, excluding trailing empty arrays, to the specified client. + /// + /// Updates change tick for the client if there are data to send. + /// Does nothing if there is no data to send. + pub(super) fn send( + &self, + server: &mut RepliconServer, + client: &mut ReplicatedClient, + server_tick: RepliconTick, + ) -> bincode::Result<()> { + debug_assert_eq!(self.array_len, 0); + debug_assert_eq!(self.entity_data_size, 0); + + let slice = self.as_slice(); + if slice.is_empty() { + trace!("no init data to send for {:?}", client.id()); + return Ok(()); + } + + client.set_init_tick(server_tick); + + let mut header = [0; mem::size_of::()]; + bincode::serialize_into(&mut header[..], &server_tick)?; + + trace!("sending init message to {:?}", client.id()); + server.send( + client.id(), + ReplicationChannel::Init, + Bytes::from([&header, slice].concat()), + ); + + Ok(()) + } +} + +impl Default for InitMessage { + fn default() -> Self { + Self { + cursor: Default::default(), + array_len: Default::default(), + array_pos: Default::default(), + trailing_empty_arrays: Default::default(), + entity_data_size: Default::default(), + entity_data_pos: Default::default(), + entity_data_size_pos: Default::default(), + data_entity: Entity::PLACEHOLDER, + } + } +} diff --git a/src/server/replication_messages/update_message.rs b/src/server/replication_messages/update_message.rs new file mode 100644 index 00000000..0fc2bfd6 --- /dev/null +++ b/src/server/replication_messages/update_message.rs @@ -0,0 +1,252 @@ +use std::{io::Cursor, mem, time::Duration}; + +use bevy::{ecs::component::Tick, prelude::*, ptr::Ptr}; +use bincode::{DefaultOptions, Options}; +use bytes::Bytes; + +use crate::core::{ + channels::ReplicationChannel, + ctx::SerializeCtx, + replicated_clients::{ClientBuffers, ReplicatedClient}, + replication_registry::{component_fns::ComponentFns, rule_fns::UntypedRuleFns, FnsId}, + replicon_server::RepliconServer, + replicon_tick::RepliconTick, +}; + +/// A reusable message with replicated component updates. +/// +/// Contains change tick, current tick and component updates since the last acknowledged tick for each entity. +/// Cannot be applied on the client until the init message matching this update message's change tick +/// has been applied to the client world. +/// The message will be manually split into packets up to max size, and each packet will be applied +/// independently on the client. +/// Message splits only happen per-entity to avoid weird behavior from partial entity updates. +/// Sent over the [`ReplicationChannel::Update`] channel. +/// +/// See also [Limits](../index.html#limits) +pub(crate) struct UpdateMessage { + /// Serialized data. + pub(super) cursor: Cursor>, + + /// Entities and their sizes in the message with data. + entities: Vec<(Entity, usize)>, + + /// Entity from last call of [`Self::start_entity_data`]. + data_entity: Entity, + + /// Size in bytes of the component data stored for the currently-being-written entity. + pub(super) entity_data_size: u16, + + /// Position of entity from last call of [`Self::start_entity_data`]. + pub(super) entity_data_pos: u64, + + /// Position of entity data length from last call of [`Self::write_data_entity`]. + pub(super) entity_data_size_pos: u64, +} + +impl UpdateMessage { + /// Clears the message. + /// + /// Keeps allocated capacity for reuse. + pub(super) fn reset(&mut self) { + self.cursor.set_position(0); + self.entities.clear(); + } + + /// Starts writing entity and its data. + /// + /// Data can contain components with their IDs. + /// Entity will be written lazily after first data write. + /// See also [`Self::end_entity_data`] and [`Self::write_component`]. + pub(crate) fn start_entity_data(&mut self, entity: Entity) { + debug_assert_eq!(self.entity_data_size, 0); + + self.data_entity = entity; + self.entity_data_pos = self.cursor.position(); + } + + /// Writes entity for the current data and remembers the position after it to write length later. + /// + /// Should be called only after first data write. + fn write_data_entity(&mut self) -> bincode::Result<()> { + super::serialize_entity(&mut self.cursor, self.data_entity)?; + self.entity_data_size_pos = self.cursor.position(); + self.cursor.set_position( + self.entity_data_size_pos + mem::size_of_val(&self.entity_data_size) as u64, + ); + + Ok(()) + } + + /// Ends writing entity data by writing its length into the last remembered position. + /// + /// If the entity data is empty, nothing will be written and the cursor will reset. + /// See also [`Self::start_array`] and [`Self::write_component`]. + pub(crate) fn end_entity_data(&mut self) -> bincode::Result<()> { + if self.entity_data_size == 0 { + self.cursor.set_position(self.entity_data_pos); + return Ok(()); + } + + let previous_pos = self.cursor.position(); + self.cursor.set_position(self.entity_data_size_pos); + + bincode::serialize_into(&mut self.cursor, &self.entity_data_size)?; + + self.cursor.set_position(previous_pos); + + let data_size = self.cursor.position() - self.entity_data_pos; + self.entities.push((self.data_entity, data_size as usize)); + + self.entity_data_size = 0; + + Ok(()) + } + + /// Serializes component and its replication functions ID as an element of entity data. + /// + /// Reuses previously shared bytes if they exist, or updates them. + /// Should be called only inside an entity data and increases its size. + /// See also [`Self::start_entity_data`]. + pub(crate) fn write_component<'a>( + &'a mut self, + shared_bytes: &mut Option<&'a [u8]>, + rule_fns: &UntypedRuleFns, + component_fns: &ComponentFns, + ctx: &SerializeCtx, + fns_id: FnsId, + ptr: Ptr, + ) -> bincode::Result<()> { + if self.entity_data_size == 0 { + self.write_data_entity()?; + } + + let size = super::write_with(shared_bytes, &mut self.cursor, |cursor| { + DefaultOptions::new().serialize_into(&mut *cursor, &fns_id)?; + // SAFETY: `component_fns`, `ptr` and `rule_fns` were created for the same component type. + unsafe { component_fns.serialize(ctx, rule_fns, ptr, cursor) } + })?; + + self.entity_data_size = self + .entity_data_size + .checked_add(size) + .ok_or(bincode::ErrorKind::SizeLimit)?; + + Ok(()) + } + + /// Returns the serialized data as a byte array. + pub(super) fn as_slice(&self) -> &[u8] { + let slice = self.cursor.get_ref(); + let position = self.cursor.position() as usize; + &slice[..position] + } + + /// Splits message according to entities inside it and sends it to the specified client. + /// + /// Does nothing if there is no data to send. + pub(super) fn send( + &mut self, + server: &mut RepliconServer, + client_buffers: &mut ClientBuffers, + client: &mut ReplicatedClient, + server_tick: RepliconTick, + tick: Tick, + timestamp: Duration, + ) -> bincode::Result<()> { + debug_assert_eq!(self.entity_data_size, 0); + + let mut slice = self.as_slice(); + if slice.is_empty() { + trace!("no updates to send for {:?}", client.id()); + return Ok(()); + } + + trace!("sending update message(s) to {:?}", client.id()); + const TICKS_SIZE: usize = 2 * mem::size_of::(); + let mut header = [0; TICKS_SIZE + mem::size_of::()]; + bincode::serialize_into(&mut header[..], &(client.init_tick(), server_tick))?; + + let mut message_size = 0; + let client_id = client.id(); + let (mut update_index, mut entities) = + client.register_update(client_buffers, tick, timestamp); + for &(entity, data_size) in &self.entities { + // Try to pack back first, then try to pack forward. + if message_size == 0 + || can_pack(header.len(), message_size, data_size) + || can_pack(header.len(), data_size, message_size) + { + entities.push(entity); + message_size += data_size; + } else { + let (message, remaining) = slice.split_at(message_size); + slice = remaining; + message_size = data_size; + + bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; + + server.send( + client_id, + ReplicationChannel::Update, + Bytes::from([&header, message].concat()), + ); + + if !slice.is_empty() { + (update_index, entities) = + client.register_update(client_buffers, tick, timestamp); + } + } + } + + if !slice.is_empty() { + bincode::serialize_into(&mut header[TICKS_SIZE..], &update_index)?; + + server.send( + client_id, + ReplicationChannel::Update, + Bytes::from([&header, slice].concat()), + ); + } + + Ok(()) + } +} + +impl Default for UpdateMessage { + fn default() -> Self { + Self { + cursor: Default::default(), + entities: Default::default(), + entity_data_size: Default::default(), + entity_data_pos: Default::default(), + entity_data_size_pos: Default::default(), + data_entity: Entity::PLACEHOLDER, + } + } +} + +fn can_pack(header_size: usize, base: usize, add: usize) -> bool { + const MAX_PACKET_SIZE: usize = 1200; // TODO: make it configurable by the messaging backend. + + let dangling = (base + header_size) % MAX_PACKET_SIZE; + (dangling > 0) && ((dangling + add) <= MAX_PACKET_SIZE) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn packing() { + assert!(can_pack(10, 0, 5)); + assert!(can_pack(10, 0, 1190)); + assert!(!can_pack(10, 0, 1191)); + assert!(!can_pack(10, 0, 3000)); + + assert!(can_pack(10, 1189, 1)); + assert!(!can_pack(10, 1190, 0)); + assert!(!can_pack(10, 1190, 1)); + assert!(!can_pack(10, 1190, 3000)); + } +}