Skip to content

Commit

Permalink
Dequeue in alarm
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Nov 28, 2024
1 parent 37111a8 commit db1b584
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 40 deletions.
40 changes: 28 additions & 12 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ use core::pin::Pin;
use core::ptr::NonNull;
use core::task::{Context, Poll};

#[cfg(feature = "integrated-timers")]
use core::cell::Cell;
#[cfg(feature = "integrated-timers")]
use critical_section::Mutex;
#[cfg(feature = "integrated-timers")]
use embassy_time_driver::AlarmHandle;
#[cfg(feature = "rtos-trace")]
Expand All @@ -48,7 +52,7 @@ pub(crate) struct TaskHeader {
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

#[cfg(feature = "integrated-timers")]
pub(crate) expires_at: SyncUnsafeCell<u64>,
pub(crate) expires_at: Mutex<Cell<u64>>,
#[cfg(feature = "integrated-timers")]
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}
Expand Down Expand Up @@ -121,7 +125,7 @@ impl<F: Future + 'static> TaskStorage<F> {
poll_fn: SyncUnsafeCell::new(None),

#[cfg(feature = "integrated-timers")]
expires_at: SyncUnsafeCell::new(0),
expires_at: Mutex::new(Cell::new(0)),
#[cfg(feature = "integrated-timers")]
timer_queue_item: timer_queue::TimerQueueItem::new(),
},
Expand Down Expand Up @@ -162,7 +166,9 @@ impl<F: Future + 'static> TaskStorage<F> {
this.raw.state.despawn();

#[cfg(feature = "integrated-timers")]
this.raw.expires_at.set(u64::MAX);
critical_section::with(|cs| {
this.raw.expires_at.borrow(cs).set(u64::MAX);
});
}
Poll::Pending => {}
}
Expand Down Expand Up @@ -363,6 +369,12 @@ impl SyncExecutor {
#[cfg(feature = "integrated-timers")]
fn alarm_callback(ctx: *mut ()) {
let this: &Self = unsafe { &*(ctx as *const Self) };

unsafe {
this.timer_queue
.dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
}

this.pender.pend();
}

Expand All @@ -379,17 +391,16 @@ impl SyncExecutor {
///
/// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
pub(crate) unsafe fn poll(&'static self) {
//trace!("poll");
#[allow(clippy::never_loop)]
loop {
#[cfg(feature = "integrated-timers")]
self.timer_queue
.dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);

self.run_queue.dequeue_all(|p| {
let task = p.header();

#[cfg(feature = "integrated-timers")]
task.expires_at.set(u64::MAX);
critical_section::with(|cs| {
task.expires_at.borrow(cs).set(u64::MAX);
});

if !task.state.run_dequeue() {
// If task is not running, ignore it. This can happen in the following scenario:
Expand Down Expand Up @@ -421,6 +432,11 @@ impl SyncExecutor {
let next_expiration = self.timer_queue.next_expiration();
if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
break;
} else {
// Time driver did not schedule the alarm,
// so we need to dequeue expired timers manually.
self.timer_queue
.dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
}
}

Expand Down Expand Up @@ -584,10 +600,10 @@ impl embassy_time_queue_driver::TimerQueue for TimerQueue {
fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
let task = waker::task_from_waker(waker);
let task = task.header();
unsafe {
let expires_at = task.expires_at.get();
task.expires_at.set(expires_at.min(at));
}
critical_section::with(|cs| {
let expires_at = task.expires_at.borrow(cs).get();
task.expires_at.borrow(cs).set(expires_at.min(at));
});
}
}

Expand Down
61 changes: 33 additions & 28 deletions embassy-executor/src/raw/timer_queue.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,79 @@
use core::cell::Cell;
use core::cmp::min;

use super::TaskRef;
use crate::raw::util::SyncUnsafeCell;
use critical_section::{CriticalSection, Mutex};

pub(crate) struct TimerQueueItem {
next: SyncUnsafeCell<Option<TaskRef>>,
pub(super) next: Mutex<Cell<Option<TaskRef>>>,
}

impl TimerQueueItem {
pub const fn new() -> Self {
Self {
next: SyncUnsafeCell::new(None),
next: Mutex::new(Cell::new(None)),
}
}
}

pub(crate) struct TimerQueue {
head: SyncUnsafeCell<Option<TaskRef>>,
pub(super) head: Mutex<Cell<Option<TaskRef>>>,
}

impl TimerQueue {
pub const fn new() -> Self {
Self {
head: SyncUnsafeCell::new(None),
head: Mutex::new(Cell::new(None)),
}
}

pub(crate) unsafe fn update(&self, p: TaskRef) {
let task = p.header();
if task.expires_at.get() != u64::MAX {
if task.state.timer_enqueue() {
task.timer_queue_item.next.set(self.head.get());
self.head.set(Some(p));
critical_section::with(|cs| {
if task.expires_at.borrow(cs).get() != u64::MAX {
if task.state.timer_enqueue() {
let prev = self.head.borrow(cs).replace(Some(p));
task.timer_queue_item.next.borrow(cs).set(prev);
}
}
}
});
}

pub(crate) unsafe fn next_expiration(&self) -> u64 {
let mut res = u64::MAX;
self.retain(|p| {
let task = p.header();
let expires = task.expires_at.get();
res = min(res, expires);
expires != u64::MAX
critical_section::with(|cs| {
self.retain(cs, |p| {
let task = p.header();
let expires = task.expires_at.borrow(cs).get();
res = min(res, expires);
expires != u64::MAX
});
});
res
}

pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: impl Fn(TaskRef)) {
self.retain(|p| {
let task = p.header();
if task.expires_at.get() <= now {
on_task(p);
false
} else {
true
}
critical_section::with(|cs| {
self.retain(cs, |p| {
let task = p.header();
if task.expires_at.borrow(cs).get() <= now {
on_task(p);
false
} else {
true
}
});
});
}

pub(crate) unsafe fn retain(&self, mut f: impl FnMut(TaskRef) -> bool) {
unsafe fn retain(&self, cs: CriticalSection<'_>, mut f: impl FnMut(TaskRef) -> bool) {
let mut prev = &self.head;
while let Some(p) = prev.get() {
while let Some(p) = prev.borrow(cs).get() {
let task = p.header();
if f(p) {
// Skip to next
prev = &task.timer_queue_item.next;
} else {
// Remove it
prev.set(task.timer_queue_item.next.get());
prev.borrow(cs).set(task.timer_queue_item.next.borrow(cs).get());
task.state.timer_dequeue();
}
}
Expand Down

0 comments on commit db1b584

Please sign in to comment.