Skip to content

Commit

Permalink
Refactor integrated-timers
Browse files Browse the repository at this point in the history
  • Loading branch information
bugadani committed Dec 8, 2024
1 parent 86578ac commit efdddc3
Show file tree
Hide file tree
Showing 32 changed files with 613 additions and 1,183 deletions.
2 changes: 1 addition & 1 deletion .github/ci/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cargo test --manifest-path ./embassy-futures/Cargo.toml
cargo test --manifest-path ./embassy-sync/Cargo.toml
cargo test --manifest-path ./embassy-embedded-hal/Cargo.toml
cargo test --manifest-path ./embassy-hal-internal/Cargo.toml
cargo test --manifest-path ./embassy-time/Cargo.toml --features generic-queue,mock-driver
cargo test --manifest-path ./embassy-time/Cargo.toml --features mock-driver
cargo test --manifest-path ./embassy-time-driver/Cargo.toml

cargo test --manifest-path ./embassy-boot/Cargo.toml
Expand Down
4 changes: 3 additions & 1 deletion ci-xtensa.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ cargo batch \
--- build --release --manifest-path embassy-executor/Cargo.toml --target xtensa-esp32-none-elf --features arch-spin,executor-thread \
--- build --release --manifest-path embassy-executor/Cargo.toml --target xtensa-esp32-none-elf --features arch-spin,executor-thread,integrated-timers \
--- build --release --manifest-path embassy-sync/Cargo.toml --target xtensa-esp32s2-none-elf --features defmt \
--- build --release --manifest-path embassy-time/Cargo.toml --target xtensa-esp32s2-none-elf --features defmt,defmt-timestamp-uptime,generic-queue-8,mock-driver \
--- build --release --manifest-path embassy-time/Cargo.toml --target xtensa-esp32s2-none-elf --features defmt,defmt-timestamp-uptime,mock-driver \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target xtensa-esp32s2-none-elf --features integrated-timers \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target xtensa-esp32s2-none-elf --features generic-queue-8 \
--- build --release --manifest-path embassy-net/Cargo.toml --target xtensa-esp32-none-elf --features defmt,tcp,udp,dns,proto-ipv4,medium-ethernet,packet-trace \
--- build --release --manifest-path embassy-net/Cargo.toml --target xtensa-esp32-none-elf --features defmt,tcp,udp,dns,proto-ipv4,multicast,medium-ethernet \
--- build --release --manifest-path embassy-net/Cargo.toml --target xtensa-esp32-none-elf --features defmt,tcp,udp,dns,dhcpv4,medium-ethernet \
Expand Down
4 changes: 3 additions & 1 deletion ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ cargo batch \
--- build --release --manifest-path embassy-executor/Cargo.toml --target riscv32imac-unknown-none-elf --features arch-riscv32,executor-thread \
--- build --release --manifest-path embassy-executor/Cargo.toml --target riscv32imac-unknown-none-elf --features arch-riscv32,executor-thread,integrated-timers \
--- build --release --manifest-path embassy-sync/Cargo.toml --target thumbv6m-none-eabi --features defmt \
--- build --release --manifest-path embassy-time/Cargo.toml --target thumbv6m-none-eabi --features defmt,defmt-timestamp-uptime,generic-queue-8,mock-driver \
--- build --release --manifest-path embassy-time/Cargo.toml --target thumbv6m-none-eabi --features defmt,defmt-timestamp-uptime,mock-driver \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target thumbv6m-none-eabi --features integrated-timers \
--- build --release --manifest-path embassy-time-queue-driver/Cargo.toml --target thumbv6m-none-eabi --features generic-queue-8 \
--- build --release --manifest-path embassy-net/Cargo.toml --target thumbv7em-none-eabi --features defmt,tcp,udp,dns,proto-ipv4,medium-ethernet,packet-trace \
--- build --release --manifest-path embassy-net/Cargo.toml --target thumbv7em-none-eabi --features defmt,tcp,udp,dns,proto-ipv4,multicast,medium-ethernet \
--- build --release --manifest-path embassy-net/Cargo.toml --target thumbv7em-none-eabi --features defmt,tcp,udp,dns,dhcpv4,medium-ethernet \
Expand Down
2 changes: 1 addition & 1 deletion embassy-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ nightly = ["embassy-executor-macros/nightly"]
turbowakers = []

## Use the executor-integrated `embassy-time` timer queue.
integrated-timers = ["dep:embassy-time-driver", "dep:embassy-time-queue-driver"]
integrated-timers = ["dep:embassy-time-driver"]

#! ### Architecture
_arch = [] # some arch was picked
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/avr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
6 changes: 0 additions & 6 deletions embassy-executor/src/arch/cortex_m.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}
init(self.inner.spawner());

loop {
Expand Down Expand Up @@ -210,9 +207,6 @@ mod interrupt {
}

let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
unsafe {
executor.initialize();
}

unsafe { NVIC::unmask(irq) }

Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/riscv32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/spin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ mod thread {
///
/// This function never returns.
pub fn run(&'static mut self, init: impl FnOnce(Spawner)) -> ! {
unsafe {
self.inner.initialize();
}

init(self.inner.spawner());

loop {
Expand Down
4 changes: 0 additions & 4 deletions embassy-executor/src/arch/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ mod thread {
/// - a `static mut` (unsafe)
/// - a local variable in a function you know never returns (like `fn main() -> !`), upgrading its lifetime with `transmute`. (unsafe)
pub fn start(&'static mut self, init: impl FnOnce(Spawner)) {
unsafe {
self.inner.initialize();
}

unsafe {
let executor = &self.inner;
let future = Closure::new(move |_| {
Expand Down
134 changes: 26 additions & 108 deletions embassy-executor/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod run_queue;
mod state;

#[cfg(feature = "integrated-timers")]
mod timer_queue;
pub mod timer_queue;
pub(crate) mod util;
#[cfg_attr(feature = "turbowakers", path = "waker_turbo.rs")]
mod waker;
Expand All @@ -29,8 +29,6 @@ use core::pin::Pin;
use core::ptr::NonNull;
use core::task::{Context, Poll};

#[cfg(feature = "integrated-timers")]
use embassy_time_driver::AlarmHandle;
#[cfg(feature = "rtos-trace")]
use rtos_trace::trace;

Expand All @@ -47,8 +45,7 @@ pub(crate) struct TaskHeader {
pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,

#[cfg(feature = "integrated-timers")]
pub(crate) expires_at: SyncUnsafeCell<u64>,
/// Integrated timer queue storage. This field should not be accessed outside of the timer queue.
#[cfg(feature = "integrated-timers")]
pub(crate) timer_queue_item: timer_queue::TimerQueueItem,
}
Expand Down Expand Up @@ -80,6 +77,12 @@ impl TaskRef {
unsafe { self.ptr.as_ref() }
}

/// Returns a reference to the executor that the task is currently running on.
#[cfg(feature = "integrated-timers")]
pub unsafe fn executor(self) -> Option<&'static Executor> {
self.header().executor.get().map(|e| Executor::wrap(e))
}

/// The returned pointer is valid for the entire TaskStorage.
pub(crate) fn as_ptr(self) -> *const TaskHeader {
self.ptr.as_ptr()
Expand Down Expand Up @@ -120,8 +123,6 @@ impl<F: Future + 'static> TaskStorage<F> {
// Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
poll_fn: SyncUnsafeCell::new(None),

#[cfg(feature = "integrated-timers")]
expires_at: SyncUnsafeCell::new(0),
#[cfg(feature = "integrated-timers")]
timer_queue_item: timer_queue::TimerQueueItem::new(),
},
Expand Down Expand Up @@ -160,9 +161,6 @@ impl<F: Future + 'static> TaskStorage<F> {
Poll::Ready(_) => {
this.future.drop_in_place();
this.raw.state.despawn();

#[cfg(feature = "integrated-timers")]
this.raw.expires_at.set(u64::MAX);
}
Poll::Pending => {}
}
Expand Down Expand Up @@ -316,34 +314,16 @@ impl Pender {
pub(crate) struct SyncExecutor {
run_queue: RunQueue,
pender: Pender,

#[cfg(feature = "integrated-timers")]
pub(crate) timer_queue: timer_queue::TimerQueue,
#[cfg(feature = "integrated-timers")]
alarm: AlarmHandle,
}

impl SyncExecutor {
pub(crate) fn new(pender: Pender) -> Self {
#[cfg(feature = "integrated-timers")]
let alarm = unsafe { unwrap!(embassy_time_driver::allocate_alarm()) };

Self {
run_queue: RunQueue::new(),
pender,

#[cfg(feature = "integrated-timers")]
timer_queue: timer_queue::TimerQueue::new(),
#[cfg(feature = "integrated-timers")]
alarm,
}
}

pub(crate) unsafe fn initialize(&'static self) {
#[cfg(feature = "integrated-timers")]
embassy_time_driver::set_alarm_callback(self.alarm, Self::alarm_callback, self as *const _ as *mut ());
}

/// Enqueue a task in the task queue
///
/// # Safety
Expand All @@ -360,12 +340,6 @@ impl SyncExecutor {
}
}

#[cfg(feature = "integrated-timers")]
fn alarm_callback(ctx: *mut ()) {
let this: &Self = unsafe { &*(ctx as *const Self) };
this.pender.pend();
}

pub(super) unsafe fn spawn(&'static self, task: TaskRef) {
task.header().executor.set(Some(self));

Expand All @@ -379,56 +353,27 @@ impl SyncExecutor {
///
/// Same as [`Executor::poll`], plus you must only call this on the thread this executor was created.
pub(crate) unsafe fn poll(&'static self) {
#[allow(clippy::never_loop)]
loop {
#[cfg(feature = "integrated-timers")]
self.timer_queue
.dequeue_expired(embassy_time_driver::now(), wake_task_no_pend);

self.run_queue.dequeue_all(|p| {
let task = p.header();

#[cfg(feature = "integrated-timers")]
task.expires_at.set(u64::MAX);

if !task.state.run_dequeue() {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}

#[cfg(feature = "rtos-trace")]
trace::task_exec_begin(p.as_ptr() as u32);
self.run_queue.dequeue_all(|p| {
let task = p.header();

if !task.state.run_dequeue() {
// If task is not running, ignore it. This can happen in the following scenario:
// - Task gets dequeued, poll starts
// - While task is being polled, it gets woken. It gets placed in the queue.
// - Task poll finishes, returning done=true
// - RUNNING bit is cleared, but the task is already in the queue.
return;
}

// Run the task
task.poll_fn.get().unwrap_unchecked()(p);
#[cfg(feature = "rtos-trace")]
trace::task_exec_begin(p.as_ptr() as u32);

#[cfg(feature = "rtos-trace")]
trace::task_exec_end();
// Run the task
task.poll_fn.get().unwrap_unchecked()(p);

// Enqueue or update into timer_queue
#[cfg(feature = "integrated-timers")]
self.timer_queue.update(p);
});

#[cfg(feature = "integrated-timers")]
{
// If this is already in the past, set_alarm might return false
// In that case do another poll loop iteration.
let next_expiration = self.timer_queue.next_expiration();
if embassy_time_driver::set_alarm(self.alarm, next_expiration) {
break;
}
}

#[cfg(not(feature = "integrated-timers"))]
{
break;
}
}
#[cfg(feature = "rtos-trace")]
trace::task_exec_end();
});

#[cfg(feature = "rtos-trace")]
trace::system_idle();
Expand Down Expand Up @@ -494,15 +439,6 @@ impl Executor {
}
}

/// Initializes the executor.
///
/// # Safety
///
/// This function must be called once before any other method is called.
pub unsafe fn initialize(&'static self) {
self.inner.initialize();
}

/// Spawn a task in this executor.
///
/// # Safety
Expand Down Expand Up @@ -576,24 +512,6 @@ pub fn wake_task_no_pend(task: TaskRef) {
}
}

#[cfg(feature = "integrated-timers")]
struct TimerQueue;

#[cfg(feature = "integrated-timers")]
impl embassy_time_queue_driver::TimerQueue for TimerQueue {
fn schedule_wake(&'static self, at: u64, waker: &core::task::Waker) {
let task = waker::task_from_waker(waker);
let task = task.header();
unsafe {
let expires_at = task.expires_at.get();
task.expires_at.set(expires_at.min(at));
}
}
}

#[cfg(feature = "integrated-timers")]
embassy_time_queue_driver::timer_queue_impl!(static TIMER_QUEUE: TimerQueue = TimerQueue);

#[cfg(all(feature = "rtos-trace", feature = "integrated-timers"))]
const fn gcd(a: u64, b: u64) -> u64 {
if b == 0 {
Expand Down
Loading

0 comments on commit efdddc3

Please sign in to comment.