Do less work per poll
bugadani committed Nov 28, 2024
1 parent 04c1419 commit 751ab22
Showing 2 changed files with 64 additions and 81 deletions.
66 changes: 19 additions & 47 deletions embassy-executor/src/raw/mod.rs
@@ -346,7 +346,7 @@ impl SyncExecutor {
             pender,
 
             #[cfg(feature = "integrated-timers")]
-            timer_queue: timer_queue::TimerQueue::new(),
+            timer_queue: timer_queue::TimerQueue::new(alarm),
             #[cfg(feature = "integrated-timers")]
             alarm,
         }
@@ -398,55 +398,27 @@ impl SyncExecutor {
     ///
     /// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
     pub(crate) unsafe fn poll(&'static self) {
-        #[allow(clippy::never_loop)]
-        loop {
-            self.run_queue.dequeue_all(|p| {
-                let task = p.header();
-
-                #[cfg(feature = "integrated-timers")]
-                task.next_expiration.set(u64::MAX);
-
-                if !task.state.run_dequeue() {
-                    // If task is not running, ignore it. This can happen in the following scenario:
-                    // - Task gets dequeued, poll starts
-                    // - While task is being polled, it gets woken. It gets placed in the queue.
-                    // - Task poll finishes, returning done=true
-                    // - RUNNING bit is cleared, but the task is already in the queue.
-                    #[cfg(feature = "integrated-timers")]
-                    self.timer_queue.notify_task_exited(p);
-                    return;
-                }
-
-                #[cfg(feature = "rtos-trace")]
-                trace::task_exec_begin(p.as_ptr() as u32);
-
-                // Run the task
-                task.poll_fn.get().unwrap_unchecked()(p);
-
-                #[cfg(feature = "rtos-trace")]
-                trace::task_exec_end();
-            });
-
-            #[cfg(feature = "integrated-timers")]
-            {
-                // If this is already in the past, set_alarm might return false
-                // In that case do another poll loop iteration.
-                let next_expiration = self.timer_queue.next_expiration();
-                if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
-                    break;
-                } else {
-                    // Time driver did not schedule the alarm,
-                    // so we need to dequeue expired timers manually.
-                    self.timer_queue
-                        .dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
-                }
-            }
-
-            #[cfg(not(feature = "integrated-timers"))]
-            {
-                break;
-            }
-        }
+        self.run_queue.dequeue_all(|p| {
+            let task = p.header();
+
+            if !task.state.run_dequeue() {
+                // If task is not running, ignore it. This can happen in the following scenario:
+                // - Task gets dequeued, poll starts
+                // - While task is being polled, it gets woken. It gets placed in the queue.
+                // - Task poll finishes, returning done=true
+                // - RUNNING bit is cleared, but the task is already in the queue.
+                return;
+            }
+
+            #[cfg(feature = "rtos-trace")]
+            trace::task_exec_begin(p.as_ptr() as u32);
+
+            // Run the task
+            task.poll_fn.get().unwrap_unchecked()(p);
+
+            #[cfg(feature = "rtos-trace")]
+            trace::task_exec_end();
+        });
 
         #[cfg(feature = "rtos-trace")]
         trace::system_idle();
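The net effect in this file: poll() no longer loops. Before, each pass re-armed the time driver's alarm and, when set_alarm returned false because the deadline had already passed, dequeued the expired timers and went around again; now poll() drains the run queue exactly once, and arming the alarm becomes the timer queue's job (see timer_queue.rs below). A standalone sketch of the before/after control flow — mock_set_alarm, poll_old, and poll_new are invented stand-ins for illustration, not embassy APIs:

// Sketch only: models the control flow, not embassy's real types.

// Stands in for embassy_time_driver::set_alarm, which returns false when the
// requested deadline is already in the past (no alarm interrupt will fire).
fn mock_set_alarm(now: u64, at: u64) -> bool {
    at > now
}

// Old shape: every poll re-arms the alarm and loops until that succeeds.
fn poll_old(
    now: u64,
    mut drain_run_queue: impl FnMut(),
    mut next_expiration: impl FnMut() -> u64,
    mut drain_expired: impl FnMut(u64),
) {
    loop {
        drain_run_queue();
        if mock_set_alarm(now, next_expiration()) {
            break;
        }
        // Alarm deadline already passed: wake expired timers, poll again.
        drain_expired(now);
    }
}

// New shape: poll only drains the run queue. The timer queue re-arms the
// alarm itself whenever a timer is enqueued or dequeued.
fn poll_new(mut drain_run_queue: impl FnMut()) {
    drain_run_queue();
}

fn main() {
    let mut polls = 0;
    let mut deadlines = [90, u64::MAX].into_iter();
    poll_old(
        100,
        || polls += 1,
        || deadlines.next().unwrap(),
        |now| println!("draining timers expired by t={now}"),
    );
    println!("old poll drained the run queue {polls} times"); // prints 2

    poll_new(|| println!("new poll drains the run queue exactly once"));
}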
79 changes: 45 additions & 34 deletions embassy-executor/src/raw/timer_queue.rs
@@ -3,7 +3,7 @@ use core::cmp::min;
 
 use critical_section::{CriticalSection, Mutex};
 
-use super::TaskRef;
+use super::{AlarmHandle, TaskRef};
 
 pub(crate) struct TimerQueueItem {
     next: Mutex<Cell<Option<TaskRef>>>,
@@ -19,22 +19,22 @@ impl TimerQueueItem {
 
 pub(crate) struct TimerQueue {
     head: Mutex<Cell<Option<TaskRef>>>,
-    rescan: Mutex<Cell<bool>>,
+    alarm: AlarmHandle,
 }
 
 impl TimerQueue {
-    pub const fn new() -> Self {
+    pub const fn new(alarm: AlarmHandle) -> Self {
         Self {
             head: Mutex::new(Cell::new(None)),
-            rescan: Mutex::new(Cell::new(true)),
+            alarm,
         }
     }
 
     pub(crate) unsafe fn notify_task_exited(&self, p: TaskRef) {
         let task = p.header();
         task.next_expiration.set(u64::MAX);
         critical_section::with(|cs| {
-            self.rescan.borrow(cs).set(true);
+            self.dispatch(cs, super::wake_task);
         });
     }

@@ -43,48 +43,34 @@ impl TimerQueue {
         if at < task.next_expiration.get() {
             task.next_expiration.set(at);
             critical_section::with(|cs| {
-                self.rescan.borrow(cs).set(true);
                 if task.state.timer_enqueue() {
                     let prev = self.head.borrow(cs).replace(Some(p));
                     task.timer_queue_item.next.borrow(cs).set(prev);
                 }
+                self.dispatch(cs, super::wake_task);
             });
         }
     }
 
-    pub(crate) unsafe fn next_expiration(&self) -> u64 {
-        let mut res = u64::MAX;
-        critical_section::with(|cs| {
-            let rescan = self.rescan.borrow(cs).replace(false);
-            if !rescan {
-                return;
-            }
-            self.retain(cs, |p| {
-                let task = p.header();
-                let expires = task.next_expiration.get();
-                task.expires_at.borrow(cs).set(expires);
-                res = min(res, expires);
-                expires != u64::MAX
-            });
-        });
-        res
-    }
+    unsafe fn dequeue_expired_internal(&self, now: u64, cs: CriticalSection<'_>, on_task: fn(TaskRef)) -> bool {
+        let mut changed = false;
+        self.retain(cs, |p| {
+            let task = p.header();
+            if task.expires_at.borrow(cs).get() <= now {
+                on_task(p);
+                changed = true;
+                false
+            } else {
+                true
+            }
+        });
+        changed
+    }
 
-    pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) {
+    pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: fn(TaskRef)) {
         critical_section::with(|cs| {
-            let mut changed = false;
-            self.retain(cs, |p| {
-                let task = p.header();
-                if task.expires_at.borrow(cs).get() <= now {
-                    on_task(p);
-                    changed = true;
-                    false
-                } else {
-                    true
-                }
-            });
-            if changed {
-                self.rescan.borrow(cs).set(true);
+            if self.dequeue_expired_internal(now, cs, on_task) {
+                self.dispatch(cs, on_task);
             }
         });
     }
@@ -101,4 +87,29 @@ impl TimerQueue {
             }
         }
     }
+
+    unsafe fn next_expiration(&self, cs: CriticalSection<'_>) -> u64 {
+        let mut res = u64::MAX;
+
+        self.retain(cs, |p| {
+            let task = p.header();
+            let expires = task.next_expiration.get();
+            task.expires_at.borrow(cs).set(expires);
+            res = min(res, expires);
+            expires != u64::MAX
+        });
+
+        res
+    }
+
+    unsafe fn dispatch(&self, cs: CriticalSection<'_>, cb: fn(TaskRef)) {
+        // If the next expiration is already in the past, set_alarm will
+        // return false and no alarm interrupt will fire for it.
+        let next_expiration = self.next_expiration(cs);
+        if !embassy_time_driver::set_alarm(self.alarm, next_expiration) {
+            // Time driver did not schedule the alarm,
+            // so we need to dequeue expired timers manually.
+            self.dequeue_expired_internal(embassy_time_driver::now(), cs, cb);
+        }
+    }
 }
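The overall shape after this commit: the per-poll rescan flag is gone, and every mutation of the timer queue (update, notify_task_exited, or a drain that woke tasks) re-computes the soonest deadline and re-arms the single alarm via dispatch, falling back to an inline drain when that deadline has already passed. A self-contained sketch of the pattern, using a plain Vec where embassy uses an intrusive list of task headers — Timer, TimerQueue, set_alarm, and now are illustrative stand-ins, not embassy's types:

use std::cell::RefCell;

struct Timer {
    expires_at: u64,
    name: &'static str,
}

struct TimerQueue {
    timers: RefCell<Vec<Timer>>,
    now: u64, // stands in for embassy_time_driver::now()
}

impl TimerQueue {
    // Stands in for embassy_time_driver::set_alarm: refuses past deadlines.
    fn set_alarm(&self, at: u64) -> bool {
        at > self.now
    }

    // Like next_expiration(): scan the queue for the soonest deadline.
    fn next_expiration(&self) -> u64 {
        self.timers.borrow().iter().map(|t| t.expires_at).min().unwrap_or(u64::MAX)
    }

    // Like dequeue_expired_internal(): remove and wake expired entries,
    // reporting whether anything fired.
    fn dequeue_expired(&self, now: u64, on_wake: fn(&Timer)) -> bool {
        let mut timers = self.timers.borrow_mut();
        let before = timers.len();
        timers.retain(|t| {
            if t.expires_at <= now {
                on_wake(t);
                false
            } else {
                true
            }
        });
        timers.len() != before
    }

    // The commit's core move: arm the alarm on every mutation; if the deadline
    // has already passed, drain expired timers here instead of looping in poll().
    fn dispatch(&self, on_wake: fn(&Timer)) {
        if !self.set_alarm(self.next_expiration()) {
            self.dequeue_expired(self.now, on_wake);
        }
    }

    // Like update(): enqueue, then immediately re-arm via dispatch().
    fn update(&self, timer: Timer, on_wake: fn(&Timer)) {
        self.timers.borrow_mut().push(timer);
        self.dispatch(on_wake);
    }
}

fn main() {
    let q = TimerQueue { timers: RefCell::new(Vec::new()), now: 100 };
    let wake: fn(&Timer) = |t| println!("wake {}", t.name);
    q.update(Timer { expires_at: 150, name: "future" }, wake); // alarm armed for t=150
    q.update(Timer { expires_at: 90, name: "late" }, wake); // past deadline: woken inline
}

The trade-off, as the commit title suggests, is that deadline bookkeeping now runs on each queue mutation inside the critical section rather than once per executor poll, which is what lets poll() shrink to a single run-queue drain.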
