Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor replication to use a buffer cache per client #46

Merged
merged 8 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 105 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::io::Cursor;

use bevy::{
ecs::world::EntityMut,
prelude::*,
utils::{Entry, HashMap},
};
use bevy_renet::transport::client_connected;
use bevy_renet::{renet::Bytes, transport::client_connected};
use bevy_renet::{renet::RenetClient, transport::NetcodeClientPlugin, RenetClientPlugin};
use bincode::{DefaultOptions, Options};

use crate::{
replicon_core::{Mapper, NetworkTick, WorldDiff, REPLICATION_CHANNEL_ID},
replicon_core::{Mapper, NetworkTick, ReplicationRules, REPLICATION_CHANNEL_ID},
Replication,
};

Expand All @@ -29,6 +32,7 @@ impl Plugin for ClientPlugin {
.add_systems(
PreUpdate,
Self::diff_receiving_system
.pipe(unwrap)
.in_set(ClientSet::Receive)
.run_if(client_connected()),
)
Expand All @@ -45,13 +49,38 @@ impl Plugin for ClientPlugin {
}

impl ClientPlugin {
fn diff_receiving_system(world: &mut World) {
fn diff_receiving_system(world: &mut World) -> Result<(), bincode::Error> {
world.resource_scope(|world, mut client: Mut<RenetClient>| {
while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) {
WorldDiff::deserialize_to_world(world, message)
.expect("server should send only valid world diffs");
}
});
world.resource_scope(|world, mut entity_map: Mut<NetworkEntityMap>| {
world.resource_scope(|world, replication_rules: Mut<ReplicationRules>| {
while let Some(message) = client.receive_message(REPLICATION_CHANNEL_ID) {
let mut cursor = Cursor::new(message);

if !deserialize_tick(&mut cursor, world)? {
continue;
}

deserialize_component_diffs(
&mut cursor,
world,
&mut entity_map,
&replication_rules,
DiffKind::Change,
)?;
deserialize_component_diffs(
&mut cursor,
world,
&mut entity_map,
&replication_rules,
DiffKind::Removal,
)?;
deserialize_despawns(&mut cursor, world, &mut entity_map)?;
}

Ok(())
})
})
})
}

fn ack_sending_system(last_tick: Res<LastTick>, mut client: ResMut<RenetClient>) {
Expand All @@ -66,6 +95,74 @@ impl ClientPlugin {
}
}

/// Deserializes server tick and applies it to [`LastTick`] if it is newer.
///
/// Returns true if [`LastTick`] has been updated.
fn deserialize_tick(cursor: &mut Cursor<Bytes>, world: &mut World) -> Result<bool, bincode::Error> {
let tick = bincode::deserialize_from(cursor)?;

let mut last_tick = world.resource_mut::<LastTick>();
if last_tick.0 < tick {
last_tick.0 = tick;
Ok(true)
} else {
Ok(false)
}
}

/// Deserializes component [`DiffKind`] and applies them to the [`World`].
fn deserialize_component_diffs(
cursor: &mut Cursor<Bytes>,
world: &mut World,
entity_map: &mut NetworkEntityMap,
replication_rules: &ReplicationRules,
diff_kind: DiffKind,
) -> Result<(), bincode::Error> {
let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..entities_count {
let entity = DefaultOptions::new().deserialize_from(&mut *cursor)?;
let mut entity = entity_map.get_by_server_or_spawn(world, entity);
let components_count: u8 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..components_count {
let replication_id = DefaultOptions::new().deserialize_from(&mut *cursor)?;
let replication_info = replication_rules.get_info(replication_id);
match diff_kind {
DiffKind::Change => {
(replication_info.deserialize)(&mut entity, entity_map, cursor)?
}
DiffKind::Removal => (replication_info.remove)(&mut entity),
}
}
}

Ok(())
}

enum DiffKind {
Change,
Removal,
}

/// Deserializes despawns and applies them to the [`World`].
fn deserialize_despawns(
cursor: &mut Cursor<Bytes>,
world: &mut World,
entity_map: &mut NetworkEntityMap,
) -> Result<(), bincode::Error> {
let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..entities_count {
// The entity might have already been deleted with the last diff,
// but the server might not yet have received confirmation from the
// client and could include the deletion in the latest diff.
let server_entity = DefaultOptions::new().deserialize_from(&mut *cursor)?;
if let Some(client_entity) = entity_map.remove_by_server(server_entity) {
world.entity_mut(client_entity).despawn_recursive();
}
}

Ok(())
}

/// Last received tick from server.
///
/// Exists only on clients, sent to the server.
Expand Down
12 changes: 10 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ app.replicate_with::<Transform>(serialize_transform, deserialize_transform);
/// Serializes only translation.
fn serialize_transform(
component: Ptr,
cursor: &mut Cursor<&mut Vec<u8>>,
cursor: &mut Cursor<Vec<u8>>,
) -> Result<(), bincode::Error> {
// SAFETY: Function called for registered `ComponentId`.
let transform: &Transform = unsafe { component.deref() };
Expand Down Expand Up @@ -171,7 +171,7 @@ fn player_init_system(

#[derive(Component, Deserialize, Serialize)]
struct Player;
# fn serialize_transform(_: Ptr, _: &mut Cursor<&mut Vec<u8>>) -> Result<(), bincode::Error> { unimplemented!() }
# fn serialize_transform(_: Ptr, _: &mut Cursor<Vec<u8>>) -> Result<(), bincode::Error> { unimplemented!() }
# fn deserialize_transform(_: &mut EntityMut, _: &mut NetworkEntityMap, _: &mut Cursor<Bytes>) -> Result<(), bincode::Error> { unimplemented!() }
```

Expand Down Expand Up @@ -348,6 +348,14 @@ To check if you running server or client, you can use conditions based on
They rarely used for gameplay systems (since you write the same logic for
multiplayer and single-player!), but could be used for server
creation / connection systems and corresponding UI.

## Limits

To reduce packet size there are the following limits per replication update:

- Up to [`u16::MAX`] entities that have changed/added components with up to [`u8::MAX`] such components.
- Up to [`u16::MAX`] entities that have removed components with up to [`u8::MAX`] such components.
- Up to [`u16::MAX`] entities that were despawned.
*/

pub mod client;
Expand Down
131 changes: 3 additions & 128 deletions src/replicon_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use bevy::{
};
use bevy_renet::renet::{Bytes, ChannelConfig, SendType};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use strum::EnumDiscriminants;

use crate::client::{ClientMapper, LastTick, NetworkEntityMap};
use crate::client::{ClientMapper, NetworkEntityMap};

pub struct RepliconCorePlugin;

Expand Down Expand Up @@ -191,7 +190,7 @@ impl FromWorld for ReplicationRules {
}

/// Signature of serialization function stored in [`ReplicationInfo`].
pub type SerializeFn = fn(Ptr, &mut Cursor<&mut Vec<u8>>) -> Result<(), bincode::Error>;
pub type SerializeFn = fn(Ptr, &mut Cursor<Vec<u8>>) -> Result<(), bincode::Error>;

/// Signature of deserialization function stored in [`ReplicationInfo`].
pub type DeserializeFn =
Expand Down Expand Up @@ -230,7 +229,7 @@ pub struct ReplicationId(usize);
/// Default serialization function.
fn serialize_component<C: Component + Serialize>(
component: Ptr,
cursor: &mut Cursor<&mut Vec<u8>>,
cursor: &mut Cursor<Vec<u8>>,
) -> Result<(), bincode::Error> {
// SAFETY: Function called for registered `ComponentId`.
let component: &C = unsafe { component.deref() };
Expand Down Expand Up @@ -287,130 +286,6 @@ pub trait Mapper {
#[derive(Component, Clone, Copy)]
pub struct Replication;

/// Changed world data and current tick from server.
///
/// Sent from server to clients.
pub(super) struct WorldDiff<'a> {
pub(super) tick: NetworkTick,
pub(super) entities: HashMap<Entity, Vec<ComponentDiff<'a>>>,
pub(super) despawns: Vec<Entity>,
}

impl WorldDiff<'_> {
/// Creates a new [`WorldDiff`] with a tick and empty entities.
pub(super) fn new(tick: NetworkTick) -> Self {
Self {
tick,
entities: Default::default(),
despawns: Default::default(),
}
}

/// Serializes itself into a buffer.
///
/// We use custom implementation because serde impls require to use generics that can't be stored in [`ReplicationInfo`].
pub(super) fn serialize(
&self,
replication_rules: &ReplicationRules,
message: &mut Vec<u8>,
) -> Result<(), bincode::Error> {
let mut cursor = Cursor::new(message);

bincode::serialize_into(&mut cursor, &self.tick)?;

bincode::serialize_into(&mut cursor, &self.entities.len())?;
for (entity, components) in &self.entities {
bincode::serialize_into(&mut cursor, entity)?;
bincode::serialize_into(&mut cursor, &components.len())?;
for &component_diff in components {
bincode::serialize_into(&mut cursor, &ComponentDiffKind::from(component_diff))?;
match component_diff {
ComponentDiff::Changed((replication_id, ptr)) => {
bincode::serialize_into(&mut cursor, &replication_id)?;
let replication_info = replication_rules.get_info(replication_id);
(replication_info.serialize)(ptr, &mut cursor)?;
}
ComponentDiff::Removed(replication_id) => {
bincode::serialize_into(&mut cursor, &replication_id)?;
}
}
}
}

bincode::serialize_into(&mut cursor, &self.despawns)?;

Ok(())
}

/// Deserializes itself from bytes directly into the world by applying all changes.
///
/// Does nothing if world already received a more recent diff.
/// See also [`LastTick`].
pub(super) fn deserialize_to_world(
world: &mut World,
message: Bytes,
) -> Result<(), bincode::Error> {
let mut cursor = Cursor::new(message);

let tick = bincode::deserialize_from(&mut cursor)?;
let mut last_tick = world.resource_mut::<LastTick>();
if last_tick.0 >= tick {
return Ok(());
}
last_tick.0 = tick;

world.resource_scope(|world, replication_rules: Mut<ReplicationRules>| {
world.resource_scope(|world, mut entity_map: Mut<NetworkEntityMap>| {
let entities_count: usize = bincode::deserialize_from(&mut cursor)?;
for _ in 0..entities_count {
let entity = bincode::deserialize_from(&mut cursor)?;
let mut entity = entity_map.get_by_server_or_spawn(world, entity);
let components_count: usize = bincode::deserialize_from(&mut cursor)?;
for _ in 0..components_count {
let diff_kind = bincode::deserialize_from(&mut cursor)?;
let replication_id = bincode::deserialize_from(&mut cursor)?;
let replication_info = replication_rules.get_info(replication_id);
match diff_kind {
ComponentDiffKind::Changed => {
(replication_info.deserialize)(
&mut entity,
&mut entity_map,
&mut cursor,
)?;
}
ComponentDiffKind::Removed => {
(replication_info.remove)(&mut entity);
}
}
}
}

let despawns: Vec<Entity> = bincode::deserialize_from(&mut cursor)?;
for server_entity in despawns {
// The entity might have already been deleted with the last diff,
// but the server might not yet have received confirmation from the
// client and could include the deletion in the latest diff.
if let Some(client_entity) = entity_map.remove_by_server(server_entity) {
world.entity_mut(client_entity).despawn_recursive();
}
}

Ok(())
})
})
}
}

/// Type of component change.
#[derive(EnumDiscriminants, Clone, Copy)]
#[strum_discriminants(name(ComponentDiffKind), derive(Deserialize, Serialize))]
pub(super) enum ComponentDiff<'a> {
/// Indicates that a component was added or changed, contains its ID and pointer.
Changed((ReplicationId, Ptr<'a>)),
/// Indicates that a component was removed, contains its ID.
Removed(ReplicationId),
}

/// Corresponds to the number of server update.
///
/// See also [`crate::server::TickPolicy`].
Expand Down
Loading