Skip to content

Commit

Permalink
Refactor replication to use a buffer cache per client (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shatur authored Sep 22, 2023
1 parent d640c60 commit 0c681da
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 291 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Reuse memory for serialization.
- Correctly handle old values on packet reordering.
- Bevy's `Tick` was replaced with dedicated type `NetworkTick` that increments on server update to provide information to client about time. `AckedTick` was replaced with `ServerTicks` that also contains mappings from `NetworkTick` to Bevy's `Tick` and current `NetworkTick`.
- Functions in `AppReplicationExt::replicate_with` now accept bytes cursor for memory reuse and return serialization errors.
Expand Down
116 changes: 108 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::io::Cursor;

use bevy::{
ecs::world::EntityMut,
prelude::*,
utils::{Entry, HashMap},
};
use bevy_renet::transport::client_connected;
use bevy_renet::{renet::Bytes, transport::client_connected};
use bevy_renet::{renet::RenetClient, transport::NetcodeClientPlugin, RenetClientPlugin};
use bincode::{DefaultOptions, Options};

use crate::{
replicon_core::{Mapper, NetworkTick, WorldDiff, REPLICATION_CHANNEL_ID},
replicon_core::{Mapper, NetworkTick, ReplicationRules, REPLICATION_CHANNEL_ID},
Replication,
};

Expand All @@ -29,6 +32,7 @@ impl Plugin for ClientPlugin {
.add_systems(
PreUpdate,
Self::diff_receiving_system
.pipe(unwrap)
.in_set(ClientSet::Receive)
.run_if(client_connected()),
)
Expand All @@ -45,13 +49,38 @@ impl Plugin for ClientPlugin {
}

impl ClientPlugin {
fn diff_receiving_system(world: &mut World) {
fn diff_receiving_system(world: &mut World) -> Result<(), bincode::Error> {
world.resource_scope(|world, mut client: Mut<RenetClient>| {
while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) {
WorldDiff::deserialize_to_world(world, message)
.expect("server should send only valid world diffs");
}
});
world.resource_scope(|world, mut entity_map: Mut<NetworkEntityMap>| {
world.resource_scope(|world, replication_rules: Mut<ReplicationRules>| {
while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) {
let mut cursor = Cursor::new(message);

if !deserialize_tick(&mut cursor, world)? {
continue;
}

deserialize_component_diffs(
&mut cursor,
world,
&mut entity_map,
&replication_rules,
DiffKind::Change,
)?;
deserialize_component_diffs(
&mut cursor,
world,
&mut entity_map,
&replication_rules,
DiffKind::Removal,
)?;
deserialize_despawns(&mut cursor, world, &mut entity_map)?;
}

Ok(())
})
})
})
}

fn ack_sending_system(last_tick: Res<LastTick>, mut client: ResMut<RenetClient>) {
Expand All @@ -66,6 +95,77 @@ impl ClientPlugin {
}
}

/// Deserializes server tick and applies it to [`LastTick`] if it is newer.
///
/// Returns true if [`LastTick`] has been updated.
fn deserialize_tick(cursor: &mut Cursor<Bytes>, world: &mut World) -> Result<bool, bincode::Error> {
let tick = bincode::deserialize_from(cursor)?;

let mut last_tick = world.resource_mut::<LastTick>();
if last_tick.0 < tick {
last_tick.0 = tick;
Ok(true)
} else {
Ok(false)
}
}

/// Deserializes component [`DiffKind`] and applies them to the [`World`].
fn deserialize_component_diffs(
cursor: &mut Cursor<Bytes>,
world: &mut World,
entity_map: &mut NetworkEntityMap,
replication_rules: &ReplicationRules,
diff_kind: DiffKind,
) -> Result<(), bincode::Error> {
let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..entities_count {
let entity = DefaultOptions::new().deserialize_from(&mut *cursor)?;
let mut entity = entity_map.get_by_server_or_spawn(world, entity);
let components_count: u8 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..components_count {
let replication_id = DefaultOptions::new().deserialize_from(&mut *cursor)?;
let replication_info = replication_rules.get_info(replication_id);
match diff_kind {
DiffKind::Change => {
(replication_info.deserialize)(&mut entity, entity_map, cursor)?
}
DiffKind::Removal => (replication_info.remove)(&mut entity),
}
}
}

Ok(())
}

/// Deserializes despawns and applies them to the [`World`].
fn deserialize_despawns(
cursor: &mut Cursor<Bytes>,
world: &mut World,
entity_map: &mut NetworkEntityMap,
) -> Result<(), bincode::Error> {
let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..entities_count {
// The entity might have already been deleted with the last diff,
// but the server might not yet have received confirmation from the
// client and could include the deletion in the latest diff.
let server_entity = DefaultOptions::new().deserialize_from(&mut *cursor)?;
if let Some(client_entity) = entity_map.remove_by_server(server_entity) {
world.entity_mut(client_entity).despawn_recursive();
}
}

Ok(())
}

/// Type of component change.
///
/// Parameter for [`deserialize_component_diffs`].
enum DiffKind {
Change,
Removal,
}

/// Last received tick from server.
///
/// Exists only on clients, sent to the server.
Expand Down
12 changes: 10 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ app.replicate_with::<Transform>(serialize_transform, deserialize_transform);
/// Serializes only translation.
fn serialize_transform(
component: Ptr,
cursor: &mut Cursor<&mut Vec<u8>>,
cursor: &mut Cursor<Vec<u8>>,
) -> Result<(), bincode::Error> {
// SAFETY: Function called for registered `ComponentId`.
let transform: &Transform = unsafe { component.deref() };
Expand Down Expand Up @@ -171,7 +171,7 @@ fn player_init_system(
#[derive(Component, Deserialize, Serialize)]
struct Player;
# fn serialize_transform(_: Ptr, _: &mut Cursor<&mut Vec<u8>>) -> Result<(), bincode::Error> { unimplemented!() }
# fn serialize_transform(_: Ptr, _: &mut Cursor<Vec<u8>>) -> Result<(), bincode::Error> { unimplemented!() }
# fn deserialize_transform(_: &mut EntityMut, _: &mut NetworkEntityMap, _: &mut Cursor<Bytes>) -> Result<(), bincode::Error> { unimplemented!() }
```
Expand Down Expand Up @@ -348,6 +348,14 @@ To check if you running server or client, you can use conditions based on
They rarely used for gameplay systems (since you write the same logic for
multiplayer and single-player!), but could be used for server
creation / connection systems and corresponding UI.
## Limits
To reduce packet size there are the following limits per replication update:
- Up to [`u16::MAX`] entities that have changed/added components with up to [`u8::MAX`] such components.
- Up to [`u16::MAX`] entities that have removed components with up to [`u8::MAX`] such components.
- Up to [`u16::MAX`] entities that were despawned.
*/

pub mod client;
Expand Down
131 changes: 3 additions & 128 deletions src/replicon_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use bevy::{
};
use bevy_renet::renet::{Bytes, ChannelConfig, SendType};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use strum::EnumDiscriminants;

use crate::client::{ClientMapper, LastTick, NetworkEntityMap};
use crate::client::{ClientMapper, NetworkEntityMap};

pub struct RepliconCorePlugin;

Expand Down Expand Up @@ -191,7 +190,7 @@ impl FromWorld for ReplicationRules {
}

/// Signature of serialization function stored in [`ReplicationInfo`].
pub type SerializeFn = fn(Ptr, &mut Cursor<&mut Vec<u8>>) -> Result<(), bincode::Error>;
pub type SerializeFn = fn(Ptr, &mut Cursor<Vec<u8>>) -> Result<(), bincode::Error>;

/// Signature of deserialization function stored in [`ReplicationInfo`].
pub type DeserializeFn =
Expand Down Expand Up @@ -230,7 +229,7 @@ pub struct ReplicationId(usize);
/// Default serialization function.
fn serialize_component<C: Component + Serialize>(
component: Ptr,
cursor: &mut Cursor<&mut Vec<u8>>,
cursor: &mut Cursor<Vec<u8>>,
) -> Result<(), bincode::Error> {
// SAFETY: Function called for registered `ComponentId`.
let component: &C = unsafe { component.deref() };
Expand Down Expand Up @@ -287,130 +286,6 @@ pub trait Mapper {
#[derive(Component, Clone, Copy)]
pub struct Replication;

/// Changed world data and current tick from server.
///
/// Sent from server to clients.
pub(super) struct WorldDiff<'a> {
pub(super) tick: NetworkTick,
pub(super) entities: HashMap<Entity, Vec<ComponentDiff<'a>>>,
pub(super) despawns: Vec<Entity>,
}

impl WorldDiff<'_> {
/// Creates a new [`WorldDiff`] with a tick and empty entities.
pub(super) fn new(tick: NetworkTick) -> Self {
Self {
tick,
entities: Default::default(),
despawns: Default::default(),
}
}

/// Serializes itself into a buffer.
///
/// We use custom implementation because serde impls require to use generics that can't be stored in [`ReplicationInfo`].
pub(super) fn serialize(
&self,
replication_rules: &ReplicationRules,
message: &mut Vec<u8>,
) -> Result<(), bincode::Error> {
let mut cursor = Cursor::new(message);

bincode::serialize_into(&mut cursor, &self.tick)?;

bincode::serialize_into(&mut cursor, &self.entities.len())?;
for (entity, components) in &self.entities {
bincode::serialize_into(&mut cursor, entity)?;
bincode::serialize_into(&mut cursor, &components.len())?;
for &component_diff in components {
bincode::serialize_into(&mut cursor, &ComponentDiffKind::from(component_diff))?;
match component_diff {
ComponentDiff::Changed((replication_id, ptr)) => {
bincode::serialize_into(&mut cursor, &replication_id)?;
let replication_info = replication_rules.get_info(replication_id);
(replication_info.serialize)(ptr, &mut cursor)?;
}
ComponentDiff::Removed(replication_id) => {
bincode::serialize_into(&mut cursor, &replication_id)?;
}
}
}
}

bincode::serialize_into(&mut cursor, &self.despawns)?;

Ok(())
}

/// Deserializes itself from bytes directly into the world by applying all changes.
///
/// Does nothing if world already received a more recent diff.
/// See also [`LastTick`].
pub(super) fn deserialize_to_world(
world: &mut World,
message: Bytes,
) -> Result<(), bincode::Error> {
let mut cursor = Cursor::new(message);

let tick = bincode::deserialize_from(&mut cursor)?;
let mut last_tick = world.resource_mut::<LastTick>();
if last_tick.0 >= tick {
return Ok(());
}
last_tick.0 = tick;

world.resource_scope(|world, replication_rules: Mut<ReplicationRules>| {
world.resource_scope(|world, mut entity_map: Mut<NetworkEntityMap>| {
let entities_count: usize = bincode::deserialize_from(&mut cursor)?;
for _ in 0..entities_count {
let entity = bincode::deserialize_from(&mut cursor)?;
let mut entity = entity_map.get_by_server_or_spawn(world, entity);
let components_count: usize = bincode::deserialize_from(&mut cursor)?;
for _ in 0..components_count {
let diff_kind = bincode::deserialize_from(&mut cursor)?;
let replication_id = bincode::deserialize_from(&mut cursor)?;
let replication_info = replication_rules.get_info(replication_id);
match diff_kind {
ComponentDiffKind::Changed => {
(replication_info.deserialize)(
&mut entity,
&mut entity_map,
&mut cursor,
)?;
}
ComponentDiffKind::Removed => {
(replication_info.remove)(&mut entity);
}
}
}
}

let despawns: Vec<Entity> = bincode::deserialize_from(&mut cursor)?;
for server_entity in despawns {
// The entity might have already been deleted with the last diff,
// but the server might not yet have received confirmation from the
// client and could include the deletion in the latest diff.
if let Some(client_entity) = entity_map.remove_by_server(server_entity) {
world.entity_mut(client_entity).despawn_recursive();
}
}

Ok(())
})
})
}
}

/// Type of component change.
#[derive(EnumDiscriminants, Clone, Copy)]
#[strum_discriminants(name(ComponentDiffKind), derive(Deserialize, Serialize))]
pub(super) enum ComponentDiff<'a> {
/// Indicates that a component was added or changed, contains its ID and pointer.
Changed((ReplicationId, Ptr<'a>)),
/// Indicates that a component was removed, contains its ID.
Removed(ReplicationId),
}

/// Corresponds to the number of server update.
///
/// See also [`crate::server::TickPolicy`].
Expand Down
Loading

0 comments on commit 0c681da

Please sign in to comment.