Skip to content

Commit

Permalink
Remove need to reset expiration in every poll
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Dec 8, 2024
1 parent 9e1e8d5 commit 63eac19
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 65 deletions.
22 changes: 5 additions & 17 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,8 @@ impl<F: Future + 'static> TaskStorage<F> {

#[cfg(feature = "integrated-timers")]
critical_section::with(|cs| {
this.raw
.executor
.get()
.unwrap_unchecked()
.timer_queue
.borrow(cs)
.notify_task_exited(p);
let executor = this.raw.executor.get().unwrap_unchecked();
executor.timer_queue.borrow(cs).notify_task_exited(p);
});
}
Poll::Pending => {}
Expand Down Expand Up @@ -377,9 +372,7 @@ impl SyncExecutor {
let this: &Self = unsafe { &*(ctx as *const Self) };

critical_section::with(|cs| unsafe {
this.timer_queue
.borrow(cs)
.dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);
this.timer_queue.borrow(cs).dispatch(wake_task_no_pend);
});

this.pender.pend();
Expand Down Expand Up @@ -410,11 +403,6 @@ impl SyncExecutor {
return;
}

#[cfg(feature = "integrated-timers")]
critical_section::with(|cs| {
self.timer_queue.borrow(cs).notify_task_started(p);
});

#[cfg(feature = "rtos-trace")]
trace::task_exec_begin(p.as_ptr() as u32);

Expand Down Expand Up @@ -579,9 +567,9 @@ impl embassy_time_queue_driver::TimerQueue for TimerQueue {
fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
let task = waker::task_from_waker(waker);
unsafe {
let executor = task.header().executor.get().unwrap_unchecked();
critical_section::with(|cs| {
executor.timer_queue.borrow(cs).update(task, at);
let executor = task.header().executor.get().unwrap_unchecked();
executor.timer_queue.borrow(cs).schedule(task, at);
});
}
}
Expand Down
91 changes: 43 additions & 48 deletions embassy-executor/src/raw/timer_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,63 @@ impl TimerQueue {

pub(crate) unsafe fn notify_task_exited(&self, p: TaskRef) {
let task = p.header();

// Trigger removal from the timer queue.
task.expires_at.set(u64::MAX);
self.dispatch(super::wake_task);
}

pub(crate) unsafe fn notify_task_started(&self, p: TaskRef) {
pub(crate) unsafe fn schedule(&self, p: TaskRef, at: u64) {
let task = p.header();
task.expires_at.set(u64::MAX);
}
let update = if task.state.timer_enqueue() {
// Not in the queue, add it and update.
let prev = self.head.replace(Some(p));
task.timer_queue_item.next.set(prev);

pub(crate) unsafe fn update(&self, p: TaskRef, at: u64) {
let task = p.header();
if at < task.expires_at.get() {
true
} else {
// Expiration is sooner than previously set, update.
at < task.expires_at.get()
};

if update {
task.expires_at.set(at);
if task.state.timer_enqueue() {
let prev = self.head.replace(Some(p));
task.timer_queue_item.next.set(prev);
}
self.dispatch(super::wake_task);
}
}

unsafe fn dequeue_expired_internal(&self, now: u64, on_task: fn(TaskRef)) -> bool {
let mut changed = false;
self.retain(|p| {
let task = p.header();
if task.expires_at.get() <= now {
on_task(p);
changed = true;
false
} else {
true
pub(crate) unsafe fn dispatch(&self, on_task: fn(TaskRef)) {
loop {
let now = embassy_time_driver::now();

let mut next_expiration = u64::MAX;

self.retain(|p| {
let task = p.header();
let expires = task.expires_at.get();

if expires <= now {
// Timer expired, process task.
on_task(p);
false
} else {
// Timer didn't yet expire, or never expires.
next_expiration = min(next_expiration, expires);
expires != u64::MAX
}
});

if self.update_alarm(next_expiration) {
break;
}
});
changed
}
}

pub(crate) unsafe fn dequeue_expired(&self, now: u64, on_task: fn(TaskRef)) {
if self.dequeue_expired_internal(now, on_task) {
self.dispatch(on_task);
fn update_alarm(&self, next_alarm: u64) -> bool {
if next_alarm == u64::MAX {
true
} else {
embassy_time_driver::set_alarm(self.alarm, next_alarm)
}
}

Expand All @@ -84,27 +102,4 @@ impl TimerQueue {
}
}
}

unsafe fn next_expiration(&self) -> u64 {
let mut res = u64::MAX;

self.retain(|p| {
let task = p.header();
let expires = task.expires_at.get();
res = min(res, expires);
expires != u64::MAX
});

res
}

unsafe fn dispatch(&self, cb: fn(TaskRef)) {
// If this is already in the past, set_alarm might return false.
let next_expiration = self.next_expiration();
if !embassy_time_driver::set_alarm(self.alarm, next_expiration) {
// Time driver did not schedule the alarm,
// so we need to dequeue expired timers manually.
self.dequeue_expired_internal(embassy_time_driver::now(), cb);
}
}
}
1 change: 1 addition & 0 deletions embassy-executor/src/raw/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl<T> SyncUnsafeCell<T> {
*self.value.get()
}

#[cfg(feature = "integrated-timers")]
pub unsafe fn replace(&self, value: T) -> T {
core::mem::replace(&mut *self.value.get(), value)
}
Expand Down

0 comments on commit 63eac19

Please sign in to comment.