Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow respawning task on another executor #3633

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions embassy-executor/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- `integrated-timers` disallows respawning a task on an other executor.
- `Executor::spawn` is now fallible.
- `raw::Executor` now has an `fn initialize` that must be called once before starting to poll it.

## 0.6.3 - 2024-11-12
Expand Down
76 changes: 72 additions & 4 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,65 @@ use self::run_queue::{RunQueue, RunQueueItem};
use self::state::State;
use self::util::{SyncUnsafeCell, UninitCell};
pub use self::waker::task_from_waker;
use super::SpawnToken;
use super::{SpawnError, SpawnToken};

/// Raw task header for use in task pointers.
///
/// A task can be in one of the following states:
///
/// - Not spawned: the task is ready to spawn.
/// - `SPAWNED`: the task is currently spawned and may be running.
/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`.
/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without
/// polling the task's future.
/// - `TIMER_ENQUEUED`: the task is currently enqueued in the timer queue. When its expiration is
/// due, the task will be enqueued in the run queue.
///
/// A task's complete life cycle is as follows:
///
/// ```text
/// ┌────────────────────────────────────────────────────────┐
/// 10 ┌──────────────────────────────────────────────────11┐ │
/// ┌───┴─▼──────┐ ┌────────────────────────┐ ┌────────────┴─▼───────────┐
/// ┌─►│Not spawned │◄13┤Not spawned|Run enqueued│ │Not spawned|Timer enqueued│
/// │ │ │ │ │ │ │
/// │ └─────┬──────┘ └──────▲────┬───▲────────┘ └──────────────────────────┘
/// │ 1 │ 14 │
/// │ │ │ │ │
/// │ │ │ │ │ ┌───────────────────────────────────────┐
/// │ │ │ │ └15┤Not spawned|Run enqueued|Timer enqueued│
/// │ │ ┌────────────┘ └─────►│ │
/// │ │ │ └───────────────────────────────────────┘
/// │ │ │
/// │ │ 12
/// │ ┌─────▼────┴─────────┐ ┌───────────────────────────────────┐
/// │ │Spawned|Run enqueued├7────────────────►│Spawned|Run enqueued|Timer enqueued│
/// │ │ │◄────────────────8┤ │
/// │ └─────┬▲───▲─────────┘ └───────────────────▲───────────────┘
/// │ 2│ └─────────────────────────────────────┐ │
/// │ │3 6 9
/// │ ┌─────▼┴─────┐ ┌┴──────────┴──────────┐
/// └─4┤ Spawned ├5────────────────────────────────►│Spawned|Timer enqueued│
/// │ │ │ │
/// └────────────┘ └──────────────────────┘
/// ```
///
/// Transitions:
/// - 1: Task is claimed for spawning - `AvailableTask::claim -> Executor::spawn`
/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue`
/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue`
/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready`
/// - 5: Task schedules itself - `TimerQueue::schedule_wake -> TimerQueue::update`
/// - 6: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend -> State::run_enqueue`
/// - 7: `schedule_wake -> State::update` enqueues the task in the timer queue.
/// - 8: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend` -> task is already RUN_ENQUEUED.
/// - 9: A waker wakes a task that is in the timer queue. `Waker::wake -> wake_task -> State::run_enqueue`
/// - 10: A race condition happens: A task exits, then a different thread calls its `schedule_wake`, then the task calls its `TimerQueue::update`
/// - 11: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend -> task not SPAWNED`
/// - 12: A run-queued task exits - `TaskStorage::poll -> Poll::Ready`
/// - 13: During poll - `State::run_dequeue`, task is then ignored.
/// - 14: A race condition happens between the task clearing its `expires_at` value and another thread calling `schedule_wake`.
/// - 15: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend` -> task is already RUN_ENQUEUED.
pub(crate) struct TaskHeader {
pub(crate) state: State,
pub(crate) run_queue_item: RunQueueItem,
Expand Down Expand Up @@ -162,6 +218,7 @@ impl<F: Future + 'static> TaskStorage<F> {
this.raw.state.despawn();

#[cfg(feature = "integrated-timers")]
// FIXME: There is a data race between `TimerQueue::schedule_wake` and this line.
this.raw.expires_at.set(u64::MAX);
}
Poll::Pending => {}
Expand Down Expand Up @@ -366,13 +423,23 @@ impl SyncExecutor {
this.pender.pend();
}

pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
task.header().executor.set(Some(self));
pub(super) unsafe fn spawn(&'static self, task: TaskRef) -> Result<(), SpawnError> {
let executor = &task.header().executor;

#[cfg(feature = "integrated-timers")]
if let Some(executor) = executor.get() {
if core::ptr::from_ref(executor) != self {
return Err(SpawnError::BoundToDifferentExecutor);
}
}

executor.set(Some(self));

#[cfg(feature = "rtos-trace")]
trace::task_new(task.as_ptr() as u32);

self.enqueue(task);
Ok(())
}

/// # Safety
Expand All @@ -389,6 +456,7 @@ impl SyncExecutor {
let task = p.header();

#[cfg(feature = "integrated-timers")]
// FIXME: There is a data race between `TimerQueue::schedule_wake` and this line.
task.expires_at.set(u64::MAX);

if !task.state.run_dequeue() {
Expand Down Expand Up @@ -512,7 +580,7 @@ impl Executor {
/// It is OK to use `unsafe` to call this from a thread that's not the executor thread.
/// In this case, the task's Future must be Send. This is because this is effectively
/// sending the task to the executor thread.
pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
pub(super) unsafe fn spawn(&'static self, task: TaskRef) -> Result<(), SpawnError> {
self.inner.spawn(task)
}

Expand Down
17 changes: 9 additions & 8 deletions embassy-executor/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ pub enum SpawnError {
/// running at a time. You may allow multiple instances to run in parallel with
/// `#[embassy_executor::task(pool_size = 4)]`, at the cost of higher RAM usage.
Busy,

/// The task has already been assigned to an executor and can't be moved.
///
/// When integrated timers are enabled, tasks must not move between executors. If you need
/// to move a task between executors
#[cfg(feature = "integrated-timers")]
BoundToDifferentExecutor,
}

/// Handle to spawn tasks into an executor.
Expand Down Expand Up @@ -107,10 +114,7 @@ impl Spawner {
mem::forget(token);

match task {
Some(task) => {
unsafe { self.executor.spawn(task) };
Ok(())
}
Some(task) => unsafe { self.executor.spawn(task) },
None => Err(SpawnError::Busy),
}
}
Expand Down Expand Up @@ -178,10 +182,7 @@ impl SendSpawner {
mem::forget(token);

match header {
Some(header) => {
unsafe { self.executor.spawn(header) };
Ok(())
}
Some(header) => unsafe { self.executor.spawn(header) },
None => Err(SpawnError::Busy),
}
}
Expand Down
Loading