Skip to content

Commit

Permalink
Fix bug where events are not being dropped (bevyengine#11528)
Browse files Browse the repository at this point in the history
# Objective

Fix an issue where events are not being dropped after being read. I
believe bevyengine#10077 introduced this issue. The code currently works as
follows:

1. `EventUpdateSignal` is **shared for all event types**
2. During the fixed update phase, `EventUpdateSignal` is set to true
3. `event_update_system`, **unique per event type**, runs to update
Events<T>
4. `event_update_system` reads value of `EventUpdateSignal` to check if
it should update, and then **resets** the value to false

If there are multiple event types, the first `event_update_system` run
will reset the shared `EventUpdateSignal` signal, preventing other
events from being cleared.

## Solution

I've updated the code to have separate signals per event type and added
a shared signal to notify all systems that the time plugin is installed.

## Changelog

- Fixed bug where events were not being dropped
  • Loading branch information
Dig-Doug authored and tjamaan committed Feb 6, 2024
1 parent f3caf07 commit 1b8589f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 7 deletions.
2 changes: 2 additions & 0 deletions crates/bevy_app/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ pub enum PluginsState {

// Dummy plugin used to temporary hold the place in the plugin registry
struct PlaceholderPlugin;

impl Plugin for PlaceholderPlugin {
fn build(&self, _app: &mut App) {}
}
Expand Down Expand Up @@ -505,6 +506,7 @@ impl App {
self.init_resource::<Events<T>>().add_systems(
First,
bevy_ecs::event::event_update_system::<T>
.in_set(bevy_ecs::event::EventUpdates)
.run_if(bevy_ecs::event::event_update_condition::<T>),
);
}
Expand Down
24 changes: 19 additions & 5 deletions crates/bevy_ecs/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate as bevy_ecs;
use crate::system::{Local, Res, ResMut, Resource, SystemParam};
pub use bevy_ecs_macros::Event;
use bevy_ecs_macros::SystemSet;
use bevy_utils::detailed_trace;
use std::ops::{Deref, DerefMut};
use std::{
Expand All @@ -13,6 +14,7 @@ use std::{
marker::PhantomData,
slice::Iter,
};

/// A type that can be stored in an [`Events<E>`] resource
/// You can conveniently access events using the [`EventReader`] and [`EventWriter`] system parameter.
///
Expand All @@ -33,6 +35,7 @@ pub struct EventId<E: Event> {
}

impl<E: Event> Copy for EventId<E> {}

impl<E: Event> Clone for EventId<E> {
fn clone(&self) -> Self {
*self
Expand Down Expand Up @@ -790,22 +793,33 @@ impl<'a, E: Event> ExactSizeIterator for EventIteratorWithId<'a, E> {
#[derive(Resource, Default)]
pub struct EventUpdateSignal(bool);

/// A system that queues a call to [`Events::update`].
pub fn event_queue_update_system(signal: Option<ResMut<EventUpdateSignal>>) {
#[doc(hidden)]
#[derive(SystemSet, Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventUpdates;

/// Signals the [`event_update_system`] to run after `FixedUpdate` systems.
pub fn signal_event_update_system(signal: Option<ResMut<EventUpdateSignal>>) {
if let Some(mut s) = signal {
s.0 = true;
}
}

/// Resets the `EventUpdateSignal`
pub fn reset_event_update_signal_system(signal: Option<ResMut<EventUpdateSignal>>) {
if let Some(mut s) = signal {
s.0 = false;
}
}

/// A system that calls [`Events::update`].
pub fn event_update_system<T: Event>(
signal: Option<ResMut<EventUpdateSignal>>,
update_signal: Option<Res<EventUpdateSignal>>,
mut events: ResMut<Events<T>>,
) {
if let Some(mut s) = signal {
if let Some(signal) = update_signal {
// If we haven't got a signal to update the events, but we *could* get such a signal
// return early and update the events later.
if !std::mem::replace(&mut s.0, false) {
if !signal.0 {
return;
}
}
Expand Down
70 changes: 68 additions & 2 deletions crates/bevy_time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod prelude {
}

use bevy_app::{prelude::*, RunFixedMainLoop};
use bevy_ecs::event::{event_queue_update_system, EventUpdateSignal};
use bevy_ecs::event::{signal_event_update_system, EventUpdateSignal, EventUpdates};
use bevy_ecs::prelude::*;
use bevy_utils::{tracing::warn, Duration, Instant};
pub use crossbeam_channel::TrySendError;
Expand Down Expand Up @@ -61,7 +61,11 @@ impl Plugin for TimePlugin {

// ensure the events are not dropped until `FixedMain` systems can observe them
app.init_resource::<EventUpdateSignal>()
.add_systems(FixedPostUpdate, event_queue_update_system);
.add_systems(
First,
bevy_ecs::event::reset_event_update_signal_system.after(EventUpdates),
)
.add_systems(FixedPostUpdate, signal_event_update_system);

#[cfg(feature = "bevy_ci_testing")]
if let Some(ci_testing_config) = app
Expand Down Expand Up @@ -142,3 +146,65 @@ fn time_system(
TimeUpdateStrategy::ManualDuration(duration) => time.update_with_duration(*duration),
}
}

#[cfg(test)]
mod tests {
use crate::{Fixed, Time, TimePlugin, TimeUpdateStrategy};
use bevy_app::{App, Startup, Update};
use bevy_ecs::event::{Event, EventReader, EventWriter};
use std::error::Error;

#[derive(Event)]
struct TestEvent<T: Default> {
sender: std::sync::mpsc::Sender<T>,
}

impl<T: Default> Drop for TestEvent<T> {
fn drop(&mut self) {
self.sender
.send(T::default())
.expect("Failed to send drop signal");
}
}

#[test]
fn events_get_dropped_regression_test_11528() -> Result<(), impl Error> {
let (tx1, rx1) = std::sync::mpsc::channel();
let (tx2, rx2) = std::sync::mpsc::channel();
let mut app = App::new();
app.add_plugins(TimePlugin)
.add_event::<TestEvent<i32>>()
.add_event::<TestEvent<()>>()
.add_systems(Startup, move |mut ev2: EventWriter<TestEvent<()>>| {
ev2.send(TestEvent {
sender: tx2.clone(),
});
})
.add_systems(Update, move |mut ev1: EventWriter<TestEvent<i32>>| {
// Keep adding events so this event type is processed every update
ev1.send(TestEvent {
sender: tx1.clone(),
});
})
.add_systems(
Update,
|mut ev1: EventReader<TestEvent<i32>>, mut ev2: EventReader<TestEvent<()>>| {
// Read events so they can be dropped
for _ in ev1.read() {}
for _ in ev2.read() {}
},
)
.insert_resource(TimeUpdateStrategy::ManualDuration(
Time::<Fixed>::default().timestep(),
));

for _ in 0..10 {
app.update();
}

// Check event type 1 as been dropped at least once
let _drop_signal = rx1.try_recv()?;
// Check event type 2 has been dropped
rx2.try_recv()
}
}

0 comments on commit 1b8589f

Please sign in to comment.