Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename change message and into update message #372

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Provide replication statistics by sum instead of per second and use `usize` for it.
- Use fixed integer encoding for ticks for server events.
- Rename `ServerPlugin::change_timeout` into `ServerPlugin::mutations_timeout`.
- Rename `ServerInitTick` into `ServerChangeTick`.
- Rename `ServerInitTick` into `ServerUpdateTick`.
- Rename `ReplicatedClient::init_tick` into `ReplicatedClient::change_tick`.
- Rename `ReplicatedClient::get_change_tick` into `ReplicatedClient::mutation_tick`.
- Rename `ReplicationChannel::Init` into `ReplicationChannel::Changes`.
- Rename `ReplicationChannel::Init` into `ReplicationChannel::Updates`.
- Rename `ReplicationChannel::Update` into `ReplicationChannel::Mutations`.
- Rename `ClientStats` into `ClientReplicationStats`.
- Rename `ClientDiagnosticsPlugin::MESSAGES` into `ClientDiagnosticsPlugin::REPLICATION_MESSAGES`.
Expand Down
68 changes: 34 additions & 34 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use crate::core::{
channels::{ReplicationChannel, RepliconChannels},
common_conditions::{client_connected, client_just_connected, client_just_disconnected},
replication::{
change_message_flags::ChangeMessageFlags,
command_markers::{CommandMarkers, EntityMarkers},
deferred_entity::DeferredEntity,
replication_registry::{
ctx::{DespawnCtx, RemoveCtx, WriteCtx},
ReplicationRegistry,
},
track_mutate_messages::TrackMutateMessages,
update_message_flags::UpdateMessageFlags,
Replicated,
},
replicon_client::RepliconClient,
Expand All @@ -41,7 +41,7 @@ impl Plugin for ClientPlugin {
fn build(&self, app: &mut App) {
app.init_resource::<RepliconClient>()
.init_resource::<ServerEntityMap>()
.init_resource::<ServerChangeTick>()
.init_resource::<ServerUpdateTick>()
.init_resource::<BufferedMutations>()
.add_event::<EntityReplicated>()
.add_event::<MutateTickReceived>()
Expand Down Expand Up @@ -87,12 +87,12 @@ impl ClientPlugin {

/// Receives and applies replication messages from the server.
///
/// Change messages are sent over the [`ReplicationChannel::Changes`] and are applied first to ensure valid state
/// Update messages are sent over the [`ReplicationChannel::Updates`] and are applied first to ensure valid state
/// for component mutations.
///
/// Mutate messages are sent over [`ReplicationChannel::Mutations`], which means they may appear
/// ahead-of or behind change messages from the same server tick. A mutation will only be applied if its
/// change tick has already appeared in an change message, otherwise it will be buffered while waiting.
/// ahead-of or behind update messages from the same server tick. A mutation will only be applied if its
/// update tick has already appeared in an update message, otherwise it will be buffered while waiting.
/// Since component mutations can arrive in any order, they will only be applied if they correspond to a more
/// recent server tick than the last acked server tick for each entity.
///
Expand Down Expand Up @@ -153,12 +153,12 @@ impl ClientPlugin {
}

fn reset(
mut change_tick: ResMut<ServerChangeTick>,
mut update_tick: ResMut<ServerUpdateTick>,
mut entity_map: ResMut<ServerEntityMap>,
mut buffered_mutations: ResMut<BufferedMutations>,
stats: Option<ResMut<ClientReplicationStats>>,
) {
*change_tick = Default::default();
*update_tick = Default::default();
entity_map.clear();
buffered_mutations.clear();
if let Some(mut stats) = stats {
Expand All @@ -176,33 +176,33 @@ fn apply_replication(
client: &mut RepliconClient,
buffered_mutations: &mut BufferedMutations,
) -> bincode::Result<()> {
for message in client.receive(ReplicationChannel::Changes) {
apply_change_message(world, params, &message)?;
for message in client.receive(ReplicationChannel::Updates) {
apply_update_message(world, params, &message)?;
}

// Unlike change messages, we read all mutate messages first, sort them by tick
// Unlike update messages, we read all mutate messages first, sort them by tick
// in descending order to ensure that the last mutation will be applied first.
// Since mutate messages manually split by packet size, we apply all messages,
// but skip outdated data per-entity by checking last received tick for it
// (unless user requested history via marker).
let change_tick = *world.resource::<ServerChangeTick>();
let update_tick = *world.resource::<ServerUpdateTick>();
let acks_size = mem::size_of::<u16>() * client.received_count(ReplicationChannel::Mutations);
if acks_size != 0 {
let mut acks = Vec::with_capacity(acks_size);
for message in client.receive(ReplicationChannel::Mutations) {
let mutate_index = buffer_mutate_message(params, buffered_mutations, message)?;
bincode::serialize_into(&mut acks, &mutate_index)?;
}
client.send(ReplicationChannel::Changes, acks);
client.send(ReplicationChannel::Updates, acks);
}

apply_mutate_messages(world, params, buffered_mutations, change_tick)
apply_mutate_messages(world, params, buffered_mutations, update_tick)
}

/// Reads and applies a change message.
/// Reads and applies an update message.
///
/// For details see [`replication_messages`](crate::server::replication_messages).
fn apply_change_message(
fn apply_update_message(
world: &mut World,
params: &mut ReceiveParams,
message: &[u8],
Expand All @@ -214,12 +214,12 @@ fn apply_change_message(
stats.bytes += end_pos;
}

let flags = ChangeMessageFlags::from_bits_retain(cursor.read_fixedint()?);
let flags = UpdateMessageFlags::from_bits_retain(cursor.read_fixedint()?);
debug_assert!(!flags.is_empty(), "message can't be empty");

let message_tick = bincode::deserialize_from(&mut cursor)?;
trace!("applying change message for {message_tick:?}");
world.resource_mut::<ServerChangeTick>().0 = message_tick;
trace!("applying update message for {message_tick:?}");
world.resource_mut::<ServerUpdateTick>().0 = message_tick;

let last_flag = flags.last();
for (_, flag) in flags.iter_names() {
Expand All @@ -230,7 +230,7 @@ fn apply_change_message(
};

match flag {
ChangeMessageFlags::MAPPINGS => {
UpdateMessageFlags::MAPPINGS => {
debug_assert_eq!(array_kind, ArrayKind::Sized);
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_entity_mapping(world, params, cursor)
Expand All @@ -239,23 +239,23 @@ fn apply_change_message(
stats.mappings += len;
}
}
ChangeMessageFlags::DESPAWNS => {
UpdateMessageFlags::DESPAWNS => {
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_despawn(world, params, cursor, message_tick)
})?;
if let Some(stats) = &mut params.stats {
stats.despawns += len;
}
}
ChangeMessageFlags::REMOVALS => {
UpdateMessageFlags::REMOVALS => {
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_removals(world, params, cursor, message_tick)
})?;
if let Some(stats) = &mut params.stats {
stats.entities_changed += len;
}
}
ChangeMessageFlags::CHANGES => {
UpdateMessageFlags::CHANGES => {
debug_assert_eq!(array_kind, ArrayKind::Dynamic);
let len = apply_array(array_kind, &mut cursor, |cursor| {
apply_changes(world, params, cursor, message_tick)
Expand Down Expand Up @@ -288,7 +288,7 @@ fn buffer_mutate_message(
stats.bytes += end_pos;
}

let change_tick = bincode::deserialize_from(&mut cursor)?;
let update_tick = bincode::deserialize_from(&mut cursor)?;
let message_tick = bincode::deserialize_from(&mut cursor)?;
let messages_count = if params.mutate_ticks.is_some() {
cursor.read_varint()?
Expand All @@ -298,7 +298,7 @@ fn buffer_mutate_message(
let mutate_index = cursor.read_varint()?;
trace!("received mutate message for {message_tick:?}");
buffered_mutations.insert(BufferedMutate {
change_tick,
update_tick,
message_tick,
messages_count,
message: message.slice(cursor.position() as usize..),
Expand All @@ -309,17 +309,17 @@ fn buffer_mutate_message(

/// Applies mutations from [`BufferedMutations`].
///
/// If the mutate message can't be applied yet (because the change message with the
/// If the mutate message can't be applied yet (because the update message with the
/// corresponding tick hasn't arrived), it will be kept in the buffer.
fn apply_mutate_messages(
world: &mut World,
params: &mut ReceiveParams,
buffered_mutations: &mut BufferedMutations,
change_tick: ServerChangeTick,
update_tick: ServerUpdateTick,
) -> bincode::Result<()> {
let mut result = Ok(());
buffered_mutations.0.retain(|mutate| {
if mutate.change_tick > *change_tick {
if mutate.update_tick > *update_tick {
return true;
}

Expand Down Expand Up @@ -374,7 +374,7 @@ fn apply_entity_mapping(
Ok(())
}

/// Deserializes and applies entity despawn from change message.
/// Deserializes and applies entity despawn from update message.
fn apply_despawn(
world: &mut World,
params: &mut ReceiveParams,
Expand Down Expand Up @@ -567,7 +567,7 @@ fn apply_mutations(
let data_size: usize = cursor.read_varint()?;

let Some(client_entity) = params.entity_map.get_by_server(server_entity) else {
// Mutation could arrive after a despawn from change message.
// Mutation could arrive after a despawn from update message.
debug!("ignoring mutations received for unknown server's {server_entity:?}");
cursor.set_position(cursor.position() + data_size as u64);
return Ok(());
Expand Down Expand Up @@ -746,16 +746,16 @@ pub enum ClientSet {
Reset,
}

/// Last received tick for change messages from the server.
/// Last received tick for update messages from the server.
///
/// In other words, the last [`RepliconTick`] with a removal, insertion, spawn or despawn.
/// This value is not updated when mutation messages are received from the server.
///
/// See also [`ServerMutateTicks`].
#[derive(Clone, Copy, Debug, Default, Deref, Resource)]
pub struct ServerChangeTick(RepliconTick);
pub struct ServerUpdateTick(RepliconTick);

/// Cached buffered mutate messages, used to synchronize mutations with change messages.
/// Cached buffered mutate messages, used to synchronize mutations with update messages.
///
/// If [`ClientSet::Reset`] is disabled, then this needs to be cleaned up manually with [`Self::clear`].
#[derive(Default, Resource)]
Expand All @@ -775,12 +775,12 @@ impl BufferedMutations {
}
}

/// Partially-deserialized mutate message that is waiting for its tick to appear in an change message.
/// Partially-deserialized mutate message that is waiting for its tick to appear in an update message.
///
/// See also [`crate::server::replication_messages`].
pub(super) struct BufferedMutate {
/// Required tick to wait for.
change_tick: RepliconTick,
update_tick: RepliconTick,

/// The tick this mutations corresponds to.
message_tick: RepliconTick,
Expand Down
4 changes: 2 additions & 2 deletions src/client/confirm_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ impl ConfirmHistory {
}
}

/// Triggered for an entity when it receives changes for a tick.
/// Triggered for an entity when it receives updates for a tick.
///
/// See also [`ConfirmHistory`].
#[derive(Debug, Event, Clone, Copy)]
pub struct EntityReplicated {
/// Entity that received changes.
/// Entity that received an update.
pub entity: Entity,

/// Message tick.
Expand Down
6 changes: 3 additions & 3 deletions src/client/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{ClientPlugin, ClientSet, ServerChangeTick};
use super::{ClientPlugin, ClientSet, ServerUpdateTick};
use crate::core::{
common_conditions::*,
event_registry::{
Expand Down Expand Up @@ -85,7 +85,7 @@ impl ClientEventsPlugin {
world.resource_scope(|world, registry: Mut<AppTypeRegistry>| {
world.resource_scope(|world, entity_map: Mut<ServerEntityMap>| {
world.resource_scope(|world, event_registry: Mut<EventRegistry>| {
let change_tick = **world.resource::<ServerChangeTick>();
let update_tick = **world.resource::<ServerUpdateTick>();
let mut ctx = ClientReceiveCtx {
registry: &registry.read(),
entity_map: &entity_map,
Expand All @@ -112,7 +112,7 @@ impl ClientEventsPlugin {
events.into_inner(),
queue.into_inner(),
&mut client,
change_tick,
update_tick,
)
};
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/server_mutate_ticks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::core::replicon_tick::RepliconTick;
/// [`TrackAppExt::track_mutate_messages`](crate::core::replication::track_mutate_messages::TrackAppExt::track_mutate_messages)
/// were called.
///
/// See also [`MutateTickReceived`] and [`ServerChangeTick`](super::ServerChangeTick).
/// See also [`MutateTickReceived`] and [`ServerUpdateTick`](super::ServerUpdateTick).
#[derive(Debug, Resource)]
pub struct ServerMutateTicks {
ticks: VecDeque<TickMessages>,
Expand Down
8 changes: 4 additions & 4 deletions src/core/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum ReplicationChannel {
/// For sending messages with entity mappings, inserts, removals and despawns.
///
/// This is an ordered reliable channel.
Changes,
Updates,
/// For sending messages with component mutations.
///
/// This is an unreliable channel.
Expand All @@ -20,7 +20,7 @@ pub enum ReplicationChannel {
impl From<ReplicationChannel> for RepliconChannel {
fn from(value: ReplicationChannel) -> Self {
match value {
ReplicationChannel::Changes => ChannelKind::Ordered.into(),
ReplicationChannel::Updates => ChannelKind::Ordered.into(),
ReplicationChannel::Mutations => ChannelKind::Unreliable.into(),
}
}
Expand Down Expand Up @@ -53,11 +53,11 @@ impl Default for RepliconChannels {
fn default() -> Self {
Self {
server: vec![
ReplicationChannel::Changes.into(),
ReplicationChannel::Updates.into(),
ReplicationChannel::Mutations.into(),
],
client: vec![
ReplicationChannel::Changes.into(),
ReplicationChannel::Updates.into(),
ReplicationChannel::Mutations.into(),
],
default_max_bytes: 5 * 1024 * 1024,
Expand Down
Loading