Skip to content

Commit

Permalink
Add API for checking for mutate messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Shatur committed Nov 21, 2024
1 parent be32d0a commit 52bca7e
Show file tree
Hide file tree
Showing 8 changed files with 489 additions and 5 deletions.
32 changes: 32 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ pub mod confirm_history;
#[cfg(feature = "client_diagnostics")]
pub mod diagnostics;
pub mod events;
pub mod server_mutate_ticks;

use std::{io::Cursor, mem};

use bevy::{ecs::world::CommandQueue, prelude::*};
use bincode::{DefaultOptions, Options};
use bytes::Bytes;
use integer_encoding::{FixedIntReader, VarIntReader, VarIntWriter};
use server_mutate_ticks::ServerMutateTicks;

use crate::core::{
channels::{ReplicationChannel, RepliconChannels},
Expand All @@ -21,6 +23,7 @@ use crate::core::{
ctx::{DespawnCtx, RemoveCtx, WriteCtx},
ReplicationRegistry,
},
track_mutate_messages::TrackMutateMessages,
Replicated,
},
replicon_client::RepliconClient,
Expand Down Expand Up @@ -67,6 +70,12 @@ impl Plugin for ClientPlugin {
)
.add_systems(PreUpdate, Self::reset.in_set(ClientSet::Reset));
}

fn finish(&self, app: &mut App) {
if **app.world().resource::<TrackMutateMessages>() {
app.init_resource::<ServerMutateTicks>();
}
}
}

impl ClientPlugin {
Expand Down Expand Up @@ -101,10 +110,12 @@ impl ClientPlugin {
world.resource_scope(|world, command_markers: Mut<CommandMarkers>| {
world.resource_scope(|world, registry: Mut<ReplicationRegistry>| {
let mut stats = world.remove_resource::<ClientStats>();
let mut mutate_ticks = world.remove_resource::<ServerMutateTicks>();
let mut params = ReceiveParams {
queue: &mut queue,
entity_markers: &mut entity_markers,
entity_map: &mut entity_map,
mutate_ticks: mutate_ticks.as_mut(),
stats: stats.as_mut(),
command_markers: &command_markers,
registry: &registry,
Expand All @@ -120,6 +131,9 @@ impl ClientPlugin {
if let Some(stats) = stats {
world.insert_resource(stats);
}
if let Some(mutate_ticks) = mutate_ticks {
world.insert_resource(mutate_ticks);
}

Ok(())
})
Expand Down Expand Up @@ -267,11 +281,17 @@ fn buffer_mutate_message(

let change_tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
let message_tick = DefaultOptions::new().deserialize_from(&mut cursor)?;
let messages_count = if params.mutate_ticks.is_some() {
cursor.read_varint()?
} else {
1
};
let mutate_index = cursor.read_varint()?;
trace!("received mutate message for {message_tick:?}");
buffered_mutations.insert(BufferedMutate {
change_tick,
message_tick,
messages_count,
message: message.slice(cursor.position() as usize..),
});

Expand Down Expand Up @@ -308,6 +328,10 @@ fn apply_mutate_messages(
Err(e) => result = Err(e),
}

if let Some(mutate_ticks) = &mut params.mutate_ticks {
mutate_ticks.confirm(mutate.message_tick, mutate.messages_count);
}

false
});

Expand Down Expand Up @@ -611,6 +635,7 @@ struct ReceiveParams<'a> {
queue: &'a mut CommandQueue,
entity_markers: &'a mut EntityMarkers,
entity_map: &'a mut ServerEntityMap,
mutate_ticks: Option<&'a mut ServerMutateTicks>,
stats: Option<&'a mut ClientStats>,
command_markers: &'a CommandMarkers,
registry: &'a ReplicationRegistry,
Expand Down Expand Up @@ -675,6 +700,8 @@ pub enum ClientSet {
///
/// In other words, last [`RepliconTick`] with a removal, insertion, spawn or despawn.
/// When a component mutates, this value is not updated.
///
/// See also [`ServerMutateTicks`](server_mutate_ticks::ServerMutateTicks).
#[derive(Clone, Copy, Debug, Default, Deref, Resource)]
pub struct ServerChangeTick(RepliconTick);

Expand Down Expand Up @@ -708,6 +735,11 @@ pub(super) struct BufferedMutate {
/// The tick this mutations corresponds to.
message_tick: RepliconTick,

/// Number of all mutate messages sent by server for this tick.
///
/// May not be equal to the number of received messages.
messages_count: usize,

/// Mutations data.
message: Bytes,
}
Expand Down
Loading

0 comments on commit 52bca7e

Please sign in to comment.