Skip to content

Commit

Permalink
Rename change message and into update message (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shatur authored Dec 1, 2024
1 parent 42537ea commit 30caf07
Show file tree
Hide file tree
Showing 21 changed files with 202 additions and 202 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Provide replication statistics by sum instead of per second and use `usize` for it.
- Use fixed integer encoding for ticks for server events.
- Rename `ServerPlugin::change_timeout` into `ServerPlugin::mutations_timeout`.
- Rename `ServerInitTick` into `ServerChangeTick`.
- Rename `ServerInitTick` into `ServerUpdateTick`.
- Rename `ReplicatedClient::init_tick` into `ReplicatedClient::change_tick`.
- Rename `ReplicatedClient::get_change_tick` into `ReplicatedClient::mutation_tick`.
- Rename `ReplicationChannel::Init` into `ReplicationChannel::Changes`.
- Rename `ReplicationChannel::Init` into `ReplicationChannel::Updates`.
- Rename `ReplicationChannel::Update` into `ReplicationChannel::Mutations`.
- Rename `ClientStats` into `ClientReplicationStats`.
- Rename `ClientDiagnosticsPlugin::MESSAGES` into `ClientDiagnosticsPlugin::REPLICATION_MESSAGES`.
Expand Down
68 changes: 34 additions & 34 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use crate::core::{
channels::{ReplicationChannel, RepliconChannels},
common_conditions::{client_connected, client_just_connected, client_just_disconnected},
replication::{
change_message_flags::ChangeMessageFlags,
command_markers::{CommandMarkers, EntityMarkers},
deferred_entity::DeferredEntity,
replication_registry::{
ctx::{DespawnCtx, RemoveCtx, WriteCtx},
ReplicationRegistry,
},
track_mutate_messages::TrackMutateMessages,
update_message_flags::UpdateMessageFlags,
Replicated,
},
replicon_client::RepliconClient,
Expand All @@ -41,7 +41,7 @@ impl Plugin for ClientPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<RepliconClient>()
.init_resource::<ServerEntityMap>()
.init_resource::<ServerChangeTick>()
.init_resource::<ServerUpdateTick>()
.init_resource::<BufferedMutations>()
.add_event::<EntityReplicated>()
.add_event::<MutateTickReceived>()
Expand Down Expand Up @@ -87,12 +87,12 @@ impl ClientPlugin {

/// Receives and applies replication messages from the server.
///
/// Change messages are sent over the [`ReplicationChannel::Changes`] and are applied first to ensure valid state
/// Update messages are sent over the [`ReplicationChannel::Updates`] and are applied first to ensure valid state
/// for component mutations.
///
/// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear
/// ahead-of or behind change messages from the same server tick. A mutation will only be applied if its
/// change tick has already appeared in an change message, otherwise it will be buffered while waiting.
/// ahead-of or behind update messages from the same server tick. A mutation will only be applied if its
/// update tick has already appeared in an update message, otherwise it will be buffered while waiting.
/// Since component mutations can arrive in any order, they will only be applied if they correspond to a more
/// recent server tick than the last acked server tick for each entity.
///
Expand Down Expand Up @@ -153,12 +153,12 @@ impl ClientPlugin {
}

fn reset(
mut change_tick: ResMut<ServerChangeTick>,
mut update_tick: ResMut<ServerUpdateTick>,
mut entity_map: ResMut<ServerEntityMap>,
mut buffered_mutations: ResMut<BufferedMutations>,
stats: Option<ResMut<ClientReplicationStats>>,
) {
*change_tick = Default::default();
*update_tick = Default::default();
entity_map.clear();
buffered_mutations.clear();
if let Some(mut stats) = stats {
Expand All @@ -176,33 +176,33 @@ fn apply_replication(
client: &mut RepliconClient,
buffered_mutations: &mut BufferedMutations,
) -> bincode::Result<()> {
for message in client.receive(ReplicationChannel::Changes) {
apply_change_message(world, params, &message)?;
for message in client.receive(ReplicationChannel::Updates) {
apply_update_message(world, params, &message)?;
}

// Unlike change messages, we read all mutate messages first, sort them by tick
// Unlike update messages, we read all mutate messages first, sort them by tick
// in descending order to ensure that the last mutation will be applied first.
// Since mutate messages manually split by packet size, we apply all messages,
// but skip outdated data per-entity by checking last received tick for it
// (unless user requested history via marker).
let change_tick = *world.resource::<ServerChangeTick>();
let update_tick = *world.resource::<ServerUpdateTick>();
let acks_size = mem::size_of::<u16>() * client.received_count(ReplicationChannel::Mutations);
if acks_size != 0 {
let mut acks = Vec::with_capacity(acks_size);
for message in client.receive(ReplicationChannel::Mutations) {
let mutate_index = buffer_mutate_message(params, buffered_mutations, message)?;
bincode::serialize_into(&mut acks, &mutate_index)?;
}
client.send(ReplicationChannel::Changes, acks);
client.send(ReplicationChannel::Updates, acks);
}

apply_mutate_messages(world, params, buffered_mutations, change_tick)
apply_mutate_messages(world, params, buffered_mutations, update_tick)
}

/// Reads and applies a change message.
/// Reads and applies an update message.
///
/// For details see [`replication_messages`](crate::server::replication_messages).
fn apply_change_message(
fn apply_update_message(
world: &mut World,
params: &mut ReceiveParams,
message: &[u8],
Expand All @@ -214,12 +214,12 @@ fn apply_change_message(
stats.bytes += end_pos;
}

let flags = ChangeMessageFlags::from_bits_retain(cursor.read_fixedint()?);
let flags = UpdateMessageFlags::from_bits_retain(cursor.read_fixedint()?);
debug_assert!(!flags.is_empty(), "message can't be empty");

let message_tick = bincode::deserialize_from(&mut cursor)?;
trace!("applying change message for {message_tick:?}");
world.resource_mut::<ServerChangeTick>().0 = message_tick;
trace!("applying update message for {message_tick:?}");
world.resource_mut::<ServerUpdateTick>().0 = message_tick;

let last_flag = flags.last();
for (_, flag) in flags.iter_names() {
Expand All @@ -230,7 +230,7 @@ fn apply_change_message(
};

match flag {
ChangeMessageFlags::MAPPINGS => {
UpdateMessageFlags::MAPPINGS => {
debug_assert_eq!(array_kind, ArrayKind::Sized);
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_entity_mapping(world, params, cursor)
Expand All @@ -239,23 +239,23 @@ fn apply_change_message(
stats.mappings += len;
}
}
ChangeMessageFlags::DESPAWNS => {
UpdateMessageFlags::DESPAWNS => {
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_despawn(world, params, cursor, message_tick)
})?;
if let Some(stats) = &mut params.stats {
stats.despawns += len;
}
}
ChangeMessageFlags::REMOVALS => {
UpdateMessageFlags::REMOVALS => {
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_removals(world, params, cursor, message_tick)
})?;
if let Some(stats) = &mut params.stats {
stats.entities_changed += len;
}
}
ChangeMessageFlags::CHANGES => {
UpdateMessageFlags::CHANGES => {
debug_assert_eq!(array_kind, ArrayKind::Dynamic);
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_changes(world, params, cursor, message_tick)
Expand Down Expand Up @@ -288,7 +288,7 @@ fn buffer_mutate_message(
stats.bytes += end_pos;
}

let change_tick = bincode::deserialize_from(&mut cursor)?;
let update_tick = bincode::deserialize_from(&mut cursor)?;
let message_tick = bincode::deserialize_from(&mut cursor)?;
let messages_count = if params.mutate_ticks.is_some() {
cursor.read_varint()?
Expand All @@ -298,7 +298,7 @@ fn buffer_mutate_message(
let mutate_index = cursor.read_varint()?;
trace!("received mutate message for {message_tick:?}");
buffered_mutations.insert(BufferedMutate {
change_tick,
update_tick,
message_tick,
messages_count,
message: message.slice(cursor.position() as usize..),
Expand All @@ -309,17 +309,17 @@ fn buffer_mutate_message(

/// Applies mutations from [`BufferedMutations`].
///
/// If the mutate message can't be applied yet (because the change message with the
/// If the mutate message can't be applied yet (because the update message with the
/// corresponding tick hasn't arrived), it will be kept in the buffer.
fn apply_mutate_messages(
world: &mut World,
params: &mut ReceiveParams,
buffered_mutations: &mut BufferedMutations,
change_tick: ServerChangeTick,
update_tick: ServerUpdateTick,
) -> bincode::Result<()> {
let mut result = Ok(());
buffered_mutations.0.retain(|mutate| {
if mutate.change_tick > *change_tick {
if mutate.update_tick > *update_tick {
return true;
}

Expand Down Expand Up @@ -374,7 +374,7 @@ fn apply_entity_mapping(
Ok(())
}

/// Deserializes and applies entity despawn from change message.
/// Deserializes and applies entity despawn from update message.
fn apply_despawn(
world: &mut World,
params: &mut ReceiveParams,
Expand Down Expand Up @@ -567,7 +567,7 @@ fn apply_mutations(
let data_size: usize = cursor.read_varint()?;

let Some(client_entity) = params.entity_map.get_by_server(server_entity) else {
// Mutation could arrive after a despawn from change message.
// Mutation could arrive after a despawn from update message.
debug!("ignoring mutations received for unknown server's {server_entity:?}");
cursor.set_position(cursor.position() + data_size as u64);
return Ok(());
Expand Down Expand Up @@ -746,16 +746,16 @@ pub enum ClientSet {
Reset,
}

/// Last received tick for change messages from the server.
/// Last received tick for update messages from the server.
///
/// In other words, the last [`RepliconTick`] with a removal, insertion, spawn or despawn.
/// This value is not updated when mutation messages are received from the server.
///
/// See also [`ServerMutateTicks`].
#[derive(Clone, Copy, Debug, Default, Deref, Resource)]
pub struct ServerChangeTick(RepliconTick);
pub struct ServerUpdateTick(RepliconTick);

/// Cached buffered mutate messages, used to synchronize mutations with change messages.
/// Cached buffered mutate messages, used to synchronize mutations with update messages.
///
/// If [`ClientSet::Reset`] is disabled, then this needs to be cleaned up manually with [`Self::clear`].
#[derive(Default, Resource)]
Expand All @@ -775,12 +775,12 @@ impl BufferedMutations {
}
}

/// Partially-deserialized mutate message that is waiting for its tick to appear in an change message.
/// Partially-deserialized mutate message that is waiting for its tick to appear in an update message.
///
/// See also [`crate::server::replication_messages`].
pub(super) struct BufferedMutate {
/// Required tick to wait for.
change_tick: RepliconTick,
update_tick: RepliconTick,

/// The tick this mutations corresponds to.
message_tick: RepliconTick,
Expand Down
4 changes: 2 additions & 2 deletions src/client/confirm_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ impl ConfirmHistory {
}
}

/// Triggered for an entity when it receives changes for a tick.
/// Triggered for an entity when it receives updates for a tick.
///
/// See also [`ConfirmHistory`].
#[derive(Debug, Event, Clone, Copy)]
pub struct EntityReplicated {
/// Entity that received changes.
/// Entity that received an update.
pub entity: Entity,

/// Message tick.
Expand Down
6 changes: 3 additions & 3 deletions src/client/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{ClientPlugin, ClientSet, ServerChangeTick};
use super::{ClientPlugin, ClientSet, ServerUpdateTick};
use crate::core::{
common_conditions::*,
event_registry::{
Expand Down Expand Up @@ -85,7 +85,7 @@ impl ClientEventsPlugin {
world.resource_scope(|world, registry: Mut<AppTypeRegistry>| {
world.resource_scope(|world, entity_map: Mut<ServerEntityMap>| {
world.resource_scope(|world, event_registry: Mut<EventRegistry>| {
let change_tick = **world.resource::<ServerChangeTick>();
let update_tick = **world.resource::<ServerUpdateTick>();
let mut ctx = ClientReceiveCtx {
registry: &registry.read(),
entity_map: &entity_map,
Expand All @@ -112,7 +112,7 @@ impl ClientEventsPlugin {
events.into_inner(),
queue.into_inner(),
&mut client,
change_tick,
update_tick,
)
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/server_mutate_ticks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::core::replicon_tick::RepliconTick;
/// [`TrackAppExt::track_mutate_messages`](crate::core::replication::track_mutate_messages::TrackAppExt::track_mutate_messages)
/// were called.
///
/// See also [`MutateTickReceived`] and [`ServerChangeTick`](super::ServerChangeTick).
/// See also [`MutateTickReceived`] and [`ServerUpdateTick`](super::ServerUpdateTick).
#[derive(Debug, Resource)]
pub struct ServerMutateTicks {
ticks: VecDeque<TickMessages>,
Expand Down
8 changes: 4 additions & 4 deletions src/core/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum ReplicationChannel {
/// For sending messages with entity mappings, inserts, removals and despawns.
///
/// This is an ordered reliable channel.
Changes,
Updates,
/// For sending messages with component mutations.
///
/// This is an unreliable channel.
Expand All @@ -20,7 +20,7 @@ pub enum ReplicationChannel {
impl From<ReplicationChannel> for RepliconChannel {
fn from(value: ReplicationChannel) -> Self {
match value {
ReplicationChannel::Changes => ChannelKind::Ordered.into(),
ReplicationChannel::Updates => ChannelKind::Ordered.into(),
ReplicationChannel::Mutations => ChannelKind::Unreliable.into(),
}
}
Expand Down Expand Up @@ -53,11 +53,11 @@ impl Default for RepliconChannels {
fn default() -> Self {
Self {
server: vec![
ReplicationChannel::Changes.into(),
ReplicationChannel::Updates.into(),
ReplicationChannel::Mutations.into(),
],
client: vec![
ReplicationChannel::Changes.into(),
ReplicationChannel::Updates.into(),
ReplicationChannel::Mutations.into(),
],
default_max_bytes: 5 * 1024 * 1024,
Expand Down
Loading

0 comments on commit 30caf07

Please sign in to comment.