Skip to content

Commit

Permalink
Allow enabling replication manually (#316)
Browse files Browse the repository at this point in the history
Co-authored-by: Hennadii Chernyshchyk <[email protected]>
Co-authored-by: UkoeHB <[email protected]>
  • Loading branch information
3 people authored Aug 22, 2024
1 parent 5df79b0 commit 3d48f5a
Show file tree
Hide file tree
Showing 11 changed files with 485 additions and 65 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `ServerEventAppExt::make_independent` to let events be triggered without waiting for replication on the same tick.
- `ConnectedClients` (the same name as the old resource that was renamed into `ReplicatedClients`) with client IDs for all connected clients (but may not be replicated yet).
- `ServerPlugin::replicate_after_connect` to enable replication right after connection (enabled by default, same as old behavior).

### Changed

Expand Down
1 change: 1 addition & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod channels;
pub mod command_markers;
pub mod common_conditions;
pub mod connected_clients;
pub mod ctx;
pub mod event_registry;
pub mod replicated_clients;
Expand Down
29 changes: 29 additions & 0 deletions src/core/connected_clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use bevy::prelude::*;

use crate::core::ClientId;

/// Contains all connected clients.
///
/// Inserted as resource by [`ServerPlugin`](crate::server::ServerPlugin).
///
/// See also [ReplicatedClients](super::replicated_clients::ReplicatedClients).
#[derive(Resource, Default, Deref)]
pub struct ConnectedClients(Vec<ClientId>);

impl ConnectedClients {
pub(crate) fn add(&mut self, client_id: ClientId) {
debug!("adding connected `{client_id:?}`");

self.0.push(client_id);
}

pub(crate) fn remove(&mut self, client_id: ClientId) {
debug!("removing disconnected `{client_id:?}`");

let index = self
.iter()
.position(|test_id| *test_id == client_id)
.unwrap_or_else(|| panic!("{client_id:?} should be added before removal"));
self.0.remove(index);
}
}
145 changes: 113 additions & 32 deletions src/core/event_registry/server_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use serde::{de::DeserializeOwned, Serialize};
use super::EventRegistry;
use crate::core::{
channels::{RepliconChannel, RepliconChannels},
connected_clients::ConnectedClients,
ctx::{ClientReceiveCtx, ServerSendCtx},
replicated_clients::{ReplicatedClient, ReplicatedClients},
replicated_clients::ReplicatedClients,
replicon_client::RepliconClient,
replicon_server::RepliconServer,
replicon_tick::RepliconTick,
Expand Down Expand Up @@ -306,9 +307,17 @@ impl ServerEvent {
ctx: &mut ServerSendCtx,
server_events: &Ptr,
server: &mut RepliconServer,
connected_clients: &ConnectedClients,
replicated_clients: &ReplicatedClients,
) {
(self.send)(self, ctx, server_events, server, replicated_clients);
(self.send)(
self,
ctx,
server_events,
server,
connected_clients,
replicated_clients,
);
}

/// Receives an event from the server.
Expand Down Expand Up @@ -399,8 +408,14 @@ pub type SerializeFn<E> = fn(&mut ServerSendCtx, &E, &mut Cursor<Vec<u8>>) -> bi
pub type DeserializeFn<E> = fn(&mut ClientReceiveCtx, &mut Cursor<&[u8]>) -> bincode::Result<E>;

/// Signature of server event sending functions.
type SendFn =
unsafe fn(&ServerEvent, &mut ServerSendCtx, &Ptr, &mut RepliconServer, &ReplicatedClients);
type SendFn = unsafe fn(
&ServerEvent,
&mut ServerSendCtx,
&Ptr,
&mut RepliconServer,
&ConnectedClients,
&ReplicatedClients,
);

/// Signature of server event receiving functions.
type ReceiveFn = unsafe fn(
Expand Down Expand Up @@ -429,15 +444,22 @@ unsafe fn send<E: Event>(
ctx: &mut ServerSendCtx,
server_events: &Ptr,
server: &mut RepliconServer,
connected_clients: &ConnectedClients,
replicated_clients: &ReplicatedClients,
) {
let events: &Events<ToClients<E>> = server_events.deref();
// For server events we don't track read events because
// all of them will always be drained in the local resending system.
for ToClients { event, mode } in events.get_reader().read(events) {
trace!("sending event `{}` with `{mode:?}`", any::type_name::<E>());
send_with(event_data, ctx, event, mode, server, replicated_clients)
.expect("server event should be serializable");

if event_data.is_independent() {
send_independent_event(event_data, ctx, event, mode, server, connected_clients)
.expect("independent server event should be serializable");
} else {
send_event(event_data, ctx, event, mode, server, replicated_clients)
.expect("server event should be serializable");
}
}
}

Expand Down Expand Up @@ -468,21 +490,23 @@ unsafe fn receive<E: Event>(

for message in client.receive(event_data.channel_id) {
let mut cursor = Cursor::new(&*message);
let (tick, event) = deserialize_with(ctx, event_data, &mut cursor)
.expect("server should send valid events");

if event_data.is_independent() {
trace!(
"applying independent event `{}` with `{tick:?}`",
any::type_name::<E>()
);
events.send(event);
} else if tick <= init_tick {
trace!("applying event `{}` with `{tick:?}`", any::type_name::<E>());
let event = event_data
.deserialize(ctx, &mut cursor)
.expect("server should send valid events");
trace!("applying independent event `{}`", any::type_name::<E>());
events.send(event);
} else {
trace!("queuing event `{}` with `{tick:?}`", any::type_name::<E>());
queue.insert(tick, event);
let (tick, event) = deserialize_with_tick(ctx, event_data, &mut cursor)
.expect("server should send valid events");

if tick <= init_tick {
trace!("applying event `{}` with `{tick:?}`", any::type_name::<E>());
events.send(event);
} else {
trace!("queuing event `{}` with `{tick:?}`", any::type_name::<E>());
queue.insert(tick, event);
}
}
}
}
Expand Down Expand Up @@ -535,7 +559,9 @@ unsafe fn reset<E: Event>(queue: PtrMut) {
/// # Safety
///
/// The caller must ensure that `event_data` was created for `E`.
unsafe fn send_with<E: Event>(
///
/// For independent events see [`send_independent_event`].
unsafe fn send_event<E: Event>(
event_data: &ServerEvent,
ctx: &mut ServerSendCtx,
event: &E,
Expand All @@ -547,7 +573,13 @@ unsafe fn send_with<E: Event>(
SendMode::Broadcast => {
let mut previous_message = None;
for client in replicated_clients.iter() {
let message = serialize_with(event_data, ctx, event, client, previous_message)?;
let message = serialize_with_tick(
event_data,
ctx,
event,
client.init_tick(),
previous_message,
)?;
server.send(client.id(), event_data.channel_id, message.bytes.clone());
previous_message = Some(message);
}
Expand All @@ -558,15 +590,22 @@ unsafe fn send_with<E: Event>(
if client.id() == client_id {
continue;
}
let message = serialize_with(event_data, ctx, event, client, previous_message)?;
let message = serialize_with_tick(
event_data,
ctx,
event,
client.init_tick(),
previous_message,
)?;
server.send(client.id(), event_data.channel_id, message.bytes.clone());
previous_message = Some(message);
}
}
SendMode::Direct(client_id) => {
if client_id != ClientId::SERVER {
if let Some(client) = replicated_clients.get_client(client_id) {
let message = serialize_with(event_data, ctx, event, client, None)?;
let message =
serialize_with_tick(event_data, ctx, event, client.init_tick(), None)?;
server.send(client.id(), event_data.channel_id, message.bytes);
}
}
Expand All @@ -576,6 +615,48 @@ unsafe fn send_with<E: Event>(
Ok(())
}

/// Sends independent event `E` based on a mode.
///
/// # Safety
///
/// The caller must ensure that `event_data` was created for `E`.
///
/// For regular events see [`send_event`].
unsafe fn send_independent_event<E: Event>(
event_data: &ServerEvent,
ctx: &mut ServerSendCtx,
event: &E,
mode: &SendMode,
server: &mut RepliconServer,
connected_clients: &ConnectedClients,
) -> bincode::Result<()> {
let mut cursor = Default::default();
event_data.serialize(ctx, event, &mut cursor)?;
let message: Bytes = cursor.into_inner().into();

match *mode {
SendMode::Broadcast => {
for &client_id in connected_clients.iter() {
server.send(client_id, event_data.channel_id, message.clone());
}
}
SendMode::BroadcastExcept(id) => {
for &client_id in connected_clients.iter() {
if client_id != id {
server.send(client_id, event_data.channel_id, message.clone());
}
}
}
SendMode::Direct(client_id) => {
if client_id != ClientId::SERVER {
server.send(client_id, event_data.channel_id, message.clone());
}
}
}

Ok(())
}

/// Helper for serializing a server event.
///
/// Will prepend the client's change tick to the injected message.
Expand All @@ -584,36 +665,36 @@ unsafe fn send_with<E: Event>(
/// # Safety
///
/// The caller must ensure that `event_data` was created for `E`.
unsafe fn serialize_with<E: Event>(
unsafe fn serialize_with_tick<E: Event>(
event_data: &ServerEvent,
ctx: &mut ServerSendCtx,
event: &E,
client: &ReplicatedClient,
init_tick: RepliconTick,
previous_message: Option<SerializedMessage>,
) -> bincode::Result<SerializedMessage> {
if let Some(previous_message) = previous_message {
if previous_message.tick == client.init_tick() {
if previous_message.init_tick == init_tick {
return Ok(previous_message);
}

let tick_size = DefaultOptions::new().serialized_size(&client.init_tick())? as usize;
let tick_size = DefaultOptions::new().serialized_size(&init_tick)? as usize;
let mut bytes = Vec::with_capacity(tick_size + previous_message.event_bytes().len());
DefaultOptions::new().serialize_into(&mut bytes, &client.init_tick())?;
DefaultOptions::new().serialize_into(&mut bytes, &init_tick)?;
bytes.extend_from_slice(previous_message.event_bytes());
let message = SerializedMessage {
tick: client.init_tick(),
init_tick,
tick_size,
bytes: bytes.into(),
};

Ok(message)
} else {
let mut cursor = Cursor::new(Vec::new());
DefaultOptions::new().serialize_into(&mut cursor, &client.init_tick())?;
DefaultOptions::new().serialize_into(&mut cursor, &init_tick)?;
let tick_size = cursor.get_ref().len();
event_data.serialize(ctx, event, &mut cursor)?;
let message = SerializedMessage {
tick: client.init_tick(),
init_tick,
tick_size,
bytes: cursor.into_inner().into(),
};
Expand All @@ -627,7 +708,7 @@ unsafe fn serialize_with<E: Event>(
/// # Safety
///
/// The caller must ensure that `event_data` was created for `E`.
unsafe fn deserialize_with<E: Event>(
unsafe fn deserialize_with_tick<E: Event>(
ctx: &mut ClientReceiveCtx,
event_data: &ServerEvent,
cursor: &mut Cursor<&[u8]>,
Expand All @@ -640,7 +721,7 @@ unsafe fn deserialize_with<E: Event>(

/// Cached message for use in [`serialize_with`].
struct SerializedMessage {
tick: RepliconTick,
init_tick: RepliconTick,
tick_size: usize,
bytes: Bytes,
}
Expand Down
28 changes: 23 additions & 5 deletions src/core/replicated_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ use client_visibility::ClientVisibility;
/// Stores information about connected clients which are enabled for replication.
///
/// Inserted as resource by [`ServerPlugin`](crate::server::ServerPlugin).
///
/// See also [ConnectedClients](super::connected_clients::ConnectedClients).
#[derive(Resource, Default)]
pub struct ReplicatedClients {
clients: Vec<ReplicatedClient>,
policy: VisibilityPolicy,
replicate_after_connect: bool,
}

impl ReplicatedClients {
pub(crate) fn new(policy: VisibilityPolicy) -> Self {
pub(crate) fn new(policy: VisibilityPolicy, replicate_after_connect: bool) -> Self {
Self {
clients: Default::default(),
policy,
replicate_after_connect,
}
}

Expand All @@ -34,6 +38,11 @@ impl ReplicatedClients {
self.policy
}

/// Returns if clients will automatically have replication enabled for them after they connect.
pub fn replicate_after_connect(&self) -> bool {
self.replicate_after_connect
}

/// Returns a reference to a connected client.
///
/// This operation is *O*(*n*).
Expand Down Expand Up @@ -107,6 +116,11 @@ impl ReplicatedClients {
///
/// Reuses the memory from the buffers if available.
pub(crate) fn add(&mut self, client_buffers: &mut ClientBuffers, client_id: ClientId) {
if self.clients.iter().any(|client| client.id == client_id) {
warn!("ignoring attempt to start replication for `{client_id:?}` that already has replication enabled");
return;
}

debug!("starting replication for `{client_id:?}`");

let client = if let Some(mut client) = client_buffers.clients.pop() {
Expand All @@ -123,13 +137,17 @@ impl ReplicatedClients {
///
/// Keeps allocated memory in the buffers for reuse.
pub(crate) fn remove(&mut self, client_buffers: &mut ClientBuffers, client_id: ClientId) {
debug!("stopping replication for `{client_id:?}`");

let index = self
let Some(index) = self
.clients
.iter()
.position(|client| client.id == client_id)
.unwrap_or_else(|| panic!("{client_id:?} should be added before removal"));
else {
// It's valid to remove a client which is connected but not replicating yet,
// which is just a no-op.
return;
};

debug!("stopping replication for `{client_id:?}`");
let mut client = self.clients.remove(index);
client_buffers.entities.extend(client.drain_entities());
client_buffers.clients.push(client);
Expand Down
Loading

0 comments on commit 3d48f5a

Please sign in to comment.