Skip to content

Commit

Permalink
Better history API (#359)
Browse files Browse the repository at this point in the history
Co-authored-by: UkoeHB <[email protected]>
  • Loading branch information
Shatur and UkoeHB authored Dec 1, 2024
1 parent 9c0e1ed commit c99bfca
Show file tree
Hide file tree
Showing 12 changed files with 893 additions and 48 deletions.
147 changes: 108 additions & 39 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod confirm_history;
#[cfg(feature = "client_diagnostics")]
pub mod diagnostics;
pub mod events;
pub mod server_mutate_ticks;

use std::{io::Cursor, mem};

Expand All @@ -21,13 +22,15 @@ use crate::core::{
ctx::{DespawnCtx, RemoveCtx, WriteCtx},
ReplicationRegistry,
},
track_mutate_messages::TrackMutateMessages,
Replicated,
},
replicon_client::RepliconClient,
replicon_tick::RepliconTick,
server_entity_map::ServerEntityMap,
};
use confirm_history::ConfirmHistory;
use confirm_history::{ConfirmHistory, EntityReplicated};
use server_mutate_ticks::{MutateTickReceived, ServerMutateTicks};

/// Client functionality and replication receiving.
///
Expand All @@ -40,6 +43,8 @@ impl Plugin for ClientPlugin {
.init_resource::<ServerEntityMap>()
.init_resource::<ServerChangeTick>()
.init_resource::<BufferedMutations>()
.add_event::<EntityReplicated>()
.add_event::<MutateTickReceived>()
.configure_sets(
PreUpdate,
(
Expand Down Expand Up @@ -67,6 +72,12 @@ impl Plugin for ClientPlugin {
)
.add_systems(PreUpdate, Self::reset.in_set(ClientSet::Reset));
}

fn finish(&self, app: &mut App) {
if **app.world().resource::<TrackMutateMessages>() {
app.init_resource::<ServerMutateTicks>();
}
}
}

impl ClientPlugin {
Expand Down Expand Up @@ -100,28 +111,40 @@ impl ClientPlugin {
world.resource_scope(|world, mut buffered_mutations: Mut<BufferedMutations>| {
world.resource_scope(|world, command_markers: Mut<CommandMarkers>| {
world.resource_scope(|world, registry: Mut<ReplicationRegistry>| {
let mut stats = world.remove_resource::<ClientReplicationStats>();
let mut params = ReceiveParams {
queue: &mut queue,
entity_markers: &mut entity_markers,
entity_map: &mut entity_map,
stats: stats.as_mut(),
command_markers: &command_markers,
registry: &registry,
};

apply_replication(
world,
&mut params,
&mut client,
&mut buffered_mutations,
)?;

if let Some(stats) = stats {
world.insert_resource(stats);
}

Ok(())
world.resource_scope(
|world, mut replicated_events: Mut<Events<EntityReplicated>>| {
let mut stats =
world.remove_resource::<ClientReplicationStats>();
let mut mutate_ticks =
world.remove_resource::<ServerMutateTicks>();
let mut params = ReceiveParams {
queue: &mut queue,
entity_markers: &mut entity_markers,
entity_map: &mut entity_map,
replicated_events: &mut replicated_events,
mutate_ticks: mutate_ticks.as_mut(),
stats: stats.as_mut(),
command_markers: &command_markers,
registry: &registry,
};

apply_replication(
world,
&mut params,
&mut client,
&mut buffered_mutations,
)?;

if let Some(stats) = stats {
world.insert_resource(stats);
}
if let Some(mutate_ticks) = mutate_ticks {
world.insert_resource(mutate_ticks);
}

Ok(())
},
)
})
})
})
Expand All @@ -133,12 +156,14 @@ impl ClientPlugin {
mut change_tick: ResMut<ServerChangeTick>,
mut entity_map: ResMut<ServerEntityMap>,
mut buffered_mutations: ResMut<BufferedMutations>,
mut stats: ResMut<ClientReplicationStats>,
stats: Option<ResMut<ClientReplicationStats>>,
) {
*change_tick = Default::default();
entity_map.clear();
buffered_mutations.clear();
*stats = Default::default();
if let Some(mut stats) = stats {
*stats = Default::default();
}
}
}

Expand Down Expand Up @@ -265,11 +290,17 @@ fn buffer_mutate_message(

let change_tick = bincode::deserialize_from(&mut cursor)?;
let message_tick = bincode::deserialize_from(&mut cursor)?;
let messages_count = if params.mutate_ticks.is_some() {
cursor.read_varint()?
} else {
1
};
let mutate_index = cursor.read_varint()?;
trace!("received mutate message for {message_tick:?}");
buffered_mutations.insert(BufferedMutate {
change_tick,
message_tick,
messages_count,
message: message.slice(cursor.position() as usize..),
});

Expand Down Expand Up @@ -308,6 +339,14 @@ fn apply_mutate_messages(
Err(e) => result = Err(e),
}

if let Some(mutate_ticks) = &mut params.mutate_ticks {
if mutate_ticks.confirm(mutate.message_tick, mutate.messages_count) {
world.send_event(MutateTickReceived {
tick: mutate.message_tick,
});
}
}

false
});

Expand Down Expand Up @@ -377,13 +416,12 @@ fn apply_removals(
.entity_markers
.read(params.command_markers, &*client_entity);

if let Some(mut history) = client_entity.get_mut::<ConfirmHistory>() {
history.set_last_tick(message_tick);
} else {
commands
.entity(client_entity.id())
.insert(ConfirmHistory::new(message_tick));
}
confirm_tick(
&mut commands,
&mut client_entity,
params.replicated_events,
message_tick,
);

let len = apply_array(ArrayKind::Sized, cursor, |cursor| {
let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?;
Expand Down Expand Up @@ -426,13 +464,12 @@ fn apply_changes(
.entity_markers
.read(params.command_markers, &*client_entity);

if let Some(mut history) = client_entity.get_mut::<ConfirmHistory>() {
history.set_last_tick(message_tick);
} else {
commands
.entity(client_entity.id())
.insert(ConfirmHistory::new(message_tick));
}
confirm_tick(
&mut commands,
&mut client_entity,
params.replicated_events,
message_tick,
);

let len = apply_array(ArrayKind::Sized, cursor, |cursor| {
let fns_id = DefaultOptions::new().deserialize_from(&mut *cursor)?;
Expand Down Expand Up @@ -498,6 +535,25 @@ enum ArrayKind {
Dynamic,
}

fn confirm_tick(
commands: &mut Commands,
entity: &mut DeferredEntity,
replicated_events: &mut Events<EntityReplicated>,
tick: RepliconTick,
) {
if let Some(mut history) = entity.get_mut::<ConfirmHistory>() {
history.set_last_tick(tick);
} else {
commands
.entity(entity.id())
.insert(ConfirmHistory::new(tick));
}
replicated_events.send(EntityReplicated {
entity: entity.id(),
tick,
});
}

/// Deserializes and applies component mutations for all entities.
///
/// Consumes all remaining bytes in the cursor.
Expand Down Expand Up @@ -551,6 +607,10 @@ fn apply_mutations(

history.set(ago);
}
params.replicated_events.send(EntityReplicated {
entity: client_entity.id(),
tick: message_tick,
});

let end_pos = cursor.position() + data_size as u64;
let mut components_count = 0;
Expand Down Expand Up @@ -618,6 +678,8 @@ struct ReceiveParams<'a> {
queue: &'a mut CommandQueue,
entity_markers: &'a mut EntityMarkers,
entity_map: &'a mut ServerEntityMap,
replicated_events: &'a mut Events<EntityReplicated>,
mutate_ticks: Option<&'a mut ServerMutateTicks>,
stats: Option<&'a mut ClientReplicationStats>,
command_markers: &'a CommandMarkers,
registry: &'a ReplicationRegistry,
Expand Down Expand Up @@ -688,6 +750,8 @@ pub enum ClientSet {
///
/// In other words, the last [`RepliconTick`] with a removal, insertion, spawn or despawn.
/// This value is not updated when mutation messages are received from the server.
///
/// See also [`ServerMutateTicks`].
#[derive(Clone, Copy, Debug, Default, Deref, Resource)]
pub struct ServerChangeTick(RepliconTick);

Expand Down Expand Up @@ -721,6 +785,11 @@ pub(super) struct BufferedMutate {
/// The tick this mutations corresponds to.
message_tick: RepliconTick,

/// Total number of mutate messages sent by the server for this tick.
///
/// May not be equal to the number of received messages.
messages_count: usize,

/// Mutations data.
message: Bytes,
}
Expand Down
14 changes: 14 additions & 0 deletions src/client/confirm_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::core::replicon_tick::RepliconTick;
///
/// For efficiency we store only the last received tick and
/// a bitmask indicating whether the most recent 64 ticks were received.
///
/// See also [`EntityReplicated`].
#[derive(Component)]
pub struct ConfirmHistory {
/// Previously confirmed ticks, including the last tick at position 0.
Expand Down Expand Up @@ -124,6 +126,18 @@ impl ConfirmHistory {
}
}

/// Triggered for an entity when it receives changes for a tick.
///
/// See also [`ConfirmHistory`].
#[derive(Debug, Event, Clone, Copy)]
pub struct EntityReplicated {
/// Entity that received changes.
pub entity: Entity,

/// Message tick.
pub tick: RepliconTick,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading

0 comments on commit c99bfca

Please sign in to comment.