From 01b12ece2241ae929b8615699c1ec1e3fdc210f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Mon, 9 Dec 2024 17:11:47 +0100 Subject: [PATCH 1/2] Document task states and state transitions --- embassy-executor/src/raw/mod.rs | 64 +++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index ebabee1ba2..7ae150bef7 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -41,6 +41,62 @@ pub use self::waker::task_from_waker; use super::SpawnToken; /// Raw task header for use in task pointers. +/// +/// A task can be in one of the following states: +/// +/// - Not spawned: the task is ready to spawn. +/// - `SPAWNED`: the task is currently spawned and may be running. +/// - `RUN_ENQUEUED`: the task is enqueued to be polled. Note that the task may be `!SPAWNED`. +/// In this case, the `RUN_ENQUEUED` state will be cleared when the task is next polled, without +/// polling the task's future. +/// - `TIMER_ENQUEUED`: the task is currently enqueued in the timer queue. When its expiration is +/// due, the task will be enqueued in the run queue. +/// +/// A task's complete life cycle is as follows: +/// +/// ```text +/// ┌────────────────────────────────────────────────────────┐ +/// 10 ┌──────────────────────────────────────────────────11┐ │ +/// ┌───┴─▼──────┐ ┌────────────────────────┐ ┌────────────┴─▼───────────┐ +/// ┌─►│Not spawned │◄13┤Not spawned|Run enqueued│ │Not spawned|Timer enqueued│ +/// │ │ │ │ │ │ │ +/// │ └─────┬──────┘ └──────▲────┬───▲────────┘ └──────────────────────────┘ +/// │ 1 │ 14 │ +/// │ │ │ │ │ +/// │ │ │ │ │ ┌───────────────────────────────────────┐ +/// │ │ │ │ └15┤Not spawned|Run enqueued|Timer enqueued│ +/// │ │ ┌────────────┘ └─────►│ │ +/// │ │ │ └───────────────────────────────────────┘ +/// │ │ │ +/// │ │ 12 +/// │ ┌─────▼────┴─────────┐ ┌───────────────────────────────────┐ +/// │ │Spawned|Run enqueued├7────────────────►│Spawned|Run enqueued|Timer enqueued│ +/// │ │ │◄────────────────8┤ │ +/// │ └─────┬▲───▲─────────┘ └───────────────────▲───────────────┘ +/// │ 2│ └─────────────────────────────────────┐ │ +/// │ │3 6 9 +/// │ ┌─────▼┴─────┐ ┌┴──────────┴──────────┐ +/// └─4┤ Spawned ├5────────────────────────────────►│Spawned|Timer enqueued│ +/// │ │ │ │ +/// └────────────┘ └──────────────────────┘ +/// ``` +/// +/// Transitions: +/// - 1: Task is claimed for spawning - `AvailableTask::claim -> Executor::spawn` +/// - 2: During poll - `RunQueue::dequeue_all -> State::run_dequeue` +/// - 3: Task wakes itself, waker wakes task - `Waker::wake -> wake_task -> State::run_enqueue` +/// - 4: Task exits - `TaskStorage::poll -> Poll::Ready` +/// - 5: Task schedules itself - `TimerQueue::schedule_wake -> TimerQueue::update` +/// - 6: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend -> State::run_enqueue` +/// - 7: `schedule_wake -> State::update` enqueues the task in the timer queue. +/// - 8: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend` -> task is already RUN_ENQUEUED. +/// - 9: A waker wakes a task that is in the timer queue. `Waker::wake -> wake_task -> State::run_enqueue` +/// - 10: A race condition happens: A task exits, then a different thread calls its `schedule_wake`, then the task calls its `TimerQueue::update` +/// - 11: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend -> task not SPAWNED` +/// - 12: A run-queued task exits - `TaskStorage::poll -> Poll::Ready` +/// - 13: During poll - `State::run_dequeue`, task is then ignored. +/// - 14: A race condition happens between the task clearing its `expires_at` value and another thread calling `schedule_wake`. +/// - 15: Timer queue is processed - `TimerQueue::dequeue_expired -> wake_task_no_pend` -> task is already RUN_ENQUEUED. pub(crate) struct TaskHeader { pub(crate) state: State, pub(crate) run_queue_item: RunQueueItem, @@ -162,6 +218,7 @@ impl TaskStorage { this.raw.state.despawn(); #[cfg(feature = "integrated-timers")] + // FIXME: There is a data race between `TimerQueue::schedule_wake` and this line. this.raw.expires_at.set(u64::MAX); } Poll::Pending => {} @@ -389,6 +446,7 @@ impl SyncExecutor { let task = p.header(); #[cfg(feature = "integrated-timers")] + // FIXME: There is a data race between `TimerQueue::schedule_wake` and this line. task.expires_at.set(u64::MAX); if !task.state.run_dequeue() { @@ -555,6 +613,9 @@ pub fn wake_task(task: TaskRef) { let header = task.header(); if header.state.run_enqueue() { // We have just marked the task as scheduled, so enqueue it. + // FIXME: there is currently a data race between re-spawning a task and waking it using an + // old waker. If the task is being spawned on a different executor, then reading and writing + // the executor field may happen concurrently. unsafe { let executor = header.executor.get().unwrap_unchecked(); executor.enqueue(task); @@ -569,6 +630,9 @@ pub fn wake_task_no_pend(task: TaskRef) { let header = task.header(); if header.state.run_enqueue() { // We have just marked the task as scheduled, so enqueue it. + // FIXME: there is currently a data race between re-spawning a task and waking it using an + // old waker. If the task is being spawned on a different executor, then reading and writing + // the executor field may happen concurrently. unsafe { let executor = header.executor.get().unwrap_unchecked(); executor.run_queue.enqueue(task); From 47d963cbbc07a6c5c7b5a2541ea3927ca209d4bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1niel=20Buga?= Date: Tue, 10 Dec 2024 16:11:10 +0100 Subject: [PATCH 2/2] Disallow respawning on different executor --- embassy-executor/CHANGELOG.md | 2 ++ embassy-executor/src/raw/mod.rs | 24 ++++++++++++++---------- embassy-executor/src/spawner.rs | 17 +++++++++-------- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/embassy-executor/CHANGELOG.md b/embassy-executor/CHANGELOG.md index 00b1bef288..57eb7f2368 100644 --- a/embassy-executor/CHANGELOG.md +++ b/embassy-executor/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `integrated-timers` disallows respawning a task on an other executor. +- `Executor::spawn` is now fallible. - `raw::Executor` now has an `fn initialize` that must be called once before starting to poll it. ## 0.6.3 - 2024-11-12 diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 7ae150bef7..d87d27f0b9 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -38,7 +38,7 @@ use self::run_queue::{RunQueue, RunQueueItem}; use self::state::State; use self::util::{SyncUnsafeCell, UninitCell}; pub use self::waker::task_from_waker; -use super::SpawnToken; +use super::{SpawnError, SpawnToken}; /// Raw task header for use in task pointers. /// @@ -423,13 +423,23 @@ impl SyncExecutor { this.pender.pend(); } - pub(super) unsafe fn spawn(&'static self, task: TaskRef) { - task.header().executor.set(Some(self)); + pub(super) unsafe fn spawn(&'static self, task: TaskRef) -> Result<(), SpawnError> { + let executor = &task.header().executor; + + #[cfg(feature = "integrated-timers")] + if let Some(executor) = executor.get() { + if core::ptr::from_ref(executor) != self { + return Err(SpawnError::BoundToDifferentExecutor); + } + } + + executor.set(Some(self)); #[cfg(feature = "rtos-trace")] trace::task_new(task.as_ptr() as u32); self.enqueue(task); + Ok(()) } /// # Safety @@ -570,7 +580,7 @@ impl Executor { /// It is OK to use `unsafe` to call this from a thread that's not the executor thread. /// In this case, the task's Future must be Send. This is because this is effectively /// sending the task to the executor thread. - pub(super) unsafe fn spawn(&'static self, task: TaskRef) { + pub(super) unsafe fn spawn(&'static self, task: TaskRef) -> Result<(), SpawnError> { self.inner.spawn(task) } @@ -613,9 +623,6 @@ pub fn wake_task(task: TaskRef) { let header = task.header(); if header.state.run_enqueue() { // We have just marked the task as scheduled, so enqueue it. - // FIXME: there is currently a data race between re-spawning a task and waking it using an - // old waker. If the task is being spawned on a different executor, then reading and writing - // the executor field may happen concurrently. unsafe { let executor = header.executor.get().unwrap_unchecked(); executor.enqueue(task); @@ -630,9 +637,6 @@ pub fn wake_task_no_pend(task: TaskRef) { let header = task.header(); if header.state.run_enqueue() { // We have just marked the task as scheduled, so enqueue it. - // FIXME: there is currently a data race between re-spawning a task and waking it using an - // old waker. If the task is being spawned on a different executor, then reading and writing - // the executor field may happen concurrently. unsafe { let executor = header.executor.get().unwrap_unchecked(); executor.run_queue.enqueue(task); diff --git a/embassy-executor/src/spawner.rs b/embassy-executor/src/spawner.rs index 2716062445..e82c095abd 100644 --- a/embassy-executor/src/spawner.rs +++ b/embassy-executor/src/spawner.rs @@ -59,6 +59,13 @@ pub enum SpawnError { /// running at a time. You may allow multiple instances to run in parallel with /// `#[embassy_executor::task(pool_size = 4)]`, at the cost of higher RAM usage. Busy, + + /// The task has already been assigned to an executor and can't be moved. + /// + /// When integrated timers are enabled, tasks must not move between executors. If you need + /// to move a task between executors + #[cfg(feature = "integrated-timers")] + BoundToDifferentExecutor, } /// Handle to spawn tasks into an executor. @@ -107,10 +114,7 @@ impl Spawner { mem::forget(token); match task { - Some(task) => { - unsafe { self.executor.spawn(task) }; - Ok(()) - } + Some(task) => unsafe { self.executor.spawn(task) }, None => Err(SpawnError::Busy), } } @@ -178,10 +182,7 @@ impl SendSpawner { mem::forget(token); match header { - Some(header) => { - unsafe { self.executor.spawn(header) }; - Ok(()) - } + Some(header) => unsafe { self.executor.spawn(header) }, None => Err(SpawnError::Busy), } }