diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 401d5740..422f5b7c 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -133,6 +133,6 @@ jobs: name: code-coverage-report - name: Upload to Codecov - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 with: token: ${{ secrets.CODECOV_TOKEN }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f28a3b1..1689ef9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,39 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- RTT, bytes per second and packet loss information for `RepliconClient` and `ConnectedClients`. +- `ClientSet::Diagnostics` for systems that collect client diagnostics. + +### Changed + +- Make `core::replication::replication_rules::ReplicationRules` public. +- Various optimizations for replication messages to use fewer bytes. +- Accept `Vec` instead of `Cursor>` for serialization. +- `ConnectedClients` now store `ConnectedClient` instead of `ClientId` with more information about the client. +- All `TestFnsEntityExt` now accept `FnsId`. +- Move replication-related modules from `core` module under `core::replication`. +- Move `Replicated` to the `replication` module. +- Split the `ctx` module and move event-related contexts under `core::events_registry::ctx` and replication-related contexts under `core::replication_registry::ctx`. +- Separate paths from `diagnostics` module by `/` and their parent path now `client/replication` instead of `replication/client`. +- Provide replication statistics by sum instead of per second and use `usize` for it. +- Rename `ServerPlugin::change_timeout` into `ServerPlugin::mutations_timeout`. +- Rename `ServerInitTick` into `ServerChangeTick`. +- Rename `ReplicatedClient::init_tick` into `ReplicatedClient::change_tick`. +- Rename `ReplicatedClient::get_change_tick` into `ReplicatedClient::mutation_tick`. +- Rename `ReplicationChannel::Init` into `ReplicationChannel::Changes`. +- Rename `ReplicationChannel::Update` into `ReplicationChannel::Mutations`. +- Rename `ClientStats` into `ClientReplicationStats`. +- Rename `ClientDiagnosticsPlugin::MESSAGES` into `ClientDiagnosticsPlugin::REPLICATION_MESSAGES`. +- Rename `ClientDiagnosticsPlugin::BYTES` into `ClientDiagnosticsPlugin::REPLICATION_BYTES`. +- Rename `ClientDiagnosticsPlugin::ENTITY_CHANGES` into `ClientDiagnosticsPlugin::ENTITIES_CHANGED`. +- Rename `ClientDiagnosticsPlugin::COMPONENT_CHANGES` into `ClientDiagnosticsPlugin::COMPONENTS_CHANGED`. + +### Removed + +- `FnsInfo`, use `(ComponentId, FnsId)` instead. + ## [0.28.4] - 2024-10-15 ### Added diff --git a/Cargo.toml b/Cargo.toml index f79195bc..eccae40b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,8 +31,9 @@ bevy = { version = "0.15.0", default-features = false, features = [ bytes = "1.5" bincode = "1.3" serde = "1.0" -varint-rs = "2.2" +integer-encoding = "4.0" ordered-multimap = "0.7" +bitflags = "2.6" [dev-dependencies] bevy = { version = "0.15.0", default-features = false, features = [ @@ -68,7 +69,7 @@ name = "replication" harness = false [[test]] -name = "changes" +name = "mutations" required-features = ["client", "server"] [[test]] diff --git a/README.md b/README.md index 8c8c40b3..8da4673d 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,7 @@ Have any questions? Feel free to ask in the dedicated [`bevy_replicon` channel]( - [`bevy_replicon_renet`](https://github.com/projectharmonia/bevy_replicon_renet) - integration for [`bevy_renet`](https://github.com/lucaspoffo/renet/tree/master/bevy_renet). Maintained by the authors of this crate. - [`bevy_replicon_renet2`](https://github.com/UkoeHB/renet2/tree/main/bevy_replicon_renet2) - integration for [`bevy_renet2`](https://github.com/UkoeHB/renet2/tree/main/bevy_renet2). Includes a WebTransport backend for browsers, and enables servers that can manage multi-platform clients simultaneously. - [`bevy_replicon_quinnet`](https://github.com/Henauxg/bevy_quinnet/tree/main/bevy_replicon_quinnet) - integration for [`bevy_quinnet`](https://github.com/Henauxg/bevy_quinnet). +- [`aeronet_replicon`](https://github.com/aecsocket/aeronet/tree/main/crates/aeronet_replicon) - integration for [`aeronet`](https://github.com/aecsocket/aeronet). Works on any IO layer supported by `aeronet_io`, but requires `aeronet_transport`. #### Helpers diff --git a/benches/replication.rs b/benches/replication.rs index a8a746b7..130be87c 100644 --- a/benches/replication.rs +++ b/benches/replication.rs @@ -49,7 +49,7 @@ fn replication(c: name = &name[MODULE_PREFIX_LEN..]; for clients in [1, 20] { - c.bench_function(&format!("{name}, init send, {clients} client(s)"), |b| { + c.bench_function(&format!("{name}, changes send, {clients} client(s)"), |b| { b.iter_custom(|iter| { let mut elapsed = Duration::ZERO; for _ in 0..iter { @@ -82,53 +82,56 @@ fn replication(c: }) }); - c.bench_function(&format!("{name}, update send, {clients} client(s)"), |b| { - b.iter_custom(|iter| { - let mut server_app = create_app::(); - let mut client_apps = Vec::new(); - for _ in 0..clients { - client_apps.push(create_app::()); - } - - for client_app in &mut client_apps { - server_app.connect_client(client_app); - } - - server_app - .world_mut() - .spawn_batch(vec![(Replicated, C::default()); ENTITIES as usize]); - let mut query = server_app.world_mut().query::<&mut C>(); - - server_app.update(); - for client_app in &mut client_apps { - server_app.exchange_with_client(client_app); - client_app.update(); - assert_eq!(client_app.world().entities().len(), ENTITIES); - } + c.bench_function( + &format!("{name}, mutations send, {clients} client(s)"), + |b| { + b.iter_custom(|iter| { + let mut server_app = create_app::(); + let mut client_apps = Vec::new(); + for _ in 0..clients { + client_apps.push(create_app::()); + } - let mut elapsed = Duration::ZERO; - for _ in 0..iter { - for mut component in query.iter_mut(server_app.world_mut()) { - component.set_changed(); + for client_app in &mut client_apps { + server_app.connect_client(client_app); } - let instant = Instant::now(); - server_app.update(); - elapsed += instant.elapsed(); + server_app + .world_mut() + .spawn_batch(vec![(Replicated, C::default()); ENTITIES as usize]); + let mut query = server_app.world_mut().query::<&mut C>(); + server_app.update(); for client_app in &mut client_apps { server_app.exchange_with_client(client_app); client_app.update(); assert_eq!(client_app.world().entities().len(), ENTITIES); } - } - elapsed - }) - }); + let mut elapsed = Duration::ZERO; + for _ in 0..iter { + for mut component in query.iter_mut(server_app.world_mut()) { + component.set_changed(); + } + + let instant = Instant::now(); + server_app.update(); + elapsed += instant.elapsed(); + + for client_app in &mut client_apps { + server_app.exchange_with_client(client_app); + client_app.update(); + assert_eq!(client_app.world().entities().len(), ENTITIES); + } + } + + elapsed + }) + }, + ); } - c.bench_function(&format!("{name}, init receive"), |b| { + c.bench_function(&format!("{name}, changes receive"), |b| { b.iter_custom(|iter| { let mut elapsed = Duration::ZERO; for _ in 0..iter { @@ -154,7 +157,7 @@ fn replication(c: }) }); - c.bench_function(&format!("{name}, update receive"), |b| { + c.bench_function(&format!("{name}, mutations receive"), |b| { b.iter_custom(|iter| { let mut server_app = create_app::(); let mut client_app = create_app::(); diff --git a/src/client.rs b/src/client.rs index 272c17f9..547dd8b7 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,19 +8,24 @@ use std::{io::Cursor, mem}; use bevy::{ecs::world::CommandQueue, prelude::*}; use bincode::{DefaultOptions, Options}; use bytes::Bytes; -use varint_rs::VarintReader; +use integer_encoding::{FixedIntReader, VarIntReader}; use crate::core::{ channels::{ReplicationChannel, RepliconChannels}, - command_markers::{CommandMarkers, EntityMarkers}, common_conditions::{client_connected, client_just_connected, client_just_disconnected}, - ctx::{DespawnCtx, RemoveCtx, WriteCtx}, - deferred_entity::DeferredEntity, - replication_registry::ReplicationRegistry, + replication::{ + change_message_flags::ChangeMessageFlags, + command_markers::{CommandMarkers, EntityMarkers}, + deferred_entity::DeferredEntity, + replication_registry::{ + ctx::{DespawnCtx, RemoveCtx, WriteCtx}, + ReplicationRegistry, + }, + Replicated, + }, replicon_client::RepliconClient, replicon_tick::RepliconTick, server_entity_map::ServerEntityMap, - Replicated, }; use confirm_history::ConfirmHistory; @@ -33,8 +38,8 @@ impl Plugin for ClientPlugin { fn build(&self, app: &mut App) { app.init_resource::() .init_resource::() - .init_resource::() - .init_resource::() + .init_resource::() + .init_resource::() .configure_sets( PreUpdate, ( @@ -44,7 +49,7 @@ impl Plugin for ClientPlugin { ClientSet::Reset.run_if(client_just_disconnected), ), ClientSet::Receive, - ClientSet::SyncHierarchy, + (ClientSet::Diagnostics, ClientSet::SyncHierarchy), ) .chain(), ) @@ -71,18 +76,18 @@ impl ClientPlugin { /// Receives and applies replication messages from the server. /// - /// Tick init messages are sent over the [`ReplicationChannel::Init`] and are applied first to ensure valid state - /// for entity updates. + /// Change messages are sent over the [`ReplicationChannel::Changes`] and are applied first to ensure valid state + /// for component mutations. /// - /// Entity update messages are sent over [`ReplicationChannel::Update`], which means they may appear - /// ahead-of or behind init messages from the same server tick. An update will only be applied if its - /// change tick has already appeared in an init message, otherwise it will be buffered while waiting. - /// Since entity updates can arrive in any order, updates will only be applied if they correspond to a more + /// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear + /// ahead-of or behind change messages from the same server tick. A mutation will only be applied if its + /// change tick has already appeared in an change message, otherwise it will be buffered while waiting. + /// Since component mutations can arrive in any order, they will only be applied if they correspond to a more /// recent server tick than the last acked server tick for each entity. /// - /// Buffered entity update messages are processed last. + /// Buffered mutate messages are processed last. /// - /// Acknowledgments for received entity update messages are sent back to the server. + /// Acknowledgments for received mutate messages are sent back to the server. /// /// See also [`ReplicationMessages`](crate::server::replication_messages::ReplicationMessages). pub(super) fn receive_replication( @@ -92,10 +97,10 @@ impl ClientPlugin { ) -> bincode::Result<()> { world.resource_scope(|world, mut client: Mut| { world.resource_scope(|world, mut entity_map: Mut| { - world.resource_scope(|world, mut buffered_updates: Mut| { + world.resource_scope(|world, mut buffered_mutations: Mut| { world.resource_scope(|world, command_markers: Mut| { world.resource_scope(|world, registry: Mut| { - let mut stats = world.remove_resource::(); + let mut stats = world.remove_resource::(); let mut params = ReceiveParams { queue: &mut queue, entity_markers: &mut entity_markers, @@ -109,7 +114,7 @@ impl ClientPlugin { world, &mut params, &mut client, - &mut buffered_updates, + &mut buffered_mutations, )?; if let Some(stats) = stats { @@ -125,148 +130,182 @@ impl ClientPlugin { } fn reset( - mut init_tick: ResMut, + mut change_tick: ResMut, mut entity_map: ResMut, - mut buffered_updates: ResMut, + mut buffered_mutations: ResMut, + mut stats: ResMut, ) { - *init_tick = Default::default(); + *change_tick = Default::default(); entity_map.clear(); - buffered_updates.clear(); + buffered_mutations.clear(); + *stats = Default::default(); } } /// Reads all received messages and applies them. /// -/// Sends acknowledgments for update messages back. +/// Sends acknowledgments for mutate messages back. fn apply_replication( world: &mut World, params: &mut ReceiveParams, client: &mut RepliconClient, - buffered_updates: &mut BufferedUpdates, + buffered_mutations: &mut BufferedMutations, ) -> bincode::Result<()> { - for message in client.receive(ReplicationChannel::Init) { - apply_init_message(world, params, &message)?; + for message in client.receive(ReplicationChannel::Changes) { + apply_change_message(world, params, &message)?; } - // Unlike init messages, we read all updates first, sort them by tick - // in descending order to ensure that the last update will be applied first. - // Since update messages manually split by packet size, we apply all messages, + // Unlike change messages, we read all mutate messages first, sort them by tick + // in descending order to ensure that the last mutation will be applied first. + // Since mutate messages manually split by packet size, we apply all messages, // but skip outdated data per-entity by checking last received tick for it // (unless user requested history via marker). - let init_tick = *world.resource::(); - let acks_size = mem::size_of::() * client.received_count(ReplicationChannel::Update); + let change_tick = *world.resource::(); + let acks_size = mem::size_of::() * client.received_count(ReplicationChannel::Mutations); if acks_size != 0 { let mut acks = Vec::with_capacity(acks_size); - for message in client.receive(ReplicationChannel::Update) { - let update_index = read_update_message(params, buffered_updates, message)?; - bincode::serialize_into(&mut acks, &update_index)?; + for message in client.receive(ReplicationChannel::Mutations) { + let mutate_index = buffer_mutate_message(params, buffered_mutations, message)?; + bincode::serialize_into(&mut acks, &mutate_index)?; } - client.send(ReplicationChannel::Init, acks); + client.send(ReplicationChannel::Changes, acks); } - apply_update_messages(world, params, buffered_updates, init_tick) + apply_mutate_messages(world, params, buffered_mutations, change_tick) } -/// Applies [`InitMessage`](crate::server::replication_messages::InitMessage). -fn apply_init_message( +/// Reads and applies a change message. +/// +/// For details see [`replication_messages`](crate::server::replication_messages). +fn apply_change_message( world: &mut World, params: &mut ReceiveParams, message: &[u8], ) -> bincode::Result<()> { - let end_pos: u64 = message.len().try_into().unwrap(); + let end_pos = message.len(); let mut cursor = Cursor::new(message); if let Some(stats) = &mut params.stats { stats.messages += 1; stats.bytes += end_pos; } - let message_tick = bincode::deserialize_from(&mut cursor)?; - trace!("applying init message for {message_tick:?}"); - world.resource_mut::().0 = message_tick; - debug_assert!(cursor.position() < end_pos, "init message can't be empty"); + let flags = ChangeMessageFlags::from_bits_retain(cursor.read_fixedint()?); + debug_assert!(!flags.is_empty(), "message can't be empty"); - apply_entity_mappings(world, params, &mut cursor)?; - if cursor.position() == end_pos { - return Ok(()); - } + let message_tick = bincode::deserialize_from(&mut cursor)?; + trace!("applying change message for {message_tick:?}"); + world.resource_mut::().0 = message_tick; - apply_despawns(world, params, &mut cursor, message_tick)?; - if cursor.position() == end_pos { - return Ok(()); - } + let last_flag = flags.last(); + for (_, flag) in flags.iter_names() { + let array_kind = if flag != last_flag { + ArrayKind::Sized + } else { + ArrayKind::Dynamic + }; - apply_init_components( - world, - params, - ComponentsKind::Removal, - &mut cursor, - message_tick, - )?; - if cursor.position() == end_pos { - return Ok(()); + match flag { + ChangeMessageFlags::MAPPINGS => { + debug_assert_eq!(array_kind, ArrayKind::Sized); + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_entity_mapping(world, params, cursor) + })?; + if let Some(stats) = &mut params.stats { + stats.mappings += len; + } + } + ChangeMessageFlags::DESPAWNS => { + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_despawn(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.despawns += len; + } + } + ChangeMessageFlags::REMOVALS => { + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_removals(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.entities_changed += len; + } + } + ChangeMessageFlags::CHANGES => { + debug_assert_eq!(array_kind, ArrayKind::Dynamic); + let len = apply_array(array_kind, &mut cursor, |cursor| { + apply_changes(world, params, cursor, message_tick) + })?; + if let Some(stats) = &mut params.stats { + stats.entities_changed += len; + } + } + _ => unreachable!("iteration should yield only named flags"), + } } - apply_init_components( - world, - params, - ComponentsKind::Insert, - &mut cursor, - message_tick, - )?; - Ok(()) } -/// Reads and buffers [`UpdateMessage`](crate::server::replication_messages::UpdateMessage). +/// Reads and buffers mutate message. +/// +/// For details see [`replication_messages`](crate::server::replication_messages). /// -/// Returns update index to be used for acknowledgment. -fn read_update_message( +/// Returns mutate index to be used for acknowledgment. +fn buffer_mutate_message( params: &mut ReceiveParams, - buffered_updates: &mut BufferedUpdates, + buffered_mutations: &mut BufferedMutations, message: Bytes, ) -> bincode::Result { - let end_pos: u64 = message.len().try_into().unwrap(); + let end_pos = message.len(); let mut cursor = Cursor::new(&*message); if let Some(stats) = &mut params.stats { stats.messages += 1; stats.bytes += end_pos; } - let (init_tick, message_tick, update_index) = bincode::deserialize_from(&mut cursor)?; - trace!("received update message for {message_tick:?}"); - buffered_updates.insert(BufferedUpdate { - init_tick, + let change_tick = bincode::deserialize_from(&mut cursor)?; + let message_tick = bincode::deserialize_from(&mut cursor)?; + let mutate_index = cursor.read_varint()?; + trace!("received mutate message for {message_tick:?}"); + buffered_mutations.insert(BufferedMutate { + change_tick, message_tick, message: message.slice(cursor.position() as usize..), }); - Ok(update_index) + Ok(mutate_index) } -/// Applies updates from [`BufferedUpdates`]. +/// Applies mutations from [`BufferedMutations`]. /// -/// If the update message can't be applied yet (because the init message with the +/// If the mutate message can't be applied yet (because the change message with the /// corresponding tick hasn't arrived), it will be kept in the buffer. -fn apply_update_messages( +fn apply_mutate_messages( world: &mut World, params: &mut ReceiveParams, - buffered_updates: &mut BufferedUpdates, - init_tick: ServerInitTick, + buffered_mutations: &mut BufferedMutations, + change_tick: ServerChangeTick, ) -> bincode::Result<()> { let mut result = Ok(()); - buffered_updates.0.retain(|update| { - if update.init_tick > *init_tick { + buffered_mutations.0.retain(|mutate| { + if mutate.change_tick > *change_tick { return true; } - trace!("applying update message for {:?}", update.message_tick); - if let Err(e) = apply_update_components( - world, - params, - &mut Cursor::new(&*update.message), - update.message_tick, - ) { - result = Err(e); + trace!("applying mutate message for {:?}", mutate.message_tick); + let len = apply_array( + ArrayKind::Dynamic, + &mut Cursor::new(&*mutate.message), + |cursor| apply_mutations(world, params, cursor, mutate.message_tick), + ); + + match len { + Ok(len) => { + if let Some(stats) = &mut params.stats { + stats.entities_changed += len; + } + } + Err(e) => result = Err(e), } false @@ -275,237 +314,282 @@ fn apply_update_messages( result } -/// Applies received server mappings from client's pre-spawned entities. -fn apply_entity_mappings( +/// Deserializes and applies server mapping from client's pre-spawned entities. +fn apply_entity_mapping( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, ) -> bincode::Result<()> { - let mappings_len: u16 = bincode::deserialize_from(&mut *cursor)?; - if let Some(stats) = &mut params.stats { - stats.mappings += mappings_len as u32; + let server_entity = deserialize_entity(cursor)?; + let client_entity = deserialize_entity(cursor)?; + + if let Ok(mut entity) = world.get_entity_mut(client_entity) { + debug!("received mapping from {server_entity:?} to {client_entity:?}"); + entity.insert(Replicated); + params.entity_map.insert(server_entity, client_entity); + } else { + // Entity could be despawned on client already. + debug!("received mapping from {server_entity:?} to {client_entity:?}, but the entity doesn't exists"); } - for _ in 0..mappings_len { - let server_entity = deserialize_entity(cursor)?; - let client_entity = deserialize_entity(cursor)?; - - if let Ok(mut entity) = world.get_entity_mut(client_entity) { - debug!("received mapping from {server_entity:?} to {client_entity:?}"); - entity.insert(Replicated); - params.entity_map.insert(server_entity, client_entity); - } else { - // Entity could be despawned on client already. - debug!("received mapping from {server_entity:?} to {client_entity:?}, but the entity doesn't exists"); - } + + Ok(()) +} + +/// Deserializes and applies entity despawn from change message. +fn apply_despawn( + world: &mut World, + params: &mut ReceiveParams, + cursor: &mut Cursor<&[u8]>, + message_tick: RepliconTick, +) -> bincode::Result<()> { + // The entity might have already been despawned because of hierarchy or + // with the last replication message, but the server might not yet have received confirmation + // from the client and could include the deletion in the this message. + let server_entity = deserialize_entity(cursor)?; + if let Some(client_entity) = params + .entity_map + .remove_by_server(server_entity) + .and_then(|entity| world.get_entity_mut(entity).ok()) + { + let ctx = DespawnCtx { message_tick }; + (params.registry.despawn)(&ctx, client_entity); } + Ok(()) } -/// Deserializes replicated components of `components_kind` and applies them to the `world`. -fn apply_init_components( +/// Deserializes and applies component removals for an entity. +fn apply_removals( world: &mut World, params: &mut ReceiveParams, - components_kind: ComponentsKind, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?; - for _ in 0..entities_len { - let server_entity = deserialize_entity(cursor)?; - let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; - - let client_entity = params - .entity_map - .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); - - let world_cell = world.as_unsafe_world_cell(); - // SAFETY: have write access and the cell used only to get entities. - let mut client_entity = unsafe { DeferredEntity::new(world_cell, client_entity) }; - let mut commands = Commands::new_from_entities(params.queue, world_cell.entities()); - params - .entity_markers - .read(params.command_markers, &*client_entity); - - if let Some(mut history) = client_entity.get_mut::() { - history.set_last_tick(message_tick); - } else { - commands - .entity(client_entity.id()) - .insert(ConfirmHistory::new(message_tick)); - } + let server_entity = deserialize_entity(cursor)?; - let end_pos = cursor.position() + data_size as u64; - let mut components_len = 0u32; - while cursor.position() < end_pos { - let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; - let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); - match components_kind { - ComponentsKind::Insert => { - let mut ctx = - WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); - - // SAFETY: `rule_fns` and `component_fns` were created for the same type. - unsafe { - component_fns.write( - &mut ctx, - rule_fns, - params.entity_markers, - &mut client_entity, - cursor, - )?; - } - } - ComponentsKind::Removal => { - let mut ctx = RemoveCtx { - commands: &mut commands, - message_tick, - component_id, - }; - component_fns.remove(&mut ctx, params.entity_markers, &mut client_entity); - } - } - components_len += 1; - } + let client_entity = params + .entity_map + .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); - if let Some(stats) = &mut params.stats { - stats.entities_changed += 1; - stats.components_changed += components_len; - } + let mut client_entity = DeferredEntity::new(world, client_entity); + let mut commands = client_entity.commands(params.queue); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + if let Some(mut history) = client_entity.get_mut::() { + history.set_last_tick(message_tick); + } else { + commands + .entity(client_entity.id()) + .insert(ConfirmHistory::new(message_tick)); + } + + let len = apply_array(ArrayKind::Sized, cursor, |cursor| { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, _) = params.registry.get(fns_id); + let mut ctx = RemoveCtx { + commands: &mut commands, + message_tick, + component_id, + }; + component_fns.remove(&mut ctx, params.entity_markers, &mut client_entity); + + Ok(()) + })?; - params.queue.apply(world); + if let Some(stats) = &mut params.stats { + stats.components_changed += len; } + params.queue.apply(world); + Ok(()) } -/// Deserializes despawns and applies them to the `world`. -fn apply_despawns( +/// Deserializes and applies component insertions and/or mutations for an entity. +fn apply_changes( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let entities_len: u16 = bincode::deserialize_from(&mut *cursor)?; - if let Some(stats) = &mut params.stats { - stats.despawns += entities_len as u32; + let server_entity = deserialize_entity(cursor)?; + + let client_entity = params + .entity_map + .get_by_server_or_insert(server_entity, || world.spawn(Replicated).id()); + + let mut client_entity = DeferredEntity::new(world, client_entity); + let mut commands = client_entity.commands(params.queue); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + if let Some(mut history) = client_entity.get_mut::() { + history.set_last_tick(message_tick); + } else { + commands + .entity(client_entity.id()) + .insert(ConfirmHistory::new(message_tick)); } - for _ in 0..entities_len { - // The entity might have already been despawned because of hierarchy or - // with the last replication message, but the server might not yet have received confirmation - // from the client and could include the deletion in the this message. - let server_entity = deserialize_entity(cursor)?; - if let Some(client_entity) = params - .entity_map - .remove_by_server(server_entity) - .and_then(|entity| world.get_entity_mut(entity).ok()) - { - let ctx = DespawnCtx { message_tick }; - (params.registry.despawn)(&ctx, client_entity); + + let len = apply_array(ArrayKind::Sized, cursor, |cursor| { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); + let mut ctx = WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); + + // SAFETY: `rule_fns` and `component_fns` were created for the same type. + unsafe { + component_fns.write( + &mut ctx, + rule_fns, + params.entity_markers, + &mut client_entity, + cursor, + )?; } + + Ok(()) + })?; + + if let Some(stats) = &mut params.stats { + stats.components_changed += len; } + params.queue.apply(world); + Ok(()) } -/// Deserializes replicated component updates and applies them to the `world`. +fn apply_array( + kind: ArrayKind, + cursor: &mut Cursor<&[u8]>, + mut f: impl FnMut(&mut Cursor<&[u8]>) -> bincode::Result<()>, +) -> bincode::Result { + match kind { + ArrayKind::Sized => { + let len = cursor.read_varint()?; + for _ in 0..len { + (f)(cursor)?; + } + + Ok(len) + } + ArrayKind::Dynamic => { + let mut len = 0; + let end = cursor.get_ref().len() as u64; + while cursor.position() < end { + (f)(cursor)?; + len += 1; + } + + Ok(len) + } + } +} + +/// Type of serialized array. +#[derive(PartialEq, Eq, Debug)] +enum ArrayKind { + /// Size is serialized before the array. + Sized, + /// Size is unknown, means that all bytes needs to be consumed. + Dynamic, +} + +/// Deserializes and applies component mutations for all entities. /// /// Consumes all remaining bytes in the cursor. -fn apply_update_components( +fn apply_mutations( world: &mut World, params: &mut ReceiveParams, cursor: &mut Cursor<&[u8]>, message_tick: RepliconTick, ) -> bincode::Result<()> { - let message_end = cursor.get_ref().len() as u64; - while cursor.position() < message_end { - let server_entity = deserialize_entity(cursor)?; - let data_size: u16 = bincode::deserialize_from(&mut *cursor)?; - - let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { - // Update could arrive after a despawn from init message. - debug!("ignoring update received for unknown server's {server_entity:?}"); - cursor.set_position(cursor.position() + data_size as u64); - continue; - }; + let server_entity = deserialize_entity(cursor)?; + let data_size: usize = cursor.read_varint()?; - let world_cell = world.as_unsafe_world_cell(); - // SAFETY: have write access and the cell used only to get entities. - let mut client_entity = unsafe { DeferredEntity::new(world_cell, client_entity) }; - let mut commands = Commands::new_from_entities(params.queue, world_cell.entities()); - params - .entity_markers - .read(params.command_markers, &*client_entity); - - let mut history = client_entity - .get_mut::() - .expect("all entities from update should have confirmed ticks"); - let new_entity = message_tick > history.last_tick(); - if new_entity { - history.set_last_tick(message_tick); - } else { - if !params.entity_markers.need_history() { - trace!( - "ignoring outdated update for client's {:?}", - client_entity.id() - ); - cursor.set_position(cursor.position() + data_size as u64); - continue; - } + let Some(client_entity) = params.entity_map.get_by_server(server_entity) else { + // Mutation could arrive after a despawn from change message. + debug!("ignoring mutations received for unknown server's {server_entity:?}"); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); + }; - let ago = history.last_tick().get().wrapping_sub(message_tick.get()); - if ago >= u64::BITS { - trace!( - "discarding update {ago} ticks old for client's {:?}", - client_entity.id() - ); - cursor.set_position(cursor.position() + data_size as u64); - continue; - } + let mut client_entity = DeferredEntity::new(world, client_entity); + let mut commands = client_entity.commands(params.queue); + params + .entity_markers + .read(params.command_markers, &*client_entity); + + let mut history = client_entity + .get_mut::() + .expect("all entities from mutate message should have confirmed ticks"); + let new_tick = message_tick > history.last_tick(); + if new_tick { + history.set_last_tick(message_tick); + } else { + if !params.entity_markers.need_history() { + trace!( + "ignoring outdated mutations for client's {:?}", + client_entity.id() + ); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); + } - history.set(ago); + let ago = history.last_tick().get().wrapping_sub(message_tick.get()); + if ago >= u64::BITS { + trace!( + "discarding {ago} ticks old mutations for client's {:?}", + client_entity.id() + ); + cursor.set_position(cursor.position() + data_size as u64); + return Ok(()); } - let end_pos = cursor.position() + data_size as u64; - let mut components_count = 0u32; - while cursor.position() < end_pos { - let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; - let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); - let mut ctx = - WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); - - // SAFETY: `rule_fns` and `component_fns` were created for the same type. - unsafe { - if new_entity { - component_fns.write( - &mut ctx, - rule_fns, - params.entity_markers, - &mut client_entity, - cursor, - )?; - } else { - component_fns.consume_or_write( - &mut ctx, - rule_fns, - params.entity_markers, - params.command_markers, - &mut client_entity, - cursor, - )?; - } - } + history.set(ago); + } - components_count += 1; + let end_pos = cursor.position() + data_size as u64; + let mut components_count = 0; + while cursor.position() < end_pos { + let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?; + let (component_id, component_fns, rule_fns) = params.registry.get(fns_id); + let mut ctx = WriteCtx::new(&mut commands, params.entity_map, component_id, message_tick); + + // SAFETY: `rule_fns` and `component_fns` were created for the same type. + unsafe { + if new_tick { + component_fns.write( + &mut ctx, + rule_fns, + params.entity_markers, + &mut client_entity, + cursor, + )?; + } else { + component_fns.consume_or_write( + &mut ctx, + rule_fns, + params.entity_markers, + params.command_markers, + &mut client_entity, + cursor, + )?; + } } - if let Some(stats) = &mut params.stats { - stats.entities_changed += 1; - stats.components_changed += components_count; - } + components_count += 1; + } - params.queue.apply(world); + if let Some(stats) = &mut params.stats { + stats.components_changed += components_count; } + params.queue.apply(world); + Ok(()) } @@ -514,10 +598,10 @@ fn apply_update_components( /// For details see /// [`ReplicationBuffer::write_entity`](crate::server::replication_message::replication_buffer::write_entity). fn deserialize_entity(cursor: &mut Cursor<&[u8]>) -> bincode::Result { - let flagged_index: u64 = cursor.read_u64_varint()?; + let flagged_index: u64 = cursor.read_varint()?; let has_generation = (flagged_index & 1) > 0; let generation = if has_generation { - cursor.read_u32_varint()? + 1 + cursor.read_varint::()? + 1 } else { 1u32 }; @@ -534,19 +618,11 @@ struct ReceiveParams<'a> { queue: &'a mut CommandQueue, entity_markers: &'a mut EntityMarkers, entity_map: &'a mut ServerEntityMap, - stats: Option<&'a mut ClientStats>, + stats: Option<&'a mut ClientReplicationStats>, command_markers: &'a CommandMarkers, registry: &'a ReplicationRegistry, } -/// Type of components replication. -/// -/// Parameter for [`apply_components`]. -enum ComponentsKind { - Insert, - Removal, -} - /// Set with replication and event systems related to client. #[derive(SystemSet, Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum ClientSet { @@ -562,6 +638,12 @@ pub enum ClientSet { /// /// Runs in [`PreUpdate`]. Receive, + /// Systems that populate Bevy's [`Diagnostics`](bevy::diagnostic::Diagnostics). + /// + /// Used by `bevy_replicon`. + /// + /// Runs in [`PreUpdate`]. + Diagnostics, /// Systems that synchronize hierarchy changes in [`ParentSync`](super::parent_sync::ParentSync). /// /// Used by `bevy_replicon`. @@ -602,45 +684,44 @@ pub enum ClientSet { Reset, } -/// Last received tick for init message from server. +/// Last received tick for change messages from the server. /// -/// In other words, last [`RepliconTick`] with a removal, insertion, spawn or despawn. -/// When a component changes, this value is not updated. +/// In other words, the last [`RepliconTick`] with a removal, insertion, spawn or despawn. +/// This value is not updated when mutation messages are received from the server. #[derive(Clone, Copy, Debug, Default, Deref, Resource)] -pub struct ServerInitTick(RepliconTick); +pub struct ServerChangeTick(RepliconTick); -/// All cached buffered updates, used by the replicon client to align replication updates with initialization -/// messages. +/// Cached buffered mutate messages, used to synchronize mutations with change messages. /// /// If [`ClientSet::Reset`] is disabled, then this needs to be cleaned up manually with [`Self::clear`]. #[derive(Default, Resource)] -pub struct BufferedUpdates(Vec); +pub struct BufferedMutations(Vec); -impl BufferedUpdates { +impl BufferedMutations { pub fn clear(&mut self) { self.0.clear(); } - /// Inserts a new update, maintaining sorting by their message tick in descending order. - fn insert(&mut self, update: BufferedUpdate) { + /// Inserts a new buffered message, maintaining sorting by their message tick in descending order. + fn insert(&mut self, mutation: BufferedMutate) { let index = self .0 - .partition_point(|other_update| update.message_tick < other_update.message_tick); - self.0.insert(index, update); + .partition_point(|other_mutation| mutation.message_tick < other_mutation.message_tick); + self.0.insert(index, mutation); } } -/// Caches a partially-deserialized entity update message that is waiting for its tick to appear in an init message. +/// Partially-deserialized mutate message that is waiting for its tick to appear in an change message. /// -/// See also [`crate::server::replication_messages::UpdateMessage`]. -pub(super) struct BufferedUpdate { +/// See also [`crate::server::replication_messages`]. +pub(super) struct BufferedMutate { /// Required tick to wait for. - init_tick: RepliconTick, + change_tick: RepliconTick, - /// The tick this update corresponds to. + /// The tick this mutations corresponds to. message_tick: RepliconTick, - /// Update data. + /// Mutations data. message: Bytes, } @@ -652,17 +733,17 @@ pub(super) struct BufferedUpdate { /// See also [`ClientDiagnosticsPlugin`](diagnostics::ClientDiagnosticsPlugin) /// for automatic integration with Bevy diagnostics. #[derive(Default, Resource, Debug)] -pub struct ClientStats { +pub struct ClientReplicationStats { /// Incremented per entity that changes. - pub entities_changed: u32, + pub entities_changed: usize, /// Incremented for every component that changes. - pub components_changed: u32, + pub components_changed: usize, /// Incremented per client mapping added. - pub mappings: u32, + pub mappings: usize, /// Incremented per entity despawn. - pub despawns: u32, + pub despawns: usize, /// Replication messages received. - pub messages: u32, + pub messages: usize, /// Replication bytes received in message payloads (without internal messaging plugin data). - pub bytes: u64, + pub bytes: usize, } diff --git a/src/client/confirm_history.rs b/src/client/confirm_history.rs index cbf44f0d..4b1268d9 100644 --- a/src/client/confirm_history.rs +++ b/src/client/confirm_history.rs @@ -90,10 +90,11 @@ impl ConfirmHistory { pub fn confirm(&mut self, tick: RepliconTick) { if tick > self.last_tick { self.set_last_tick(tick); - } - let ago = self.last_tick - tick; - if ago < u64::BITS { - self.set(ago); + } else { + let ago = self.last_tick - tick; + if ago < u64::BITS { + self.set(ago); + } } } @@ -195,11 +196,11 @@ mod tests { } #[test] - fn contains_any_with_set() { + fn contains_any_with_older() { let mut history = ConfirmHistory::new(RepliconTick::new(1)); assert_eq!(history.mask(), 0b1); - history.set(2); + history.confirm(RepliconTick::new(u32::MAX)); assert_eq!(history.mask(), 0b101); assert!(history.contains_any(RepliconTick::new(0), RepliconTick::new(1))); @@ -216,32 +217,32 @@ mod tests { } #[test] - fn set() { + fn confirm_newer() { let mut history = ConfirmHistory::new(RepliconTick::new(1)); + history.confirm(RepliconTick::new(2)); + assert_eq!(history.mask(), 0b11); - history.set(1); - - assert!(history.contains(RepliconTick::new(0))); + assert!(!history.contains(RepliconTick::new(0))); assert!(history.contains(RepliconTick::new(1))); - assert!(!history.contains(RepliconTick::new(2))); + assert!(history.contains(RepliconTick::new(2))); } #[test] - fn resize() { - let mut confirmed = ConfirmHistory::new(RepliconTick::new(1)); - - confirmed.set_last_tick(RepliconTick::new(2)); + fn confirm_older() { + let mut history = ConfirmHistory::new(RepliconTick::new(1)); + history.confirm(RepliconTick::new(0)); + assert_eq!(history.mask(), 0b11); - assert!(!confirmed.contains(RepliconTick::new(0))); - assert!(confirmed.contains(RepliconTick::new(1))); - assert!(confirmed.contains(RepliconTick::new(2))); + assert!(history.contains(RepliconTick::new(0))); + assert!(history.contains(RepliconTick::new(1))); + assert!(!history.contains(RepliconTick::new(2))); } #[test] - fn resize_to_same() { + fn confirm_same() { let mut history = ConfirmHistory::new(RepliconTick::new(1)); - - history.set_last_tick(RepliconTick::new(1)); + history.confirm(RepliconTick::new(1)); + assert_eq!(history.mask(), 0b1); assert!(!history.contains(RepliconTick::new(0))); assert!(history.contains(RepliconTick::new(1))); @@ -249,10 +250,10 @@ mod tests { } #[test] - fn resize_with_wrapping() { + fn confirm_with_wrapping() { let mut history = ConfirmHistory::new(RepliconTick::new(1)); - - history.set_last_tick(RepliconTick::new(u64::BITS + 1)); + history.confirm(RepliconTick::new(u64::BITS + 1)); + assert_eq!(history.mask(), 0b1); assert!(history.contains(RepliconTick::new(0))); assert!(history.contains(RepliconTick::new(1))); @@ -263,38 +264,14 @@ mod tests { } #[test] - fn resize_with_overflow() { + fn confirm_with_overflow() { let mut history = ConfirmHistory::new(RepliconTick::new(u32::MAX)); - - history.set_last_tick(RepliconTick::new(1)); + history.confirm(RepliconTick::new(1)); + assert_eq!(history.mask(), 0b101); assert!(!history.contains(RepliconTick::new(0))); assert!(history.contains(RepliconTick::new(1))); assert!(!history.contains(RepliconTick::new(3))); assert!(history.contains(RepliconTick::new(u32::MAX))); } - - #[test] - fn confirm_with_resize() { - let mut history = ConfirmHistory::new(RepliconTick::new(1)); - - history.confirm(RepliconTick::new(2)); - - assert!(!history.contains(RepliconTick::new(0))); - assert!(history.contains(RepliconTick::new(1))); - assert!(history.contains(RepliconTick::new(2))); - } - - #[test] - fn confirm_with_set() { - let mut history = ConfirmHistory::new(RepliconTick::new(1)); - assert_eq!(history.mask(), 0b1); - - history.confirm(RepliconTick::new(0)); - assert_eq!(history.mask(), 0b11); - - assert!(history.contains(RepliconTick::new(0))); - assert!(history.contains(RepliconTick::new(1))); - assert!(!history.contains(RepliconTick::new(2))); - } } diff --git a/src/client/diagnostics.rs b/src/client/diagnostics.rs index fd86076c..376707ef 100644 --- a/src/client/diagnostics.rs +++ b/src/client/diagnostics.rs @@ -2,83 +2,125 @@ use bevy::diagnostic::DiagnosticPath; use bevy::{ diagnostic::{Diagnostic, Diagnostics, RegisterDiagnostic}, prelude::*, - time::common_conditions::on_timer, }; -use std::time::Duration; -use super::ClientStats; +use super::{ClientReplicationStats, ClientSet}; +use crate::core::{common_conditions::client_connected, replicon_client::RepliconClient}; -/// Plugin to write [`Diagnostics`] based on [`ClientStats`] every second. +/// Plugin to write [`Diagnostics`] based on [`ClientReplicationStats`] every second. /// -/// Adds [`ClientStats`] resource and automatically resets it to get diagnostics per second. +/// Adds [`ClientReplicationStats`] resource. pub struct ClientDiagnosticsPlugin; impl Plugin for ClientDiagnosticsPlugin { fn build(&self, app: &mut App) { - app.init_resource::() + app.init_resource::() .add_systems( - Update, - Self::add_measurements.run_if(on_timer(Duration::from_secs(1))), + PreUpdate, + Self::add_measurements + .in_set(ClientSet::Diagnostics) + .run_if(client_connected), ) .register_diagnostic( - Diagnostic::new(Self::ENTITY_CHANGES) - .with_suffix("entities changed per second") + Diagnostic::new(Self::RTT) + .with_suffix(" ms") .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), ) .register_diagnostic( - Diagnostic::new(Self::COMPONENT_CHANGES) - .with_suffix("components changed per second") + Diagnostic::new(Self::PACKET_LOSS) + .with_suffix(" %") + .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), + ) + .register_diagnostic( + Diagnostic::new(Self::SENT_BPS) + .with_suffix(" byte/s") + .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), + ) + .register_diagnostic( + Diagnostic::new(Self::RECEIVED_BPS) + .with_suffix(" byte/s") + .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), + ) + .register_diagnostic( + Diagnostic::new(Self::ENTITIES_CHANGED) + .with_suffix(" entities changed") + .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), + ) + .register_diagnostic( + Diagnostic::new(Self::COMPONENTS_CHANGED) + .with_suffix(" components changed") .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), ) .register_diagnostic( Diagnostic::new(Self::MAPPINGS) - .with_suffix("mappings added per second") + .with_suffix(" mappings") .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), ) .register_diagnostic( Diagnostic::new(Self::DESPAWNS) - .with_suffix("despawns per second") + .with_suffix(" despawns") .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), ) .register_diagnostic( - Diagnostic::new(Self::MESSAGES) - .with_suffix("messages per second") + Diagnostic::new(Self::REPLICATION_MESSAGES) + .with_suffix(" replication messages") .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), ) .register_diagnostic( - Diagnostic::new(Self::BYTES) - .with_suffix("bytes per second") + Diagnostic::new(Self::REPLICATION_BYTES) + .with_suffix(" replication bytes") .with_max_history_length(Self::DIAGNOSTIC_HISTORY_LEN), ); } } impl ClientDiagnosticsPlugin { - /// How many entities modified per second by replication. - pub const ENTITY_CHANGES: DiagnosticPath = - DiagnosticPath::const_new("replication.client.entity_changes"); - /// How many components modified per second by replication. - pub const COMPONENT_CHANGES: DiagnosticPath = - DiagnosticPath::const_new("replication.client.component_changes"); - /// How many client-mappings added per second by replication. - pub const MAPPINGS: DiagnosticPath = DiagnosticPath::const_new("replication.client.mappings"); - /// How many despawns per second from replication. - pub const DESPAWNS: DiagnosticPath = DiagnosticPath::const_new("replication.client.despawns"); - /// How many replication messages processed per second. - pub const MESSAGES: DiagnosticPath = DiagnosticPath::const_new("replication.client.messages"); - /// How many bytes of replication messages payloads per second. - pub const BYTES: DiagnosticPath = DiagnosticPath::const_new("replication.client.bytes"); + /// Round-trip time. + pub const RTT: DiagnosticPath = DiagnosticPath::const_new("client/rtt"); + /// The percent of packet loss. + pub const PACKET_LOSS: DiagnosticPath = DiagnosticPath::const_new("client/packet_loss"); + /// How many messages sent per second. + pub const SENT_BPS: DiagnosticPath = DiagnosticPath::const_new("client/sent_bps"); + /// How many bytes received per second. + pub const RECEIVED_BPS: DiagnosticPath = DiagnosticPath::const_new("client/received_bps"); + + /// How many entities changed by replication. + pub const ENTITIES_CHANGED: DiagnosticPath = + DiagnosticPath::const_new("client/replication/entities_changed"); + /// How many components changed by replication. + pub const COMPONENTS_CHANGED: DiagnosticPath = + DiagnosticPath::const_new("client/replication/components_changed"); + /// How many client-mappings added by replication. + pub const MAPPINGS: DiagnosticPath = DiagnosticPath::const_new("client/replication/mappings"); + /// How many despawns applied by replication. + pub const DESPAWNS: DiagnosticPath = DiagnosticPath::const_new("client/replication/despawns"); + /// How many replication messages received. + pub const REPLICATION_MESSAGES: DiagnosticPath = + DiagnosticPath::const_new("client/replication/messages"); + /// How many replication bytes received. + pub const REPLICATION_BYTES: DiagnosticPath = + DiagnosticPath::const_new("client/replication/bytes"); /// Max diagnostic history length. pub const DIAGNOSTIC_HISTORY_LEN: usize = 60; - fn add_measurements(mut stats: ResMut, mut diagnostics: Diagnostics) { - diagnostics.add_measurement(&Self::ENTITY_CHANGES, || stats.entities_changed as f64); - diagnostics.add_measurement(&Self::COMPONENT_CHANGES, || stats.components_changed as f64); + fn add_measurements( + mut diagnostics: Diagnostics, + stats: Res, + client: Res, + ) { + diagnostics.add_measurement(&Self::RTT, || client.rtt()); + diagnostics.add_measurement(&Self::PACKET_LOSS, || client.packet_loss()); + diagnostics.add_measurement(&Self::SENT_BPS, || client.sent_bps()); + diagnostics.add_measurement(&Self::RECEIVED_BPS, || client.received_bps()); + + diagnostics.add_measurement(&Self::ENTITIES_CHANGED, || stats.entities_changed as f64); + diagnostics.add_measurement(&Self::COMPONENTS_CHANGED, || { + stats.components_changed as f64 + }); diagnostics.add_measurement(&Self::MAPPINGS, || stats.mappings as f64); diagnostics.add_measurement(&Self::DESPAWNS, || stats.despawns as f64); - diagnostics.add_measurement(&Self::BYTES, || stats.bytes as f64); - diagnostics.add_measurement(&Self::MESSAGES, || stats.messages as f64); - *stats = ClientStats::default(); + diagnostics.add_measurement(&Self::REPLICATION_MESSAGES, || stats.messages as f64); + diagnostics.add_measurement(&Self::REPLICATION_BYTES, || stats.bytes as f64); } } diff --git a/src/client/events.rs b/src/client/events.rs index d5d2dc09..ca98fcfc 100644 --- a/src/client/events.rs +++ b/src/client/events.rs @@ -1,8 +1,10 @@ -use super::{ClientPlugin, ClientSet, ServerInitTick}; +use super::{ClientPlugin, ClientSet, ServerChangeTick}; use crate::core::{ common_conditions::*, - ctx::{ClientReceiveCtx, ClientSendCtx}, - event_registry::EventRegistry, + event_registry::{ + ctx::{ClientReceiveCtx, ClientSendCtx}, + EventRegistry, + }, replicon_client::RepliconClient, server_entity_map::ServerEntityMap, }; @@ -83,7 +85,7 @@ impl ClientEventsPlugin { world.resource_scope(|world, registry: Mut| { world.resource_scope(|world, entity_map: Mut| { world.resource_scope(|world, event_registry: Mut| { - let init_tick = **world.resource::(); + let change_tick = **world.resource::(); let mut ctx = ClientReceiveCtx { registry: ®istry.read(), entity_map: &entity_map, @@ -110,7 +112,7 @@ impl ClientEventsPlugin { events.into_inner(), queue.into_inner(), &mut client, - init_tick, + change_tick, ) }; } diff --git a/src/core.rs b/src/core.rs index 7b980434..794d596b 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,13 +1,8 @@ pub mod channels; -pub mod command_markers; pub mod common_conditions; pub mod connected_clients; -pub mod ctx; -pub mod deferred_entity; pub mod event_registry; -pub mod replicated_clients; -pub mod replication_registry; -pub mod replication_rules; +pub mod replication; pub mod replicon_client; pub mod replicon_server; pub mod replicon_tick; @@ -17,10 +12,11 @@ use bevy::prelude::*; use serde::{Deserialize, Serialize}; use channels::RepliconChannels; -use command_markers::CommandMarkers; use event_registry::EventRegistry; -use replication_registry::ReplicationRegistry; -use replication_rules::ReplicationRules; +use replication::{ + command_markers::CommandMarkers, replication_registry::ReplicationRegistry, + replication_rules::ReplicationRules, Replicated, +}; pub struct RepliconCorePlugin; @@ -35,14 +31,6 @@ impl Plugin for RepliconCorePlugin { } } -#[deprecated(note = "use `Replicated` instead")] -pub type Replication = Replicated; - -/// Marks entity for replication. -#[derive(Component, Clone, Copy, Default, Reflect, Debug)] -#[reflect(Component)] -pub struct Replicated; - /// Unique client ID. /// /// Could be a client or a dual server-client. diff --git a/src/core/channels.rs b/src/core/channels.rs index 96c47d74..7a93d72a 100644 --- a/src/core/channels.rs +++ b/src/core/channels.rs @@ -10,18 +10,18 @@ pub enum ReplicationChannel { /// For sending messages with entity mappings, inserts, removals and despawns. /// /// This is an ordered reliable channel. - Init, - /// For sending messages with component updates. + Changes, + /// For sending messages with component mutations. /// /// This is an unreliable channel. - Update, + Mutations, } impl From for RepliconChannel { fn from(value: ReplicationChannel) -> Self { match value { - ReplicationChannel::Init => ChannelKind::Ordered.into(), - ReplicationChannel::Update => ChannelKind::Unreliable.into(), + ReplicationChannel::Changes => ChannelKind::Ordered.into(), + ReplicationChannel::Mutations => ChannelKind::Unreliable.into(), } } } @@ -53,12 +53,12 @@ impl Default for RepliconChannels { fn default() -> Self { Self { server: vec![ - ReplicationChannel::Init.into(), - ReplicationChannel::Update.into(), + ReplicationChannel::Changes.into(), + ReplicationChannel::Mutations.into(), ], client: vec![ - ReplicationChannel::Init.into(), - ReplicationChannel::Update.into(), + ReplicationChannel::Changes.into(), + ReplicationChannel::Mutations.into(), ], default_max_bytes: 5 * 1024 * 1024, } diff --git a/src/core/connected_clients.rs b/src/core/connected_clients.rs index c999f4d0..09738d5d 100644 --- a/src/core/connected_clients.rs +++ b/src/core/connected_clients.rs @@ -6,15 +6,15 @@ use crate::core::ClientId; /// /// Inserted as resource by [`ServerPlugin`](crate::server::ServerPlugin). /// -/// See also [ReplicatedClients](super::replicated_clients::ReplicatedClients). -#[derive(Resource, Default, Deref)] -pub struct ConnectedClients(Vec); +/// See also [ReplicatedClients](super::replication::replicated_clients::ReplicatedClients). +#[derive(Resource, Default, Debug, Deref)] +pub struct ConnectedClients(Vec); impl ConnectedClients { pub(crate) fn add(&mut self, client_id: ClientId) { debug!("adding connected `{client_id:?}`"); - self.0.push(client_id); + self.0.push(ConnectedClient::new(client_id)); } pub(crate) fn remove(&mut self, client_id: ClientId) { @@ -22,8 +22,110 @@ impl ConnectedClients { let index = self .iter() - .position(|test_id| *test_id == client_id) + .position(|client| client.id == client_id) .unwrap_or_else(|| panic!("{client_id:?} should be added before removal")); self.0.remove(index); } + + pub fn iter_mut(&mut self) -> impl Iterator { + self.0.iter_mut() + } +} + +#[derive(Debug, Clone, Copy)] +pub struct ConnectedClient { + id: ClientId, + rtt: f64, + packet_loss: f64, + sent_bps: f64, + received_bps: f64, +} + +impl ConnectedClient { + pub fn new(id: ClientId) -> Self { + Self { + id, + rtt: 0.0, + packet_loss: 0.0, + sent_bps: 0.0, + received_bps: 0.0, + } + } + + /// Returns the associated ID. + pub fn id(&self) -> ClientId { + self.id + } + + /// Returns the round-time trip for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn rtt(&self) -> f64 { + self.rtt + } + + /// Sets the round-time trip for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_rtt(&mut self, rtt: f64) { + self.rtt = rtt; + } + + /// Returns the packet loss for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn packet_loss(&self) -> f64 { + self.packet_loss + } + + /// Sets the packet loss for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_packet_loss(&mut self, packet_loss: f64) { + self.packet_loss = packet_loss; + } + + /// Returns the bytes sent per second for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn sent_bps(&self) -> f64 { + self.sent_bps + } + + /// Sets the bytes sent per second for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_sent_bps(&mut self, sent_bps: f64) { + self.sent_bps = sent_bps; + } + + /// Returns the bytes received per second for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn received_bps(&self) -> f64 { + self.received_bps + } + + /// Sets the bytes received per second for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_received_bps(&mut self, received_bps: f64) { + self.received_bps = received_bps; + } } diff --git a/src/core/deferred_entity.rs b/src/core/deferred_entity.rs deleted file mode 100644 index 546060f9..00000000 --- a/src/core/deferred_entity.rs +++ /dev/null @@ -1,32 +0,0 @@ -use bevy::{ecs::world::unsafe_world_cell::UnsafeWorldCell, prelude::*}; - -/// An entity reference that disallows structural ECS changes. -/// -/// Similar to [`EntityMut`], but additionally provides a read-only access to the world. -#[derive(Deref, DerefMut)] -pub struct DeferredEntity<'w> { - #[deref] - entity: EntityMut<'w>, - world: &'w World, -} - -impl<'w> DeferredEntity<'w> { - /// Creates a new instance from a world cell. - /// - /// # Safety - /// - /// - The cell must have been created using [`World::as_unsafe_world_cell`]. - /// - No structural ECS changes can be done using the cell. - /// - No other mutable references to the entity's components should exist. - pub(crate) unsafe fn new(world_cell: UnsafeWorldCell<'w>, entity: Entity) -> Self { - // Split access, `EntityMut` can't make structural changes and they share the lifetime. - let entity: EntityMut = world_cell.world_mut().entity_mut(entity).into(); - let world = world_cell.world(); - Self { entity, world } - } - - /// Gets read-only access to the world that the current entity belongs to. - pub fn world(&self) -> &World { - self.world - } -} diff --git a/src/core/event_registry.rs b/src/core/event_registry.rs index 503fc223..7fea67bd 100644 --- a/src/core/event_registry.rs +++ b/src/core/event_registry.rs @@ -1,4 +1,5 @@ pub(crate) mod client_event; +pub mod ctx; pub(crate) mod server_event; use bevy::{ecs::component::ComponentId, prelude::*}; diff --git a/src/core/event_registry/client_event.rs b/src/core/event_registry/client_event.rs index b3bc5a24..5f1d7e52 100644 --- a/src/core/event_registry/client_event.rs +++ b/src/core/event_registry/client_event.rs @@ -16,10 +16,12 @@ use bevy::{ use bincode::{DefaultOptions, Options}; use serde::{de::DeserializeOwned, Serialize}; -use super::EventRegistry; +use super::{ + ctx::{ClientSendCtx, ServerReceiveCtx}, + EventRegistry, +}; use crate::core::{ channels::{RepliconChannel, RepliconChannels}, - ctx::{ClientSendCtx, ServerReceiveCtx}, replicon_client::RepliconClient, replicon_server::RepliconServer, ClientId, @@ -77,7 +79,7 @@ pub trait ClientEventAppExt { reflect::serde::{ReflectSerializer, ReflectDeserializer}, }; use bevy_replicon::{ - core::ctx::{ClientSendCtx, ServerReceiveCtx}, + core::event_registry::ctx::{ClientSendCtx, ServerReceiveCtx}, prelude::*, }; use bincode::{DefaultOptions, Options}; diff --git a/src/core/event_registry/ctx.rs b/src/core/event_registry/ctx.rs new file mode 100644 index 00000000..17e990b6 --- /dev/null +++ b/src/core/event_registry/ctx.rs @@ -0,0 +1,63 @@ +use bevy::{prelude::*, reflect::TypeRegistry}; + +use crate::core::server_entity_map::ServerEntityMap; + +/// Event sending context for client. +#[non_exhaustive] +pub struct ClientSendCtx<'a> { + /// Registry of reflected types. + pub registry: &'a TypeRegistry, + + /// Maps server entities to client entities and vice versa. + pub entity_map: &'a ServerEntityMap, +} + +impl EntityMapper for ClientSendCtx<'_> { + fn map_entity(&mut self, entity: Entity) -> Entity { + *self + .entity_map + .to_server() + .get(&entity) + .unwrap_or_else(|| panic!("client {entity:?} should have a mapping")) + } +} + +/// Event receiving context for server. +#[non_exhaustive] +pub struct ServerReceiveCtx<'a> { + /// Registry of reflected types. + pub registry: &'a TypeRegistry, +} + +/// Event sending context for server. +#[non_exhaustive] +pub struct ServerSendCtx<'a> { + /// Registry of reflected types. + pub registry: &'a TypeRegistry, +} + +/// Event receiving context for client. +#[non_exhaustive] +pub struct ClientReceiveCtx<'a> { + /// Registry of reflected types. + pub registry: &'a TypeRegistry, + + /// Maps server entities to client entities and vice versa. + pub entity_map: &'a ServerEntityMap, + + /// Entities that couldn't be mapped by [`EntityMapper::map_entity`]. + /// + /// We needed it because [`EntityMapper`] doesn't provide a way to handle errors. + pub(crate) invalid_entities: Vec, +} + +impl EntityMapper for ClientReceiveCtx<'_> { + fn map_entity(&mut self, entity: Entity) -> Entity { + if let Some(mapped_entity) = self.entity_map.to_client().get(&entity) { + *mapped_entity + } else { + self.invalid_entities.push(entity); + Entity::PLACEHOLDER + } + } +} diff --git a/src/core/event_registry/server_event.rs b/src/core/event_registry/server_event.rs index 326e129a..9ea4957b 100644 --- a/src/core/event_registry/server_event.rs +++ b/src/core/event_registry/server_event.rs @@ -19,13 +19,15 @@ use bytes::Bytes; use ordered_multimap::ListOrderedMultimap; use serde::{de::DeserializeOwned, Serialize}; -use super::EventRegistry; +use super::{ + ctx::{ClientReceiveCtx, ServerSendCtx}, + EventRegistry, +}; use crate::{ core::{ channels::{RepliconChannel, RepliconChannels}, connected_clients::ConnectedClients, - ctx::{ClientReceiveCtx, ServerSendCtx}, - replicated_clients::ReplicatedClients, + replication::replicated_clients::ReplicatedClients, replicon_client::RepliconClient, replicon_server::RepliconServer, replicon_tick::RepliconTick, @@ -84,7 +86,7 @@ pub trait ServerEventAppExt { reflect::serde::{ReflectSerializer, ReflectDeserializer}, }; use bevy_replicon::{ - core::ctx::{ClientReceiveCtx, ServerSendCtx}, + core::event_registry::ctx::{ClientReceiveCtx, ServerSendCtx}, prelude::*, }; use bincode::{DefaultOptions, Options}; @@ -338,9 +340,9 @@ impl ServerEvent { events: PtrMut, queue: PtrMut, client: &mut RepliconClient, - init_tick: RepliconTick, + change_tick: RepliconTick, ) { - (self.receive)(self, ctx, events, queue, client, init_tick); + (self.receive)(self, ctx, events, queue, client, change_tick); } /// Drains events [`ToClients`] and re-emits them as `E` if the server is in the list of the event recipients. @@ -492,12 +494,12 @@ unsafe fn receive( events: PtrMut, queue: PtrMut, client: &mut RepliconClient, - init_tick: RepliconTick, + change_tick: RepliconTick, ) { let events: &mut Events = events.deref_mut(); let queue: &mut ServerEventQueue = queue.deref_mut(); - while let Some((tick, message)) = queue.pop_if_le(init_tick) { + while let Some((tick, message)) = queue.pop_if_le(change_tick) { let mut cursor = Cursor::new(&*message); match event_data.deserialize(ctx, &mut cursor) { Ok(event) => { @@ -527,7 +529,7 @@ unsafe fn receive( continue; } }; - if tick > init_tick { + if tick > change_tick { trace!("queuing event `{}` with `{tick:?}`", any::type_name::()); queue.insert(tick, message.slice(cursor.position() as usize..)); continue; @@ -635,14 +637,14 @@ unsafe fn send_independent_event( match *mode { SendMode::Broadcast => { - for &client_id in connected_clients.iter() { - server.send(client_id, event_data.channel_id, message.clone()); + for client in connected_clients.iter() { + server.send(client.id(), event_data.channel_id, message.clone()); } } SendMode::BroadcastExcept(id) => { - for &client_id in connected_clients.iter() { - if client_id != id { - server.send(client_id, event_data.channel_id, message.clone()); + for client in connected_clients.iter() { + if client.id() != id { + server.send(client.id(), event_data.channel_id, message.clone()); } } } @@ -697,19 +699,19 @@ enum SerializedMessage { } impl SerializedMessage { - /// Optimized to avoid reallocations when clients have the same init tick as other clients receiving the + /// Optimized to avoid reallocations when clients have the same change tick as other clients receiving the /// same message. - fn get_bytes(&mut self, init_tick: RepliconTick) -> bincode::Result { + fn get_bytes(&mut self, change_tick: RepliconTick) -> bincode::Result { match self { // Resolve the raw value into a message with serialized tick. Self::Raw(raw) => { let mut bytes = std::mem::take(raw); - let tick_size = DefaultOptions::new().serialized_size(&init_tick)? as usize; + let tick_size = DefaultOptions::new().serialized_size(&change_tick)? as usize; let padding = RepliconTick::MAX_SERIALIZED_SIZE - tick_size; - DefaultOptions::new().serialize_into(&mut bytes[padding..], &init_tick)?; + DefaultOptions::new().serialize_into(&mut bytes[padding..], &change_tick)?; let bytes = Bytes::from(bytes).slice(padding..); *self = Self::Resolved { - tick: init_tick, + tick: change_tick, tick_size, bytes: bytes.clone(), }; @@ -721,13 +723,13 @@ impl SerializedMessage { tick_size, bytes, } => { - if *tick == init_tick { + if *tick == change_tick { return Ok(bytes.clone()); } - let new_tick_size = DefaultOptions::new().serialized_size(&init_tick)? as usize; + let new_tick_size = DefaultOptions::new().serialized_size(&change_tick)? as usize; let mut new_bytes = Vec::with_capacity(new_tick_size + bytes.len() - *tick_size); - DefaultOptions::new().serialize_into(&mut new_bytes, &init_tick)?; + DefaultOptions::new().serialize_into(&mut new_bytes, &change_tick)?; new_bytes.extend_from_slice(&bytes[*tick_size..]); Ok(new_bytes.into()) } @@ -747,7 +749,7 @@ impl BufferedServerEvent { server: &mut RepliconServer, client: &ReplicatedClient, ) -> bincode::Result<()> { - let message = self.message.get_bytes(client.init_tick())?; + let message = self.message.get_bytes(client.change_tick())?; server.send(client.id(), self.channel, message); Ok(()) } @@ -767,10 +769,10 @@ impl BufferedServerEventSet { } } -/// Caches synchronization-dependent server events until they can be sent with an accurate init tick. +/// Caches synchronization-dependent server events until they can be sent with an accurate change tick. /// /// This exists because replication does not scan the world every tick. If a server event is sent in the same -/// tick as a spawn and the event references that spawn, then the server event's init tick needs to be synchronized +/// tick as a spawn and the event references that spawn, then the server event's change tick needs to be synchronized /// with that spawn on the client. We buffer the event until the spawn can be detected. #[derive(Resource, Default)] pub(crate) struct BufferedServerEvents { @@ -888,9 +890,9 @@ struct ServerEventQueue { impl ServerEventQueue { /// Pops the next event that is at least as old as the specified replicon tick. - fn pop_if_le(&mut self, init_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> { + fn pop_if_le(&mut self, change_tick: RepliconTick) -> Option<(RepliconTick, Bytes)> { let (tick, _) = self.list.front()?; - if *tick > init_tick { + if *tick > change_tick { return None; } self.list diff --git a/src/core/replication.rs b/src/core/replication.rs new file mode 100644 index 00000000..a657f8dd --- /dev/null +++ b/src/core/replication.rs @@ -0,0 +1,16 @@ +pub mod change_message_flags; +pub mod command_markers; +pub mod deferred_entity; +pub mod replicated_clients; +pub mod replication_registry; +pub mod replication_rules; + +use bevy::prelude::*; + +#[deprecated(note = "use `Replicated` instead")] +pub type Replication = Replicated; + +/// Marks entity for replication. +#[derive(Component, Clone, Copy, Default, Reflect, Debug)] +#[reflect(Component)] +pub struct Replicated; diff --git a/src/core/replication/change_message_flags.rs b/src/core/replication/change_message_flags.rs new file mode 100644 index 00000000..834bda3a --- /dev/null +++ b/src/core/replication/change_message_flags.rs @@ -0,0 +1,48 @@ +use bitflags::bitflags; + +bitflags! { + /// Types of data included in the change message if the bit is set. + /// + /// Serialized at the beginning of the message. + #[derive(Default, Clone, Copy, PartialEq, Eq, Debug)] + pub(crate) struct ChangeMessageFlags: u8 { + const MAPPINGS = 0b00000001; + const DESPAWNS = 0b00000010; + const REMOVALS = 0b00000100; + const CHANGES = 0b00001000; + } +} + +impl ChangeMessageFlags { + /// Returns the last set flag in the message. + pub(crate) fn last(self) -> ChangeMessageFlags { + debug_assert!(!self.is_empty()); + let zeroes = u8::BITS - 1 - self.bits().leading_zeros(); + ChangeMessageFlags::from_bits_retain(1 << zeroes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn last() { + assert_eq!( + ChangeMessageFlags::CHANGES.last(), + ChangeMessageFlags::CHANGES + ); + assert_eq!( + ChangeMessageFlags::MAPPINGS.last(), + ChangeMessageFlags::MAPPINGS + ); + assert_eq!( + ChangeMessageFlags::all().last(), + ChangeMessageFlags::CHANGES + ); + assert_eq!( + (ChangeMessageFlags::DESPAWNS | ChangeMessageFlags::REMOVALS).last(), + ChangeMessageFlags::REMOVALS + ); + } +} diff --git a/src/core/command_markers.rs b/src/core/replication/command_markers.rs similarity index 92% rename from src/core/command_markers.rs rename to src/core/replication/command_markers.rs index d51d229a..dc855832 100644 --- a/src/core/command_markers.rs +++ b/src/core/replication/command_markers.rs @@ -2,8 +2,10 @@ use std::cmp::Reverse; use bevy::{ecs::component::ComponentId, prelude::*}; -use super::replication_registry::command_fns::{RemoveFn, WriteFn}; -use crate::core::replication_registry::ReplicationRegistry; +use super::replication_registry::{ + command_fns::{RemoveFn, WriteFn}, + ReplicationRegistry, +}; /// Marker-based functions for [`App`]. /// @@ -50,10 +52,14 @@ pub trait AppMarkerExt { use bevy::{ecs::system::EntityCommands, prelude::*, utils::HashMap}; use bevy_replicon::{ core::{ - command_markers::MarkerConfig, - deferred_entity::DeferredEntity, - ctx::{RemoveCtx, WriteCtx}, - replication_registry::rule_fns::RuleFns, + replication::{ + command_markers::MarkerConfig, + deferred_entity::DeferredEntity, + replication_registry::{ + ctx::{RemoveCtx, WriteCtx}, + rule_fns::RuleFns, + }, + }, replicon_tick::RepliconTick, }, prelude::*, @@ -174,7 +180,7 @@ impl CommandMarkers { /// /// May invalidate previously returned [`CommandMarkerIndex`] due to sorting. /// - /// Use [`ReplicationFns::register_marker`] to register a slot for command functions for this marker. + /// Use [`ReplicationRegistry::register_marker`] to register a slot for command functions for this marker. fn insert(&mut self, marker: CommandMarker) -> CommandMarkerIndex { let key = Reverse(marker.config.priority); let index = self @@ -225,11 +231,11 @@ pub struct MarkerConfig { /// By default set to `0`. pub priority: usize, - /// Represents whether a marker needs to process old updates. + /// Represents whether a marker needs to process old mutations. /// - /// Since updates use [`ChannelKind::Unreliable`](crate::core::channels::ChannelKind), - /// a client may receive an older update for an entity. By default these updates are discarded, - /// but some markers may need them. If this field is set to `true`, old component updates will + /// Since mutations use [`ChannelKind::Unreliable`](crate::core::channels::ChannelKind), + /// a client may receive an older mutation for an entity component. By default these mutations are discarded, + /// but some markers may need them. If this field is set to `true`, old component mutations will /// be passed to the writing function for this marker. /// /// By default set to `false`. @@ -295,7 +301,7 @@ mod tests { use serde::{Deserialize, Serialize}; use super::*; - use crate::core::replication_registry::{command_fns, ReplicationRegistry}; + use crate::core::replication::replication_registry::command_fns; #[test] #[should_panic] diff --git a/src/core/replication/deferred_entity.rs b/src/core/replication/deferred_entity.rs new file mode 100644 index 00000000..cf7b0f14 --- /dev/null +++ b/src/core/replication/deferred_entity.rs @@ -0,0 +1,33 @@ +use bevy::{ecs::world::CommandQueue, prelude::*}; + +/// An entity reference that disallows structural ECS changes. +/// +/// Similar to [`EntityMut`], but additionally provides a read-only access to the world. +#[derive(Deref, DerefMut)] +pub struct DeferredEntity<'w> { + #[deref] + entity: EntityMut<'w>, + world: &'w World, +} + +impl<'w> DeferredEntity<'w> { + pub(crate) fn new(world: &'w mut World, entity: Entity) -> Self { + let world_cell = world.as_unsafe_world_cell(); + // SAFETY: access split, `EntityMut` cannot make structural ECS changes, + // and the world cannot be accessed simultaneously with the entity. + unsafe { + let entity: EntityMut = world_cell.world_mut().entity_mut(entity).into(); + let world = world_cell.world(); + Self { entity, world } + } + } + + pub(crate) fn commands<'s>(&self, queue: &'s mut CommandQueue) -> Commands<'w, 's> { + Commands::new_from_entities(queue, self.world.entities()) + } + + /// Gets read-only access to the world that the current entity belongs to. + pub fn world(&self) -> &World { + self.world + } +} diff --git a/src/core/replicated_clients.rs b/src/core/replication/replicated_clients.rs similarity index 76% rename from src/core/replicated_clients.rs rename to src/core/replication/replicated_clients.rs index f9addca8..1f80a46d 100644 --- a/src/core/replicated_clients.rs +++ b/src/core/replication/replicated_clients.rs @@ -16,7 +16,7 @@ use client_visibility::ClientVisibility; /// /// Inserted as resource by [`ServerPlugin`](crate::server::ServerPlugin). /// -/// See also [ConnectedClients](super::connected_clients::ConnectedClients). +/// See also [ConnectedClients](crate::core::connected_clients::ConnectedClients). #[derive(Resource, Default)] pub struct ReplicatedClients { clients: Vec, @@ -172,7 +172,7 @@ pub struct ReplicatedClient { id: ClientId, /// Lowest tick for use in change detection for each entity. - change_ticks: EntityHashMap, + mutation_ticks: EntityHashMap, /// Entity visibility settings. visibility: ClientVisibility, @@ -180,28 +180,28 @@ pub struct ReplicatedClient { /// The last tick in which a replicated entity had an insertion, removal, or gained/lost a component from the /// perspective of the client. /// - /// It should be included in update messages and server events to avoid needless waiting for the next init + /// It should be included in mutate messages and server events to avoid needless waiting for the next change /// message to arrive. - init_tick: RepliconTick, + change_tick: RepliconTick, - /// Update message indexes mapped to their info. - updates: HashMap, + /// Mutate message indices mapped to their info. + mutations: HashMap, - /// Index for the next update message to be sent to this client. + /// Index for the next mutate message to be sent to this client. /// - /// See also [`Self::register_update`]. - next_update_index: u16, + /// See also [`Self::register_mutate_message`]. + next_mutate_index: u16, } impl ReplicatedClient { fn new(id: ClientId, policy: VisibilityPolicy) -> Self { Self { id, - change_ticks: Default::default(), + mutation_ticks: Default::default(), visibility: ClientVisibility::new(policy), - init_tick: Default::default(), - updates: Default::default(), - next_update_index: Default::default(), + change_tick: Default::default(), + mutations: Default::default(), + next_mutate_index: Default::default(), } } @@ -220,24 +220,24 @@ impl ReplicatedClient { &mut self.visibility } - /// Sets the client's init tick. - pub(crate) fn set_init_tick(&mut self, tick: RepliconTick) { - self.init_tick = tick; + /// Sets the client's change tick. + pub(crate) fn set_change_tick(&mut self, tick: RepliconTick) { + self.change_tick = tick; } /// Returns the last tick in which a replicated entity had an insertion, removal, or gained/lost a component from the /// perspective of the client. - pub fn init_tick(&self) -> RepliconTick { - self.init_tick + pub fn change_tick(&self) -> RepliconTick { + self.change_tick } - /// Clears all entities for unacknowledged updates, returning them as an iterator. + /// Clears all entities for unacknowledged mutate messages, returning them as an iterator. /// /// Keeps the allocated memory for reuse. fn drain_entities(&mut self) -> impl Iterator> + '_ { - self.updates + self.mutations .drain() - .map(|(_, update_info)| update_info.entities) + .map(|(_, mutate_info)| mutate_info.entities) } /// Resets all data. @@ -246,98 +246,98 @@ impl ReplicatedClient { fn reset(&mut self, id: ClientId) { self.id = id; self.visibility.clear(); - self.change_ticks.clear(); - self.updates.clear(); - self.next_update_index = 0; + self.mutation_ticks.clear(); + self.mutations.clear(); + self.next_mutate_index = 0; } - /// Registers update at specified `tick` and `timestamp` and returns its index with entities to fill. + /// Registers mutate message at specified `tick` and `timestamp` and returns its index with entities to fill. /// /// Used later to acknowledge updated entities. #[must_use] - pub(crate) fn register_update( + pub(crate) fn register_mutate_message( &mut self, client_buffers: &mut ClientBuffers, tick: Tick, timestamp: Duration, ) -> (u16, &mut Vec) { - let update_index = self.next_update_index; - self.next_update_index = self.next_update_index.overflowing_add(1).0; + let mutate_index = self.next_mutate_index; + self.next_mutate_index = self.next_mutate_index.overflowing_add(1).0; let mut entities = client_buffers.entities.pop().unwrap_or_default(); entities.clear(); - let update_info = UpdateInfo { + let mutate_info = MutateInfo { tick, timestamp, entities, }; - let update_info = self - .updates - .entry(update_index) - .insert(update_info) + let mutate_info = self + .mutations + .entry(mutate_index) + .insert(mutate_info) .into_mut(); - (update_index, &mut update_info.entities) + (mutate_index, &mut mutate_info.entities) } /// Sets the change tick for an entity that is replicated to this client. /// /// The change tick is the reference point for determining if components on an entity have changed and /// need to be replicated. Component changes older than the change limit are assumed to be acked by the client. - pub(crate) fn set_change_tick(&mut self, entity: Entity, tick: Tick) { - self.change_ticks.insert(entity, tick); + pub(crate) fn set_mutation_tick(&mut self, entity: Entity, tick: Tick) { + self.mutation_ticks.insert(entity, tick); } /// Gets the change tick for an entity that is replicated to this client. - pub fn get_change_tick(&mut self, entity: Entity) -> Option { - self.change_ticks.get(&entity).copied() + pub fn mutation_tick(&self, entity: Entity) -> Option { + self.mutation_ticks.get(&entity).copied() } - /// Marks update with the specified index as acknowledged. + /// Marks mutate message as acknowledged by its index. /// - /// Change limits for all entities from this update will be set to the update's tick if it's higher. + /// Change tick for all entities from this mutate message will be set to the message tick if it's higher. /// /// Keeps allocated memory in the buffers for reuse. - pub(crate) fn acknowledge( + pub(crate) fn ack_mutate_message( &mut self, client_buffers: &mut ClientBuffers, tick: Tick, - update_index: u16, + mutate_index: u16, ) { - let Some(update_info) = self.updates.remove(&update_index) else { + let Some(mutate_info) = self.mutations.remove(&mutate_index) else { debug!( - "received unknown update index {update_index} from {:?}", + "received unknown mutate index {mutate_index} from {:?}", self.id ); return; }; - for entity in &update_info.entities { - let Some(last_tick) = self.change_ticks.get_mut(entity) else { + for entity in &mutate_info.entities { + let Some(last_tick) = self.mutation_ticks.get_mut(entity) else { // We ignore missing entities, since they were probably despawned. continue; }; // Received tick could be outdated because we bump it // if we detect any insertion on the entity in `collect_changes`. - if !last_tick.is_newer_than(update_info.tick, tick) { - *last_tick = update_info.tick; + if !last_tick.is_newer_than(mutate_info.tick, tick) { + *last_tick = mutate_info.tick; } } - client_buffers.entities.push(update_info.entities); + client_buffers.entities.push(mutate_info.entities); trace!( - "{:?} acknowledged an update with {:?}", + "{:?} acknowledged mutate message with {:?}", self.id, - update_info.tick, + mutate_info.tick, ); } /// Removes a despawned entity tracked by this client. pub fn remove_despawned(&mut self, entity: Entity) { - self.change_ticks.remove(&entity); + self.mutation_ticks.remove(&entity); self.visibility.remove_despawned(entity); - // We don't clean up `self.updates` for efficiency reasons. + // We don't clean up `self.mutations` for efficiency reasons. // `Self::acknowledge()` will properly ignore despawned entities. } @@ -345,24 +345,24 @@ impl ReplicatedClient { /// /// Internal cleanup happens lazily during the iteration. pub(crate) fn drain_lost_visibility(&mut self) -> impl Iterator + '_ { - self.visibility.drain_lost_visibility().inspect(|entity| { - self.change_ticks.remove(entity); + self.visibility.drain_lost().inspect(|entity| { + self.mutation_ticks.remove(entity); }) } - /// Removes all updates older then `min_timestamp`. + /// Removes all mutate messages older then `min_timestamp`. /// /// Keeps allocated memory in the buffers for reuse. - pub(crate) fn remove_older_updates( + pub(crate) fn cleanup_older_mutations( &mut self, client_buffers: &mut ClientBuffers, min_timestamp: Duration, ) { - self.updates.retain(|_, update_info| { - if update_info.timestamp < min_timestamp { + self.mutations.retain(|_, mutate_info| { + if mutate_info.timestamp < min_timestamp { client_buffers .entities - .push(mem::take(&mut update_info.entities)); + .push(mem::take(&mut mutate_info.entities)); false } else { true @@ -379,13 +379,13 @@ pub(crate) struct ClientBuffers { /// Stored to reuse allocated memory. clients: Vec, - /// [`Vec`]'s from acknowledged update indexes from [`ReplicatedClient`]. + /// [`Vec`]'s from acknowledged [`MutateInfo`]'s. /// /// Stored to reuse allocated capacity. entities: Vec>, } -struct UpdateInfo { +struct MutateInfo { tick: Tick, timestamp: Duration, entities: Vec, diff --git a/src/core/replicated_clients/client_visibility.rs b/src/core/replication/replicated_clients/client_visibility.rs similarity index 95% rename from src/core/replicated_clients/client_visibility.rs rename to src/core/replication/replicated_clients/client_visibility.rs index 7242e73e..02f30968 100644 --- a/src/core/replicated_clients/client_visibility.rs +++ b/src/core/replication/replicated_clients/client_visibility.rs @@ -9,11 +9,6 @@ use super::VisibilityPolicy; /// Entity visibility settings for a client. pub struct ClientVisibility { filter: VisibilityFilter, - - /// Visibility for a specific entity that has been cached for re-referencing. - /// - /// Used as an optimization by server replication. - cached_visibility: Visibility, } impl ClientVisibility { @@ -36,10 +31,7 @@ impl ClientVisibility { /// Creates a new instance with a specific filter. fn with_filter(filter: VisibilityFilter) -> Self { - Self { - filter, - cached_visibility: Default::default(), - } + Self { filter } } /// Resets the filter state to as it was after [`Self::new`]. @@ -129,7 +121,7 @@ impl ClientVisibility { } /// Drains all entities for which visibility was lost during this tick. - pub(super) fn drain_lost_visibility(&mut self) -> impl Iterator + '_ { + pub(super) fn drain_lost(&mut self) -> impl Iterator + '_ { match &mut self.filter { VisibilityFilter::All { .. } => VisibilityLostIter::AllVisible, VisibilityFilter::Blacklist { added, .. } => VisibilityLostIter::Lost(added.drain()), @@ -177,7 +169,7 @@ impl ClientVisibility { // For blacklisting an entity we don't remove the entity right away. // Instead we mark it as queued for removal and remove it // later in `Self::update`. This allows us to avoid accessing - // the blacklist's `removed` field in `Self::get_visibility_state`. + // the blacklist's `removed` field in `Self::visibility_state`. entry.insert(BlacklistInfo::QueuedForRemoval); removed.insert(entity); } else { @@ -200,7 +192,7 @@ impl ClientVisibility { // Instead we mark it as `WhitelistInfo::JustAdded` and then set it to // 'WhitelistInfo::Visible' in `Self::update`. // This allows us to avoid accessing the whitelist's `added` field in - // `Self::get_visibility_state`. + // `Self::visibility_state`. if *list.entry(entity).or_insert(WhitelistInfo::JustAdded) == WhitelistInfo::JustAdded { @@ -227,26 +219,14 @@ impl ClientVisibility { /// Checks if a specific entity is visible. pub fn is_visible(&self, entity: Entity) -> bool { - match self.get_visibility_state(entity) { + match self.state(entity) { Visibility::Hidden => false, Visibility::Gained | Visibility::Visible => true, } } - /// Caches visibility for a specific entity. - /// - /// Can be obtained later from [`Self::cached_visibility`]. - pub(crate) fn cache_visibility(&mut self, entity: Entity) { - self.cached_visibility = self.get_visibility_state(entity); - } - - /// Returns visibility cached by the last call of [`Self::cache_visibility`]. - pub(crate) fn cached_visibility(&self) -> Visibility { - self.cached_visibility - } - /// Returns visibility of a specific entity. - fn get_visibility_state(&self, entity: Entity) -> Visibility { + pub(crate) fn state(&self, entity: Entity) -> Visibility { match &self.filter { VisibilityFilter::All => Visibility::Visible, VisibilityFilter::Blacklist { list, .. } => match list.get(&entity) { diff --git a/src/core/replication_registry.rs b/src/core/replication/replication_registry.rs similarity index 91% rename from src/core/replication_registry.rs rename to src/core/replication/replication_registry.rs index c53230a7..9ab2f620 100644 --- a/src/core/replication_registry.rs +++ b/src/core/replication/replication_registry.rs @@ -1,14 +1,16 @@ pub mod command_fns; pub mod component_fns; +pub mod ctx; pub mod rule_fns; pub mod test_fns; use bevy::{ecs::component::ComponentId, prelude::*}; use serde::{Deserialize, Serialize}; -use super::{command_markers::CommandMarkerIndex, ctx::DespawnCtx}; +use super::command_markers::CommandMarkerIndex; use command_fns::{RemoveFn, UntypedCommandFns, WriteFn}; use component_fns::ComponentFns; +use ctx::DespawnCtx; use rule_fns::{RuleFns, UntypedRuleFns}; /// Stores configurable replication functions. @@ -103,14 +105,11 @@ impl ReplicationRegistry { &mut self, world: &mut World, rule_fns: RuleFns, - ) -> FnsInfo { + ) -> (ComponentId, FnsId) { let (index, component_id) = self.init_component_fns::(world); self.rules.push((rule_fns.into(), index)); - FnsInfo { - component_id, - fns_id: FnsId(self.rules.len() - 1), - } + (component_id, FnsId(self.rules.len() - 1)) } /// Initializes [`ComponentFns`] for a component and returns its index and ID. @@ -159,31 +158,12 @@ impl Default for ReplicationRegistry { } } -#[deprecated(note = "use `Replicated` instead")] +#[deprecated(note = "use `ReplicationRegistry` instead")] pub type ReplicationFns = ReplicationRegistry; -/// IDs of a registered replication function and its component. -/// -/// Can be obtained from [`ReplicationFns::register_rule_fns`]. -#[derive(Clone, Copy)] -pub struct FnsInfo { - component_id: ComponentId, - fns_id: FnsId, -} - -impl FnsInfo { - pub(crate) fn component_id(&self) -> ComponentId { - self.component_id - } - - pub(crate) fn fns_id(&self) -> FnsId { - self.fns_id - } -} - /// ID of replicaton functions for a component. /// -/// Can be obtained from [`ReplicationFns::register_rule_fns`]. +/// Can be obtained from [`ReplicationRegistry::register_rule_fns`]. #[derive(Clone, Copy, Deserialize, Eq, Hash, PartialEq, Serialize)] pub struct FnsId(usize); diff --git a/src/core/replication_registry/command_fns.rs b/src/core/replication/replication_registry/command_fns.rs similarity index 97% rename from src/core/replication_registry/command_fns.rs rename to src/core/replication/replication_registry/command_fns.rs index 3875216b..05816605 100644 --- a/src/core/replication_registry/command_fns.rs +++ b/src/core/replication/replication_registry/command_fns.rs @@ -6,11 +6,11 @@ use std::{ use bevy::prelude::*; -use super::rule_fns::RuleFns; -use crate::core::{ +use super::{ ctx::{RemoveCtx, WriteCtx}, - deferred_entity::DeferredEntity, + rule_fns::RuleFns, }; +use crate::core::replication::deferred_entity::DeferredEntity; /// Writing and removal functions for a component, like [`Commands`]. #[derive(Clone, Copy)] diff --git a/src/core/replication_registry/component_fns.rs b/src/core/replication/replication_registry/component_fns.rs similarity index 95% rename from src/core/replication_registry/component_fns.rs rename to src/core/replication/replication_registry/component_fns.rs index 1b5d4447..a43f67ee 100644 --- a/src/core/replication_registry/component_fns.rs +++ b/src/core/replication/replication_registry/component_fns.rs @@ -2,10 +2,13 @@ use std::io::Cursor; use bevy::{prelude::*, ptr::Ptr}; -use super::{command_fns::UntypedCommandFns, rule_fns::UntypedRuleFns}; -use crate::core::{ - command_markers::{CommandMarkerIndex, CommandMarkers, EntityMarkers}, +use super::{ + command_fns::UntypedCommandFns, ctx::{RemoveCtx, SerializeCtx, WriteCtx}, + rule_fns::UntypedRuleFns, +}; +use crate::core::replication::{ + command_markers::{CommandMarkerIndex, CommandMarkers, EntityMarkers}, deferred_entity::DeferredEntity, }; @@ -85,9 +88,9 @@ impl ComponentFns { ctx: &SerializeCtx, rule_fns: &UntypedRuleFns, ptr: Ptr, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - (self.serialize)(ctx, rule_fns, ptr, cursor) + (self.serialize)(ctx, rule_fns, ptr, message) } /// Calls the assigned writing function based on entity markers. @@ -171,7 +174,7 @@ impl ComponentFns { /// Signature of component serialization functions that restore the original type. type UntypedSerializeFn = - unsafe fn(&SerializeCtx, &UntypedRuleFns, Ptr, &mut Cursor>) -> bincode::Result<()>; + unsafe fn(&SerializeCtx, &UntypedRuleFns, Ptr, &mut Vec) -> bincode::Result<()>; /// Signature of component writing functions that restore the original type. type UntypedWriteFn = unsafe fn( @@ -195,10 +198,10 @@ unsafe fn untyped_serialize( ctx: &SerializeCtx, rule_fns: &UntypedRuleFns, ptr: Ptr, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { let rule_fns = rule_fns.typed::(); - rule_fns.serialize(ctx, ptr.deref::(), cursor) + rule_fns.serialize(ctx, ptr.deref::(), message) } /// Resolves `rule_fns` to `C` and calls [`UntypedCommandFns::write`] for `C`. diff --git a/src/core/ctx.rs b/src/core/replication/replication_registry/ctx.rs similarity index 52% rename from src/core/ctx.rs rename to src/core/replication/replication_registry/ctx.rs index c518392d..d77d8f5c 100644 --- a/src/core/ctx.rs +++ b/src/core/replication/replication_registry/ctx.rs @@ -1,6 +1,8 @@ -use bevy::{ecs::component::ComponentId, prelude::*, reflect::TypeRegistry}; +use bevy::{ecs::component::ComponentId, prelude::*}; -use super::{replicon_tick::RepliconTick, server_entity_map::ServerEntityMap, Replicated}; +use crate::core::{ + replication::Replicated, replicon_tick::RepliconTick, server_entity_map::ServerEntityMap, +}; /// Replication context for serialization function. #[non_exhaustive] @@ -78,63 +80,3 @@ pub struct DespawnCtx { /// Tick for the currently processing message. pub message_tick: RepliconTick, } - -/// Event sending context for client. -#[non_exhaustive] -pub struct ClientSendCtx<'a> { - /// Registry of reflected types. - pub registry: &'a TypeRegistry, - - /// Maps server entities to client entities and vice versa. - pub entity_map: &'a ServerEntityMap, -} - -impl EntityMapper for ClientSendCtx<'_> { - fn map_entity(&mut self, entity: Entity) -> Entity { - *self - .entity_map - .to_server() - .get(&entity) - .unwrap_or_else(|| panic!("client {entity:?} should have a mapping")) - } -} - -/// Event receiving context for server. -#[non_exhaustive] -pub struct ServerReceiveCtx<'a> { - /// Registry of reflected types. - pub registry: &'a TypeRegistry, -} - -/// Event sending context for server. -#[non_exhaustive] -pub struct ServerSendCtx<'a> { - /// Registry of reflected types. - pub registry: &'a TypeRegistry, -} - -/// Event receiving context for client. -#[non_exhaustive] -pub struct ClientReceiveCtx<'a> { - /// Registry of reflected types. - pub registry: &'a TypeRegistry, - - /// Maps server entities to client entities and vice versa. - pub entity_map: &'a ServerEntityMap, - - /// Entities that couldn't be mapped by [`EntityMapper::map_entity`]. - /// - /// We needed it because [`EntityMapper`] doesn't provide a way to handle errors. - pub(crate) invalid_entities: Vec, -} - -impl EntityMapper for ClientReceiveCtx<'_> { - fn map_entity(&mut self, entity: Entity) -> Entity { - if let Some(mapped_entity) = self.entity_map.to_client().get(&entity) { - *mapped_entity - } else { - self.invalid_entities.push(entity); - Entity::PLACEHOLDER - } - } -} diff --git a/src/core/replication_registry/rule_fns.rs b/src/core/replication/replication_registry/rule_fns.rs similarity index 93% rename from src/core/replication_registry/rule_fns.rs rename to src/core/replication/replication_registry/rule_fns.rs index ada29c17..92805199 100644 --- a/src/core/replication_registry/rule_fns.rs +++ b/src/core/replication/replication_registry/rule_fns.rs @@ -8,11 +8,11 @@ use bevy::{ecs::entity::MapEntities, prelude::*}; use bincode::{DefaultOptions, Options}; use serde::{de::DeserializeOwned, Serialize}; -use crate::core::ctx::{SerializeCtx, WriteCtx}; +use super::ctx::{SerializeCtx, WriteCtx}; /// Type-erased version of [`RuleFns`]. /// -/// Stored inside [`ReplicationFns`](super::ReplicationFns) after registration. +/// Stored inside [`ReplicationRegistry`](super::ReplicationRegistry) after registration. pub(crate) struct UntypedRuleFns { type_id: TypeId, type_name: &'static str, @@ -71,8 +71,8 @@ impl From> for UntypedRuleFns { /// Serialization and deserialization functions for a component. /// -/// See also [`AppRuleExt`](crate::core::replication_rules::AppRuleExt) -/// and [`ReplicationRule`](crate::core::replication_rules::ReplicationRule). +/// See also [`AppRuleExt`](crate::core::replication::replication_rules::AppRuleExt) +/// and [`ReplicationRule`](crate::core::replication::replication_rules::ReplicationRule). pub struct RuleFns { serialize: SerializeFn, deserialize: DeserializeFn, @@ -113,7 +113,7 @@ impl RuleFns { /// If you want to ignore a component, just use its expected size to advance the cursor /// without deserializing (but be careful if the component is dynamically sized). /// - /// See [`MarkerConfig::need_history`](crate::core::command_markers::MarkerConfig::need_history) + /// See [`MarkerConfig::need_history`](crate::core::replication::command_markers::MarkerConfig::need_history) /// for details. pub fn with_consume(mut self, consume: ConsumeFn) -> Self { self.consume = consume; @@ -125,9 +125,9 @@ impl RuleFns { &self, ctx: &SerializeCtx, component: &C, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - (self.serialize)(ctx, component, cursor) + (self.serialize)(ctx, component, message) } /// Deserializes a component from a cursor. @@ -187,7 +187,7 @@ impl Default for RuleFns { } /// Signature of component serialization functions. -pub type SerializeFn = fn(&SerializeCtx, &C, &mut Cursor>) -> bincode::Result<()>; +pub type SerializeFn = fn(&SerializeCtx, &C, &mut Vec) -> bincode::Result<()>; /// Signature of component deserialization functions. pub type DeserializeFn = fn(&mut WriteCtx, &mut Cursor<&[u8]>) -> bincode::Result; @@ -204,7 +204,7 @@ pub type ConsumeFn = pub fn default_serialize( _ctx: &SerializeCtx, component: &C, - cursor: &mut Cursor>, + cursor: &mut Vec, ) -> bincode::Result<()> { DefaultOptions::new().serialize_into(cursor, component) } diff --git a/src/core/replication_registry/test_fns.rs b/src/core/replication/replication_registry/test_fns.rs similarity index 68% rename from src/core/replication_registry/test_fns.rs rename to src/core/replication/replication_registry/test_fns.rs index a0ad603d..6d198f7d 100644 --- a/src/core/replication_registry/test_fns.rs +++ b/src/core/replication/replication_registry/test_fns.rs @@ -2,17 +2,21 @@ use std::io::Cursor; use bevy::{ecs::world::CommandQueue, prelude::*}; -use super::{FnsInfo, ReplicationRegistry}; -use crate::core::{ - command_markers::{CommandMarkers, EntityMarkers}, +use super::{ ctx::{DespawnCtx, RemoveCtx, SerializeCtx, WriteCtx}, - deferred_entity::DeferredEntity, + FnsId, ReplicationRegistry, +}; +use crate::core::{ + replication::{ + command_markers::{CommandMarkers, EntityMarkers}, + deferred_entity::DeferredEntity, + }, replicon_tick::RepliconTick, server_entity_map::ServerEntityMap, }; /** -Extension for [`EntityWorldMut`] to call registered replication functions for [`FnsInfo`]. +Extension for [`EntityWorldMut`] to call registered replication functions for [`FnsId`]. See also [`ReplicationRegistry::register_rule_fns`]. @@ -24,7 +28,7 @@ This example shows how to call registered functions on an entity: use bevy::prelude::*; use bevy_replicon::{ core::{ - replication_registry::{ + replication::replication_registry::{ rule_fns::RuleFns, test_fns::TestFnsEntityExt, ReplicationRegistry, }, replicon_tick::RepliconTick, @@ -38,21 +42,21 @@ app.add_plugins((MinimalPlugins, RepliconPlugins)); let tick = RepliconTick::default(); -// Register rule functions manually to obtain `FnsInfo`. -let fns_info = app +// Register rule functions manually to obtain `FnsId`. +let (_, fns_id) = app .world_mut() .resource_scope(|world, mut registry: Mut| { registry.register_rule_fns(world, RuleFns::::default()) }); let mut entity = app.world_mut().spawn(DummyComponent); -let data = entity.serialize(fns_info, tick); +let data = entity.serialize(fns_id, tick); entity.remove::(); -entity.apply_write(&data, fns_info, tick); +entity.apply_write(&data, fns_id, tick); assert!(entity.contains::()); -entity.apply_remove(fns_info, tick); +entity.apply_remove(fns_id, tick); assert!(!entity.contains::()); entity.apply_despawn(tick); @@ -67,60 +71,50 @@ pub trait TestFnsEntityExt { /// /// See also [`ReplicationRegistry::register_rule_fns`]. #[must_use] - fn serialize(&mut self, fns_info: FnsInfo, server_tick: RepliconTick) -> Vec; + fn serialize(&mut self, fns_id: FnsId, server_tick: RepliconTick) -> Vec; /// Deserializes a component using a registered function for it and /// writes it into an entity using a write function based on markers. /// - /// See also [`AppMarkerExt`](crate::core::command_markers::AppMarkerExt). - fn apply_write( - &mut self, - data: &[u8], - fns_info: FnsInfo, - message_tick: RepliconTick, - ) -> &mut Self; + /// See also [`AppMarkerExt`](crate::core::replication::command_markers::AppMarkerExt). + fn apply_write(&mut self, data: &[u8], fns_id: FnsId, message_tick: RepliconTick) -> &mut Self; /// Remvoes a component using a registered function for it. /// - /// See also [`AppMarkerExt`](crate::core::command_markers::AppMarkerExt). - fn apply_remove(&mut self, fns_info: FnsInfo, message_tick: RepliconTick) -> &mut Self; + /// See also [`AppMarkerExt`](crate::core::replication::command_markers::AppMarkerExt). + fn apply_remove(&mut self, fns_id: FnsId, message_tick: RepliconTick) -> &mut Self; /// Despawns an entity using [`ReplicationRegistry::despawn`]. fn apply_despawn(self, message_tick: RepliconTick); } impl TestFnsEntityExt for EntityWorldMut<'_> { - fn serialize(&mut self, fns_info: FnsInfo, server_tick: RepliconTick) -> Vec { + fn serialize(&mut self, fns_id: FnsId, server_tick: RepliconTick) -> Vec { let registry = self.world().resource::(); - let (component_id, component_fns, rule_fns) = registry.get(fns_info.fns_id()); - let mut cursor = Cursor::default(); + let (component_id, component_fns, rule_fns) = registry.get(fns_id); + let mut message = Vec::new(); let ctx = SerializeCtx { server_tick, component_id, }; - let ptr = self.get_by_id(fns_info.component_id()).unwrap_or_else(|_| { + let ptr = self.get_by_id(component_id).unwrap_or_else(|_| { let components = self.world().components(); let component_name = components - .get_name(fns_info.component_id()) + .get_name(component_id) .expect("function should require valid component ID"); panic!("serialization function require entity to have {component_name}"); }); unsafe { component_fns - .serialize(&ctx, rule_fns, ptr, &mut cursor) + .serialize(&ctx, rule_fns, ptr, &mut message) .expect("serialization into memory should never fail"); } - cursor.into_inner() + message } - fn apply_write( - &mut self, - data: &[u8], - fns_info: FnsInfo, - message_tick: RepliconTick, - ) -> &mut Self { + fn apply_write(&mut self, data: &[u8], fns_id: FnsId, message_tick: RepliconTick) -> &mut Self { let mut entity_markers = self.world_scope(EntityMarkers::from_world); let command_markers = self.world().resource::(); entity_markers.read(command_markers, &*self); @@ -129,14 +123,11 @@ impl TestFnsEntityExt for EntityWorldMut<'_> { self.world_scope(|world| { world.resource_scope(|world, mut entity_map: Mut| { world.resource_scope(|world, registry: Mut| { - let world_cell = world.as_unsafe_world_cell(); - // SAFETY: have write access and the cell used only to get entities. - let mut entity = unsafe { DeferredEntity::new(world_cell, entity) }; let mut queue = CommandQueue::default(); - let mut commands = - Commands::new_from_entities(&mut queue, world_cell.entities()); + let mut entity = DeferredEntity::new(world, entity); + let mut commands = entity.commands(&mut queue); - let (component_id, component_fns, rule_fns) = registry.get(fns_info.fns_id()); + let (component_id, component_fns, rule_fns) = registry.get(fns_id); let mut cursor = Cursor::new(data); let mut ctx = WriteCtx::new(&mut commands, &mut entity_map, component_id, message_tick); @@ -161,7 +152,7 @@ impl TestFnsEntityExt for EntityWorldMut<'_> { self } - fn apply_remove(&mut self, fns_info: FnsInfo, message_tick: RepliconTick) -> &mut Self { + fn apply_remove(&mut self, fns_id: FnsId, message_tick: RepliconTick) -> &mut Self { let mut entity_markers = self.world_scope(EntityMarkers::from_world); let command_markers = self.world().resource::(); entity_markers.read(command_markers, &*self); @@ -169,13 +160,11 @@ impl TestFnsEntityExt for EntityWorldMut<'_> { let entity = self.id(); self.world_scope(|world| { world.resource_scope(|world, registry: Mut| { - let world_cell = world.as_unsafe_world_cell(); - // SAFETY: have write access and the cell used only to get entities. - let mut entity = unsafe { DeferredEntity::new(world_cell, entity) }; let mut queue = CommandQueue::default(); - let mut commands = Commands::new_from_entities(&mut queue, world_cell.entities()); + let mut entity = DeferredEntity::new(world, entity); + let mut commands = entity.commands(&mut queue); - let (component_id, component_fns, _) = registry.get(fns_info.fns_id()); + let (component_id, component_fns, _) = registry.get(fns_id); let mut ctx = RemoveCtx { commands: &mut commands, message_tick, diff --git a/src/core/replication_rules.rs b/src/core/replication/replication_rules.rs similarity index 91% rename from src/core/replication_rules.rs rename to src/core/replication/replication_rules.rs index 927a9498..7a4f3307 100644 --- a/src/core/replication_rules.rs +++ b/src/core/replication/replication_rules.rs @@ -7,7 +7,7 @@ use bevy::{ }; use serde::{de::DeserializeOwned, Serialize}; -use super::replication_registry::{rule_fns::RuleFns, FnsInfo, ReplicationRegistry}; +use super::replication_registry::{rule_fns::RuleFns, FnsId, ReplicationRegistry}; /// Replication functions for [`App`]. pub trait AppRuleExt { @@ -80,9 +80,9 @@ pub trait AppRuleExt { use bevy::prelude::*; use bevy_replicon::{ - core::{ + core::replication::replication_registry::{ ctx::{SerializeCtx, WriteCtx}, - replication_registry::rule_fns::RuleFns, + rule_fns::RuleFns, }, prelude::*, }; @@ -98,9 +98,9 @@ pub trait AppRuleExt { fn serialize_translation( _ctx: &SerializeCtx, transform: &Transform, - cursor: &mut Cursor>, + message: &mut Vec, ) -> bincode::Result<()> { - bincode::serialize_into(cursor, &transform.translation) + bincode::serialize_into(message, &transform.translation) } /// Deserializes `translation` and creates [`Transform`] from it. @@ -195,7 +195,7 @@ impl AppRuleExt for App { /// All registered rules for components replication. #[derive(Default, Deref, Resource)] -pub(crate) struct ReplicationRules(Vec); +pub struct ReplicationRules(Vec); impl ReplicationRules { /// Inserts a new rule, maintaining sorting by their priority in descending order. @@ -217,12 +217,12 @@ pub struct ReplicationRule { pub priority: usize, /// Rule components and their serialization/deserialization/removal functions. - pub components: Vec, + pub components: Vec<(ComponentId, FnsId)>, } impl ReplicationRule { /// Creates a new rule with priority equal to the number of serializable components. - pub fn new(components: Vec) -> Self { + pub fn new(components: Vec<(ComponentId, FnsId)>) -> Self { Self { priority: components.len(), components, @@ -233,7 +233,7 @@ impl ReplicationRule { pub(crate) fn matches(&self, archetype: &Archetype) -> bool { self.components .iter() - .all(|fns_info| archetype.contains(fns_info.component_id())) + .all(|&(component_id, _)| archetype.contains(component_id)) } /// Determines whether the rule is applicable to an archetype with removals included and contains at least one removal. @@ -248,10 +248,10 @@ impl ReplicationRule { removed_components: &HashSet, ) -> bool { let mut matches = false; - for fns_info in &self.components { - if removed_components.contains(&fns_info.component_id()) { + for &(component_id, _) in &self.components { + if removed_components.contains(&component_id) { matches = true; - } else if !post_removal_archetype.contains(fns_info.component_id()) { + } else if !post_removal_archetype.contains(component_id) { return false; } } @@ -272,9 +272,12 @@ use std::io::Cursor; use bevy::prelude::*; use bevy_replicon::{ - core::{ - ctx::{SerializeCtx, WriteCtx}, - replication_registry::{rule_fns::RuleFns, ReplicationFns}, + core::replication::{ + replication_registry::{ + ctx::{SerializeCtx, WriteCtx}, + rule_fns::RuleFns, + ReplicationRegistry, + }, replication_rules::{GroupReplication, ReplicationRule}, }, prelude::*, @@ -296,7 +299,7 @@ struct PlayerBundle { struct Player; impl GroupReplication for PlayerBundle { - fn register(world: &mut World, registry: &mut ReplicationFns) -> ReplicationRule { + fn register(world: &mut World, registry: &mut ReplicationRegistry) -> ReplicationRule { // Customize serlialization to serialize only `translation`. let transform_info = registry.register_rule_fns( world, @@ -314,7 +317,7 @@ impl GroupReplication for PlayerBundle { } } -# fn serialize_translation(_: &SerializeCtx, _: &Transform, _: &mut Cursor>) -> bincode::Result<()> { unimplemented!() } +# fn serialize_translation(_: &SerializeCtx, _: &Transform, _: &mut Vec) -> bincode::Result<()> { unimplemented!() } # fn deserialize_translation(_: &mut WriteCtx, _: &mut Cursor<&[u8]>) -> bincode::Result { unimplemented!() } ``` **/ @@ -347,7 +350,7 @@ mod tests { use serde::{Deserialize, Serialize}; use super::*; - use crate::{core::replication_registry::ReplicationRegistry, AppRuleExt}; + use crate::AppRuleExt; #[test] fn sorting() { diff --git a/src/core/replicon_client.rs b/src/core/replicon_client.rs index dc4b32ba..72205554 100644 --- a/src/core/replicon_client.rs +++ b/src/core/replicon_client.rs @@ -29,6 +29,11 @@ pub struct RepliconClient { /// List of sent messages and their channels since the last tick. sent_messages: Vec<(u8, Bytes)>, + + rtt: f64, + packet_loss: f64, + sent_bps: f64, + received_bps: f64, } impl RepliconClient { @@ -53,6 +58,12 @@ impl RepliconClient { /// Receives all available messages from the server over a channel. /// /// All messages will be drained. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn receive>(&mut self, channel_id: I) -> impl Iterator + '_ { if !self.is_connected() { // We can't return here because we need to return an empty iterator. @@ -74,6 +85,12 @@ impl RepliconClient { } /// Sends a message to the server over a channel. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn send, B: Into>(&mut self, channel_id: I, message: B) { if !self.is_connected() { warn!("trying to send a message when the client is not connected"); @@ -90,9 +107,14 @@ impl RepliconClient { /// Sets the client connection status. /// - /// Should be called only from the messaging backend when the client status changes. /// Discards all messages if the state changes from [`RepliconClientStatus::Connected`]. /// See also [`Self::status`]. + /// + ///
+ /// + /// Should only be called from the messaging backend when the client status changes. + /// + ///
pub fn set_status(&mut self, status: RepliconClientStatus) { debug!("changing `RepliconClient` status to `{status:?}`"); @@ -101,6 +123,11 @@ impl RepliconClient { channel_messages.clear(); } self.sent_messages.clear(); + + self.rtt = 0.0; + self.packet_loss = 0.0; + self.sent_bps = 0.0; + self.received_bps = 0.0; } self.status = status; @@ -153,14 +180,22 @@ impl RepliconClient { /// Removes all sent messages, returning them as an iterator with channel. /// - /// Should be called only from the messaging backend. + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn drain_sent(&mut self) -> impl Iterator + '_ { self.sent_messages.drain(..) } /// Adds a message from the server to the list of received messages. /// - /// Should be called only from the messaging backend. + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn insert_received, B: Into>(&mut self, channel_id: I, message: B) { if !self.is_connected() { warn!("trying to insert a received message when the client is not connected"); @@ -175,6 +210,78 @@ impl RepliconClient { channel_messages.push(message.into()); } + + /// Returns the round-time trip for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn rtt(&self) -> f64 { + self.rtt + } + + /// Sets the round-time trip for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_rtt(&mut self, rtt: f64) { + self.rtt = rtt; + } + + /// Returns the packet loss for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn packet_loss(&self) -> f64 { + self.packet_loss + } + + /// Sets the packet loss for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_packet_loss(&mut self, packet_loss: f64) { + self.packet_loss = packet_loss; + } + + /// Returns the bytes sent per second for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn sent_bps(&self) -> f64 { + self.sent_bps + } + + /// Sets the bytes sent per second for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_sent_bps(&mut self, sent_bps: f64) { + self.sent_bps = sent_bps; + } + + /// Returns the bytes received per second for the connection. + /// + /// Returns zero if not provided by the backend. + pub fn received_bps(&self) -> f64 { + self.received_bps + } + + /// Sets the bytes received per second for the connection. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
+ pub fn set_received_bps(&mut self, received_bps: f64) { + self.received_bps = received_bps; + } } /// Connection status of the [`RepliconClient`]. diff --git a/src/core/replicon_server.rs b/src/core/replicon_server.rs index fda865ea..d07ab063 100644 --- a/src/core/replicon_server.rs +++ b/src/core/replicon_server.rs @@ -48,6 +48,12 @@ impl RepliconServer { /// Receives all available messages from clients over a channel. /// /// All messages will be drained. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn receive>( &mut self, channel_id: I, @@ -72,6 +78,12 @@ impl RepliconServer { } /// Sends a message to a client over a channel. + /// + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn send, B: Into>( &mut self, client_id: ClientId, @@ -93,7 +105,11 @@ impl RepliconServer { /// Marks the server as running or stopped. /// - /// Should be called only from the messaging backend when the server changes its state. + ///
+ /// + /// Should only be called from the messaging backend when the server changes its state. + /// + ///
pub fn set_running(&mut self, running: bool) { debug!("changing `RepliconServer` running status to `{running}`"); @@ -125,14 +141,22 @@ impl RepliconServer { /// Removes all sent messages, returning them as an iterator with client ID and channel. /// - /// Should be called only from the messaging backend. + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn drain_sent(&mut self) -> impl Iterator + '_ { self.sent_messages.drain(..) } /// Adds a message from a client to the list of received messages. /// - /// Should be called only from the messaging backend. + ///
+ /// + /// Should only be called from the messaging backend. + /// + ///
pub fn insert_received, B: Into>( &mut self, client_id: ClientId, diff --git a/src/core/replicon_tick.rs b/src/core/replicon_tick.rs index 86a5524e..446e2612 100644 --- a/src/core/replicon_tick.rs +++ b/src/core/replicon_tick.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; /// /// All operations on it are wrapping. /// -/// See also [`ServerInitTick`](crate::client::ServerInitTick) and +/// See also [`ServerChangeTick`](crate::client::ServerChangeTick) and /// [`ServerTick`](crate::server::server_tick::ServerTick). #[derive(Clone, Copy, Debug, Default, Deserialize, Eq, Hash, PartialEq, Serialize)] pub struct RepliconTick(u32); diff --git a/src/lib.rs b/src/lib.rs index fdba1701..2153f6b9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -579,27 +579,18 @@ For a higher level API consider using [`bevy_replicon_attributes`](https://docs. All events, inserts, removals and despawns will be applied to clients in the same order as on the server. -Entity component updates are grouped by entity, and component groupings may be applied to clients in a different order than on the server. -For example, if two entities are spawned in tick 1 on the server and their components are updated in tick 2, -then the client is guaranteed to see the spawns at the same time, but the component updates may appear in different client ticks. +Entity component mutations are grouped by entity, and component groupings may be applied to clients in a different order than on the server. +For example, if two entities are spawned in tick 1 on the server and their components are mutated in tick 2, +then the client is guaranteed to see the spawns at the same time, but the component mutations may appear in different client ticks. -If a component is dependent on other data, updates to the component will only be applied to the client when that data has arrived. -So if your component references another entity, updates to that component will only be applied when the referenced entity has been spawned on the client. +If a component is dependent on other data, mutations to the component will only be applied to the client when that data has arrived. +So if your component references another entity, mutations to that component will only be applied when the referenced entity has been spawned on the client. -Updates for despawned entities will be discarded automatically, but events or components may reference despawned entities and should be handled with that in mind. +Mutations for despawned entities will be discarded automatically, but events or components may reference despawned entities and should be handled with that in mind. Clients should never assume their world state is the same as the server's on any given tick value-wise. World state on the client is only "eventually consistent" with the server's. -# Limits - -To reduce packet size there are the following limits per replication update: - -- Up to [`u16::MAX`] entities that have added components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that have changed components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that have removed components with up to [`u16::MAX`] bytes of component data. -- Up to [`u16::MAX`] entities that were despawned. - # Troubleshooting If you face any issue, try to enable logging to see what is going on. @@ -633,32 +624,37 @@ pub mod test_app; pub mod prelude { #[allow(deprecated)] - pub use super::core::Replication; + pub use super::core::replication::Replication; pub use super::{ core::{ channels::{ChannelKind, RepliconChannel, RepliconChannels}, - command_markers::AppMarkerExt, common_conditions::*, connected_clients::ConnectedClients, event_registry::{ client_event::{ClientEventAppExt, FromClient}, server_event::{SendMode, ServerEventAppExt, ToClients}, }, - replicated_clients::{ - client_visibility::ClientVisibility, ReplicatedClient, ReplicatedClients, - VisibilityPolicy, + replication::{ + command_markers::AppMarkerExt, + replicated_clients::{ + client_visibility::ClientVisibility, ReplicatedClient, ReplicatedClients, + VisibilityPolicy, + }, + replication_rules::AppRuleExt, + Replicated, }, - replication_rules::AppRuleExt, replicon_client::{RepliconClient, RepliconClientStatus}, replicon_server::RepliconServer, - ClientId, Replicated, RepliconCorePlugin, + ClientId, RepliconCorePlugin, }, RepliconPlugins, }; #[cfg(feature = "client")] - pub use super::client::{events::ClientEventsPlugin, ClientPlugin, ClientSet, ClientStats}; + pub use super::client::{ + events::ClientEventsPlugin, ClientPlugin, ClientReplicationStats, ClientSet, + }; #[cfg(feature = "server")] pub use super::server::{ diff --git a/src/parent_sync.rs b/src/parent_sync.rs index d78f1a12..e5ea0e9c 100644 --- a/src/parent_sync.rs +++ b/src/parent_sync.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; #[cfg(feature = "client")] use crate::client::ClientSet; -use crate::core::{common_conditions::*, replication_rules::AppRuleExt}; +use crate::core::{common_conditions::*, replication::replication_rules::AppRuleExt}; #[cfg(feature = "server")] use crate::server::ServerSet; diff --git a/src/scene.rs b/src/scene.rs index b53a1e9b..756e1d49 100644 --- a/src/scene.rs +++ b/src/scene.rs @@ -1,6 +1,6 @@ use bevy::{ecs::entity::EntityHashMap, prelude::*, scene::DynamicEntity}; -use crate::{core::replication_rules::ReplicationRules, Replicated}; +use crate::{core::replication::replication_rules::ReplicationRules, Replicated}; /** Fills scene with all replicated entities and their components. @@ -73,7 +73,7 @@ pub fn replicate_into(scene: &mut DynamicScene, world: &World) { for component_id in rule .components .iter() - .map(|fns_info| fns_info.component_id()) + .map(|&(component_id, _)| component_id) { // SAFETY: replication rules can be registered only with valid component IDs. let replicated_component = diff --git a/src/server.rs b/src/server.rs index 4107a1a2..623b826c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,13 +6,12 @@ pub(super) mod replicated_archetypes; pub(super) mod replication_messages; pub mod server_tick; -use std::{io::Cursor, mem, time::Duration}; +use std::{io::Cursor, mem, ops::Range, time::Duration}; use bevy::{ ecs::{ archetype::ArchetypeEntity, component::{ComponentId, ComponentTicks, StorageType}, - entity::EntityHashSet, storage::{SparseSets, Table}, system::SystemChangeTick, }, @@ -25,13 +24,17 @@ use crate::core::{ channels::{ReplicationChannel, RepliconChannels}, common_conditions::{server_just_stopped, server_running}, connected_clients::ConnectedClients, - ctx::SerializeCtx, event_registry::server_event::BufferedServerEvents, - replicated_clients::{ - client_visibility::Visibility, ClientBuffers, ReplicatedClients, VisibilityPolicy, + replication::{ + replicated_clients::{ + client_visibility::Visibility, ClientBuffers, ReplicatedClients, VisibilityPolicy, + }, + replication_registry::{ + component_fns::ComponentFns, ctx::SerializeCtx, rule_fns::UntypedRuleFns, + ReplicationRegistry, + }, + replication_rules::ReplicationRules, }, - replication_registry::ReplicationRegistry, - replication_rules::ReplicationRules, replicon_server::RepliconServer, replicon_tick::RepliconTick, ClientId, @@ -39,8 +42,8 @@ use crate::core::{ use client_entity_map::ClientEntityMap; use despawn_buffer::{DespawnBuffer, DespawnBufferPlugin}; use removal_buffer::{RemovalBuffer, RemovalBufferPlugin}; -use replicated_archetypes::ReplicatedArchetypes; -use replication_messages::ReplicationMessages; +use replicated_archetypes::{ReplicatedArchetypes, ReplicatedComponent}; +use replication_messages::{serialized_data::SerializedData, ReplicationMessages}; use server_tick::ServerTick; pub struct ServerPlugin { @@ -50,10 +53,10 @@ pub struct ServerPlugin { /// Visibility configuration. pub visibility_policy: VisibilityPolicy, - /// The time after which updates will be considered lost if an acknowledgment is not received for them. + /// The time after which mutations will be considered lost if an acknowledgment is not received for them. /// - /// In practice updates will live at least `update_timeout`, and at most `2*update_timeout`. - pub update_timeout: Duration, + /// In practice mutations will live at least `mutations_timeout`, and at most `2*mutations_timeout`. + pub mutations_timeout: Duration, /// If enabled, replication will be started automatically after connection. /// @@ -70,7 +73,7 @@ impl Default for ServerPlugin { Self { tick_policy: TickPolicy::MaxTickRate(30), visibility_policy: Default::default(), - update_timeout: Duration::from_secs(10), + mutations_timeout: Duration::from_secs(10), replicate_after_connect: true, } } @@ -119,7 +122,8 @@ impl Plugin for ServerPlugin { Self::handle_connections, Self::enable_replication, Self::receive_acks, - Self::cleanup_acks(self.update_timeout).run_if(on_timer(self.update_timeout)), + Self::cleanup_acks(self.mutations_timeout) + .run_if(on_timer(self.mutations_timeout)), ) .chain() .in_set(ServerSet::Receive) @@ -212,14 +216,14 @@ impl ServerPlugin { } fn cleanup_acks( - update_timeout: Duration, + mutations_timeout: Duration, ) -> impl FnMut(ResMut, ResMut, Res