diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs index 27ab9a7d42..08be0afe2b 100644 --- a/embassy-executor/src/raw/mod.rs +++ b/embassy-executor/src/raw/mod.rs @@ -346,7 +346,7 @@ impl SyncExecutor { pender, #[cfg(feature = "integrated-timers")] - timer_queue: timer_queue::TimerQueue::new(), + timer_queue: timer_queue::TimerQueue::new(alarm), #[cfg(feature = "integrated-timers")] alarm, } @@ -398,55 +398,27 @@ impl SyncExecutor { /// /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created. pub(crate) unsafe fn poll(&'static self) { - #[allow(clippy::never_loop)] - loop { - self.run_queue.dequeue_all(|p| { - let task = p.header(); - - #[cfg(feature = "integrated-timers")] - task.next_expiration.set(u64::MAX); - - if !task.state.run_dequeue() { - // If task is not running, ignore it. This can happen in the following scenario: - // - Task gets dequeued, poll starts - // - While task is being polled, it gets woken. It gets placed in the queue. - // - Task poll finishes, returning done=true - // - RUNNING bit is cleared, but the task is already in the queue. - #[cfg(feature = "integrated-timers")] - self.timer_queue.notify_task_exited(p); - return; - } - - #[cfg(feature = "rtos-trace")] - trace::task_exec_begin(p.as_ptr() as u32); - - // Run the task - task.poll_fn.get().unwrap_unchecked()(p); + self.run_queue.dequeue_all(|p| { + let task = p.header(); + + if !task.state.run_dequeue() { + // If task is not running, ignore it. This can happen in the following scenario: + // - Task gets dequeued, poll starts + // - While task is being polled, it gets woken. It gets placed in the queue. + // - Task poll finishes, returning done=true + // - RUNNING bit is cleared, but the task is already in the queue. + return; + } - #[cfg(feature = "rtos-trace")] - trace::task_exec_end(); - }); + #[cfg(feature = "rtos-trace")] + trace::task_exec_begin(p.as_ptr() as u32); - #[cfg(feature = "integrated-timers")] - { - // If this is already in the past, set_alarm might return false - // In that case do another poll loop iteration. - let next_expiration = self.timer_queue.next_expiration(); - if embassy_time_driver::set_alarm(self.alarm, next_expiration) { - break; - } else { - // Time driver did not schedule the alarm, - // so we need to dequeue expired timers manually. - self.timer_queue - .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend); - } - } + // Run the task + task.poll_fn.get().unwrap_unchecked()(p); - #[cfg(not(feature = "integrated-timers"))] - { - break; - } - } + #[cfg(feature = "rtos-trace")] + trace::task_exec_end(); + }); #[cfg(feature = "rtos-trace")] trace::system_idle(); diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs index 0bf7321ea3..7b954d86c9 100644 --- a/embassy-executor/src/raw/timer_queue.rs +++ b/embassy-executor/src/raw/timer_queue.rs @@ -3,7 +3,7 @@ use core::cmp::min; use critical_section::{CriticalSection, Mutex}; -use super::TaskRef; +use super::{AlarmHandle, TaskRef}; pub(crate) struct TimerQueueItem { next: Mutex>>, @@ -19,14 +19,14 @@ impl TimerQueueItem { pub(crate) struct TimerQueue { head: Mutex>>, - rescan: Mutex>, + alarm: AlarmHandle, } impl TimerQueue { - pub const fn new() -> Self { + pub const fn new(alarm: AlarmHandle) -> Self { Self { head: Mutex::new(Cell::new(None)), - rescan: Mutex::new(Cell::new(true)), + alarm, } } @@ -34,7 +34,7 @@ impl TimerQueue { let task = p.header(); task.next_expiration.set(u64::MAX); critical_section::with(|cs| { - self.rescan.borrow(cs).set(true); + self.dispatch(cs, super::wake_task); }); } @@ -43,48 +43,34 @@ impl TimerQueue { if at < task.next_expiration.get() { task.next_expiration.set(at); critical_section::with(|cs| { - self.rescan.borrow(cs).set(true); if task.state.timer_enqueue() { let prev = self.head.borrow(cs).replace(Some(p)); task.timer_queue_item.next.borrow(cs).set(prev); } + self.dispatch(cs, super::wake_task); }); } } - pub(crate) unsafe fn next_expiration(&self) -> u64 { - let mut res = u64::MAX; - critical_section::with(|cs| { - let rescan = self.rescan.borrow(cs).replace(false); - if !rescan { - return; + unsafe fn dequeue_expired_internal(&self, now: u64, cs: CriticalSection<'_>, on_task: fn(TaskRef)) -> bool { + let mut changed = false; + self.retain(cs, |p| { + let task = p.header(); + if task.expires_at.borrow(cs).get() <= now { + on_task(p); + changed = true; + false + } else { + true } - self.retain(cs, |p| { - let task = p.header(); - let expires = task.next_expiration.get(); - task.expires_at.borrow(cs).set(expires); - res = min(res, expires); - expires != u64::MAX - }); }); - res + changed } - pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) { + pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: fn(TaskRef)) { critical_section::with(|cs| { - let mut changed = false; - self.retain(cs, |p| { - let task = p.header(); - if task.expires_at.borrow(cs).get() <= now { - on_task(p); - changed = true; - false - } else { - true - } - }); - if changed { - self.rescan.borrow(cs).set(true); + if self.dequeue_expired_internal(now, cs, on_task) { + self.dispatch(cs, on_task); } }); } @@ -101,4 +87,29 @@ impl TimerQueue { } } } + + unsafe fn next_expiration(&self, cs: CriticalSection<'_>) -> u64 { + let mut res = u64::MAX; + + self.retain(cs, |p| { + let task = p.header(); + let expires = task.next_expiration.get(); + task.expires_at.borrow(cs).set(expires); + res = min(res, expires); + expires != u64::MAX + }); + + res + } + + unsafe fn dispatch(&self, cs: CriticalSection<'_>, cb: fn(TaskRef)) { + // If this is already in the past, set_alarm might return false + // In that case do another poll loop iteration. + let next_expiration = self.next_expiration(cs); + if !embassy_time_driver::set_alarm(self.alarm, next_expiration) { + // Time driver did not schedule the alarm, + // so we need to dequeue expired timers manually. + self.dequeue_expired_internal(embassy_time_driver::now(), cs, cb); + } + } }