Skip to content

Commit

Permalink
send entity mappings as array at start of packet
Browse files Browse the repository at this point in the history
  • Loading branch information
RJ committed Oct 5, 2023
1 parent da7a988 commit b8348bb
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 96 deletions.
78 changes: 38 additions & 40 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use bevy::{
use bevy_renet::{renet::Bytes, transport::client_connected};
use bevy_renet::{renet::RenetClient, transport::NetcodeClientPlugin, RenetClientPlugin};
use bincode::{DefaultOptions, Options};
use varint_rs::VarintReader;

use crate::replicon_core::{
replication_rules::{Mapper, Replication, ReplicationRules},
replicon_tick::RepliconTick,
REPLICATION_CHANNEL_ID,
};
use crate::server::replication_buffer::ReplicationBuffer;

pub struct ClientPlugin;

Expand Down Expand Up @@ -66,6 +66,8 @@ impl ClientPlugin {
continue;
}

deserialize_entity_mappings(&mut cursor, world, &mut entity_map)?;

deserialize_component_diffs(
&mut cursor,
world,
Expand Down Expand Up @@ -127,7 +129,7 @@ fn deserialize_tick(
cursor: &mut Cursor<Bytes>,
world: &mut World,
) -> Result<Option<RepliconTick>, bincode::Error> {
let tick = bincode::deserialize_from(cursor)?;
let tick = ReplicationBuffer::read_replicon_tick(cursor)?;

let mut last_tick = world.resource_mut::<LastRepliconTick>();
if last_tick.0 < tick {
Expand All @@ -138,6 +140,31 @@ fn deserialize_tick(
}
}

/// Reads the server→client entity mappings at the head of a replication packet
/// and records each one in `entity_map`, tagging the client entity with
/// `Replication` so it participates in replication from now on.
fn deserialize_entity_mappings(
    cursor: &mut Cursor<Bytes>,
    world: &mut World,
    entity_map: &mut NetworkEntityMap,
) -> Result<(), bincode::Error> {
    let mappings = ReplicationBuffer::read_entity_mappings(cursor)?;
    for &(server_entity, client_entity) in mappings.iter() {
        // Skip mappings for server entities we have already mapped.
        if let Some(existing_mapping) = entity_map.get_mapping_from_server(server_entity) {
            println!("Received mapping for s:{server_entity:?} -> c:{client_entity:?}, but already mapped to c:{existing_mapping:?}");
            continue;
        }
        // The client entity may have been despawned locally in the meantime.
        let Some(mut cmd) = world.get_entity_mut(client_entity) else {
            println!("Received mapping for s:{server_entity:?} -> c:{client_entity:?}, but client entity doesn't exist");
            continue;
        };
        println!("Adding entity mapping s:{server_entity:?} -> c:{client_entity:?}");
        // NOTE(review): println! in library code — bevy's log macros would be more
        // appropriate; left untouched to preserve behavior.
        cmd.insert(Replication);
        entity_map.insert(server_entity, client_entity);
    }
    Ok(())
}

/// Deserializes component diffs of `diff_kind` and applies them to the `world`.
fn deserialize_component_diffs(
cursor: &mut Cursor<Bytes>,
Expand All @@ -149,12 +176,8 @@ fn deserialize_component_diffs(
) -> Result<(), bincode::Error> {
let entities_count: u16 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..entities_count {
let entity = deserialize_entity(&mut *cursor)?;
let predicted_entity = match deserialize_entity(&mut *cursor)? {
Entity::PLACEHOLDER => None,
e => Some(e),
};
let mut entity = entity_map.get_by_server_or_spawn(world, entity, predicted_entity);
let entity = ReplicationBuffer::read_entity(cursor)?;
let mut entity = entity_map.get_by_server_or_spawn(world, entity);
let components_count: u8 = bincode::deserialize_from(&mut *cursor)?;
for _ in 0..components_count {
let replication_id = DefaultOptions::new().deserialize_from(&mut *cursor)?;
Expand Down Expand Up @@ -185,7 +208,7 @@ fn deserialize_despawns(
// The entity might have already been despawned because of hierarchy or
// with the last diff, but the server might not yet have received confirmation
// from the client and could include the deletion in the latest diff.
let server_entity = deserialize_entity(&mut *cursor)?;
let server_entity = ReplicationBuffer::read_entity(cursor)?;
if let Some(client_entity) = entity_map
.remove_by_server(server_entity)
.and_then(|entity| world.get_entity_mut(entity))
Expand All @@ -197,21 +220,6 @@ fn deserialize_despawns(
Ok(())
}

/// Deserializes `entity` from compressed index and generation, for details see
/// [`ReplicationBuffer::write_entity()`]: the low bit of the first varint flags
/// whether a generation varint follows.
fn deserialize_entity(cursor: &mut Cursor<Bytes>) -> Result<Entity, bincode::Error> {
    let flagged_index: u64 = cursor.read_u64_varint()?;
    // Low bit set means the writer appended a non-zero generation.
    let generation = if flagged_index & 1 != 0 {
        cursor.read_u32_varint()?
    } else {
        0u32
    };
    // Rebuild the entity bits: generation in the high 32 bits, index in the low 32.
    Ok(Entity::from_bits(((generation as u64) << 32) | (flagged_index >> 1)))
}

/// Type of component change.
///
/// Parameter for [`deserialize_component_diffs`].
Expand Down Expand Up @@ -259,30 +267,20 @@ impl NetworkEntityMap {
self.client_to_server.insert(client_entity, server_entity);
}

/// Returns the client `Entity` that `server_entity` maps to, if a mapping exists.
pub(super) fn get_mapping_from_server(&self, server_entity: Entity) -> Option<&Entity> {
    self.server_to_client.get(&server_entity)
}

pub(super) fn get_by_server_or_spawn<'a>(
&mut self,
world: &'a mut World,
server_entity: Entity,
predicted_entity: Option<Entity>,
) -> EntityMut<'a> {
match self.server_to_client.entry(server_entity) {
Entry::Occupied(entry) => world.entity_mut(*entry.get()),
Entry::Vacant(entry) => {
// test if a predicted entity exists
let client_entity = match predicted_entity {
Some(e) => {
if world.get_entity(e).is_some() {
let mut ent_mut = world.get_entity_mut(e).unwrap();
ent_mut.insert(Replication);
ent_mut
} else {
// miss, spawn a new entity.
world.spawn(Replication)
}
}
None => world.spawn(Replication),
};

let client_entity = world.spawn(Replication);
entry.insert(client_entity.id());
self.client_to_server
.insert(client_entity.id(), server_entity);
Expand Down
39 changes: 32 additions & 7 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,13 @@ impl ServerPlugin {
acked_ticks.register_tick(*replicon_tick, change_tick.this_run());

let buffers = prepare_buffers(&mut buffers, &acked_ticks, *replicon_tick)?;

collect_mappings(buffers, &acked_ticks, &predictions)?;
collect_changes(
buffers,
set.p0(),
change_tick.this_run(),
&replication_rules,
&predictions,
)?;
collect_removals(buffers, &removal_trackers, change_tick.this_run())?;
collect_despawns(buffers, &despawn_tracker, change_tick.this_run())?;
Expand Down Expand Up @@ -218,15 +219,41 @@ fn prepare_buffers<'a>(
Ok(&mut buffers[..acked_ticks.clients.len()])
}

/// Collects, for each client buffer, every entity mapping created since that
/// client's last acknowledged tick, and writes them into the buffer.
fn collect_mappings(
    buffers: &mut [ReplicationBuffer],
    acked_ticks: &ResMut<AckedTicks>,
    predictions: &Res<PredictionTracker>,
) -> Result<(), bincode::Error> {
    for buffer in buffers.iter_mut() {
        let client_id = buffer.client_id();
        // Include all entity mappings since the last acknowledged tick:
        // if the spawn command for a mapped client entity was lost, a mapped
        // component on another entity could arrive first and reference the
        // mapped entity, so everything is resent until acked.
        //
        // This backlog is unlikely to grow large: mappings are only created in
        // response to clients sending specific commands, and packet loss that
        // delays acks most likely also kept those commands from reaching the
        // server, so few new mappings accumulate meanwhile.
        let acked_tick = acked_ticks
            .acked_ticks()
            .get(&client_id)
            .copied()
            .unwrap_or(RepliconTick(0));
        buffer.write_entity_mappings(predictions.get_mappings(client_id, acked_tick))?;
    }
    Ok(())
}

/// Collect component changes into buffers based on last acknowledged tick.
fn collect_changes(
buffers: &mut [ReplicationBuffer],
world: &World,
system_tick: Tick,
replication_rules: &ReplicationRules,
predictions: &Res<PredictionTracker>,
) -> Result<(), bincode::Error> {
for buffer in &mut *buffers {
// start the array for entity change data:
buffer.start_array();
}

Expand All @@ -245,10 +272,7 @@ fn collect_changes(

for archetype_entity in archetype.entities() {
for buffer in &mut *buffers {
let predicted_entity = predictions
.get_predicted_entity(buffer.client_id(), archetype_entity.entity())
.copied();
buffer.start_entity_data(archetype_entity.entity(), predicted_entity);
buffer.start_entity_data(archetype_entity.entity());
}

for component_id in archetype.components() {
Expand Down Expand Up @@ -314,6 +338,7 @@ fn collect_changes(
}

for buffer in &mut *buffers {
// ending the array of entity change data
buffer.end_array()?;
}

Expand All @@ -332,7 +357,7 @@ fn collect_removals(

for (entity, removal_tracker) in removal_trackers {
for buffer in &mut *buffers {
buffer.start_entity_data(entity, None);
buffer.start_entity_data(entity);
for (&replication_id, &tick) in &removal_tracker.0 {
if tick.is_newer_than(buffer.system_tick(), system_tick) {
buffer.write_removal(replication_id)?;
Expand Down
52 changes: 29 additions & 23 deletions src/server/prediction_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ use super::RepliconTick;
/// Check for this in a system to perform cleanup:
///
/// ```rust
/// # use bevy::prelude::*;
/// # #[derive(Component)]
/// # struct Prediction;
/// # #[derive(Component)]
/// # struct Replication;
/// fn cleanup_successful_predictions(
/// q: Query<Entity, (With<Prediction>, Added<Replication>)>,
/// mut commands: Commands,
Expand All @@ -70,11 +75,11 @@ use super::RepliconTick;
pub struct PredictionTracker {
    // Per-client list of mapping entries; keyed by client id, each entry is
    // (tick, server_entity, client_entity) — see `EntityMapping`.
    mappings: HashMap<u64, Vec<EntityMapping>>,
}
type EntityMapping = (RepliconTick, ServerEntity, ClientEntity);
pub(crate) type EntityMapping = (RepliconTick, ServerEntity, ClientEntity);

// Internal aliases for clarity in the PredictionTracker types above.
type ServerEntity = Entity;
type ClientEntity = Entity;
// Aliases for clarity in APIs dealing with `Entity`s that exist on servers and clients.
pub(crate) type ServerEntity = Entity;
pub(crate) type ClientEntity = Entity;

impl PredictionTracker {
/// Register that the server spawned `server_entity` as a result of `client_id` sending a
Expand All @@ -89,43 +94,44 @@ impl PredictionTracker {
tick: RepliconTick,
) {
let new_entry = (tick, server_entity, client_entity);
if let Some(mut v) = self.mappings.get_mut(&client_id) {
if let Some(v) = self.mappings.get_mut(&client_id) {
v.push(new_entry);
} else {
self.mappings.insert(client_id, vec![new_entry]);
}
}
/// gives an optional iter over (tick, server_entity, client_entity)
pub(crate) fn get_mappings(
&self,
client_id: u64,
tick: RepliconTick,
) -> impl Iterator<Item = &EntityMapping> {
// let Some(v) = self.mappings.get(&client_id) else {
// return None;
// };
let v = match self.mappings
.get(&client_id) {
Some(v) => v.iter(),
None => std::iter::empty().into(),
};


.map_(std::iter::empty())
.into_iter()
.filter(|(entry_tick, _, _)| *entry_tick >= tick)
) -> Option<Vec<(ServerEntity, ClientEntity)>> {
let Some(v) = self.mappings.get(&client_id) else {
return None;
};
Some(
v.iter()
.filter_map(|(entry_tick, server_entity, client_entity)| {
if *entry_tick >= tick {
Some((*server_entity, *client_entity))
} else {
None
}
})
.collect::<Vec<(Entity, Entity)>>(),
)
}
/// remove predicted entities in cases where the RepliconTick at which that entity was spawned
/// has been acked by a client.
pub(crate) fn cleanup_acked(&mut self, client_id: u64, acked_tick: RepliconTick) {
let Some(v) = self.tick_map.get_mut(&client_id) else {
let Some(v) = self.mappings.get_mut(&client_id) else {
return;
};
v.retain(|(tick, server_entity)| {
if tick.get() > acked_tick.get() {
v.retain(|(tick, _, _)| {
if *tick > acked_tick {
// not acked yet, retain it
return true;
}
self.entity_map.remove(&(client_id, *server_entity));
false
});
}
Expand Down
Loading

0 comments on commit b8348bb

Please sign in to comment.