From 98646dee573ede41ad7ddc1a21cec2381747b91f Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 19 Nov 2024 13:32:07 -0800 Subject: [PATCH] Use Cell instead of RefCell for timer (#467) --- ntex-util/CHANGES.md | 4 + ntex-util/Cargo.toml | 2 +- ntex-util/src/time/wheel.rs | 352 +++++++++++++++++++----------------- 3 files changed, 188 insertions(+), 170 deletions(-) diff --git a/ntex-util/CHANGES.md b/ntex-util/CHANGES.md index 80460c896..5aab61a62 100644 --- a/ntex-util/CHANGES.md +++ b/ntex-util/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.6.0] - 2024-11-19 + +* Use Cell instead of RefCell for timer + ## [2.5.0] - 2024-11-04 * Use updated Service trait diff --git a/ntex-util/Cargo.toml b/ntex-util/Cargo.toml index bc496e0b5..3f717b770 100644 --- a/ntex-util/Cargo.toml +++ b/ntex-util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-util" -version = "2.5.0" +version = "2.6.0" authors = ["ntex contributors "] description = "Utilities for ntex framework" keywords = ["network", "framework", "async", "futures"] diff --git a/ntex-util/src/time/wheel.rs b/ntex-util/src/time/wheel.rs index b22538f32..6a5cb8702 100644 --- a/ntex-util/src/time/wheel.rs +++ b/ntex-util/src/time/wheel.rs @@ -2,9 +2,8 @@ //! //! Inspired by linux kernel timers system #![allow(arithmetic_overflow, clippy::let_underscore_future)] -use std::cell::{Cell, RefCell}; use std::time::{Duration, Instant, SystemTime}; -use std::{cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll}; +use std::{cell::Cell, cmp::max, future::Future, mem, pin::Pin, rc::Rc, task, task::Poll}; use futures_timer::Delay; use slab::Slab; @@ -72,7 +71,7 @@ const fn as_millis(dur: Duration) -> u64 { /// Resolution is 5ms #[inline] pub fn now() -> Instant { - TIMER.with(Timer::now) + TIMER.with(|t| t.with_mod(|inner| t.now(inner))) } /// Returns the system time corresponding to “now”. @@ -80,7 +79,7 @@ pub fn now() -> Instant { /// Resolution is 5ms #[inline] pub fn system_time() -> SystemTime { - TIMER.with(Timer::system_time) + TIMER.with(|t| t.with_mod(|inner| t.system_time(inner))) } /// Returns the system time corresponding to “now”. @@ -90,7 +89,7 @@ pub fn system_time() -> SystemTime { #[inline] #[doc(hidden)] pub fn query_system_time() -> SystemTime { - TIMER.with(Timer::system_time) + TIMER.with(|t| t.with_mod(|inner| t.system_time(inner))) } #[derive(Debug)] @@ -108,25 +107,27 @@ impl TimerHandle { } pub fn is_elapsed(&self) -> bool { - TIMER.with(|t| t.0.inner.borrow().timers[self.0].bucket.is_none()) + TIMER.with(|t| t.with_mod(|m| m.timers[self.0].bucket.is_none())) } pub fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<()> { TIMER.with(|t| { - let entry = &t.0.inner.borrow().timers[self.0]; - if entry.bucket.is_none() { - Poll::Ready(()) - } else { - entry.task.register(cx.waker()); - Poll::Pending - } + t.with_mod(|inner| { + let entry = &inner.timers[self.0]; + if entry.bucket.is_none() { + Poll::Ready(()) + } else { + entry.task.register(cx.waker()); + Poll::Pending + } + }) }) } } impl Drop for TimerHandle { fn drop(&mut self) { - TIMER.with(|t| t.remove_timer(self.0)); + TIMER.with(|t| t.with_mod(|inner| inner.remove_timer_bucket(self.0, true))) } } @@ -156,7 +157,7 @@ struct TimerInner { lowres_time: Cell>, lowres_stime: Cell>, lowres_driver: LocalWaker, - inner: RefCell, + inner: Cell>>, } struct TimerMod { @@ -170,6 +171,11 @@ struct TimerMod { impl Timer { fn new() -> Self { + println!( + "=========== {:?}", + std::mem::size_of::>>() + ); + Timer(Rc::new(TimerInner { elapsed: Cell::new(0), elapsed_time: Cell::new(None), @@ -179,16 +185,26 @@ impl Timer { lowres_time: Cell::new(None), lowres_stime: Cell::new(None), lowres_driver: LocalWaker::new(), - inner: RefCell::new(TimerMod { + inner: Cell::new(Some(Box::new(TimerMod { buckets: Self::create_buckets(), timers: Slab::default(), driver_sleep: Delay::new(Duration::ZERO), occupied: [0; WHEEL_SIZE], lowres_driver_sleep: Delay::new(Duration::ZERO), - }), + }))), })) } + fn with_mod(&self, f: F) -> R + where + F: FnOnce(&mut TimerMod) -> R, + { + let mut m = self.0.inner.take().unwrap(); + let result = f(&mut m); + self.0.inner.set(Some(m)); + result + } + fn create_buckets() -> Vec { let mut buckets = Vec::with_capacity(WHEEL_SIZE); for idx in 0..WHEEL_SIZE { @@ -199,7 +215,7 @@ impl Timer { buckets } - fn now(&self) -> Instant { + fn now(&self, inner: &mut TimerMod) -> Instant { if let Some(cur) = self.0.lowres_time.get() { cur } else { @@ -212,14 +228,14 @@ impl Timer { if flags.contains(Flags::LOWRES_DRIVER) { self.0.lowres_driver.wake(); } else { - LowresTimerDriver::start(self.0.clone()); + LowresTimerDriver::start(self.0.clone(), inner); } } now } } - fn system_time(&self) -> SystemTime { + fn system_time(&self, inner: &mut TimerMod) -> SystemTime { if let Some(cur) = self.0.lowres_stime.get() { cur } else { @@ -232,7 +248,7 @@ impl Timer { if flags.contains(Flags::LOWRES_DRIVER) { self.0.lowres_driver.wake(); } else { - LowresTimerDriver::start(self.0.clone()); + LowresTimerDriver::start(self.0.clone(), inner); } } now @@ -241,105 +257,103 @@ impl Timer { /// Add the timer into the hash bucket fn add_timer(&self, millis: u64) -> TimerHandle { - if millis == 0 { - let mut inner = self.0.inner.borrow_mut(); - - let entry = inner.timers.vacant_entry(); - let no = entry.key(); - - entry.insert(TimerEntry { - bucket_entry: 0, - bucket: None, - task: LocalWaker::new(), - }); - return TimerHandle(no); - } + self.with_mod(|inner| { + if millis == 0 { + let entry = inner.timers.vacant_entry(); + let no = entry.key(); + + entry.insert(TimerEntry { + bucket_entry: 0, + bucket: None, + task: LocalWaker::new(), + }); + return TimerHandle(no); + } - let mut flags = self.0.flags.get(); - flags.insert(Flags::RUNNING); - self.0.flags.set(flags); + let mut flags = self.0.flags.get(); + flags.insert(Flags::RUNNING); + self.0.flags.set(flags); - let now = self.now(); - let elapsed_time = self.0.elapsed_time(); - let delta = if now >= elapsed_time { - to_units(as_millis(now - elapsed_time) + millis) - } else { - to_units(millis) - }; - - let (no, bucket_expiry) = { - // crate timer entry - let (idx, bucket_expiry) = self - .0 - .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta); - - let no = self.0.inner.borrow_mut().add_entry(idx); - (no, bucket_expiry) - }; - - // Check whether new bucket expire earlier - if bucket_expiry < self.0.next_expiry.get() { - self.0.next_expiry.set(bucket_expiry); - if flags.contains(Flags::DRIVER_STARTED) { - flags.insert(Flags::DRIVER_RECALC); - self.0.flags.set(flags); - self.0.driver.wake(); + let now = self.now(inner); + let elapsed_time = self.0.elapsed_time(); + let delta = if now >= elapsed_time { + to_units(as_millis(now - elapsed_time) + millis) } else { - TimerDriver::start(self.0.clone()); + to_units(millis) + }; + + let (no, bucket_expiry) = { + // crate timer entry + let (idx, bucket_expiry) = self + .0 + .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta); + + let no = inner.add_entry(idx); + (no, bucket_expiry) + }; + + // Check whether new bucket expire earlier + if bucket_expiry < self.0.next_expiry.get() { + self.0.next_expiry.set(bucket_expiry); + if flags.contains(Flags::DRIVER_STARTED) { + flags.insert(Flags::DRIVER_RECALC); + self.0.flags.set(flags); + self.0.driver.wake(); + } else { + TimerDriver::start(self.0.clone(), inner); + } } - } - TimerHandle(no) + TimerHandle(no) + }) } /// Update existing timer fn update_timer(&self, hnd: usize, millis: u64) { - if millis == 0 { - self.remove_timer_bucket(hnd); - self.0.inner.borrow_mut().timers[hnd].bucket = None; - return; - } + self.with_mod(|inner| { + if millis == 0 { + inner.remove_timer_bucket(hnd, false); + inner.timers[hnd].bucket = None; + return; + } - let now = self.now(); - let elapsed_time = self.0.elapsed_time(); - let delta = if now >= elapsed_time { - max(to_units(as_millis(now - elapsed_time) + millis), 1) - } else { - max(to_units(millis), 1) - }; + let now = self.now(inner); + let elapsed_time = self.0.elapsed_time(); + let delta = if now >= elapsed_time { + max(to_units(as_millis(now - elapsed_time) + millis), 1) + } else { + max(to_units(millis), 1) + }; - let bucket_expiry = { - // calc bucket - let (idx, bucket_expiry) = self - .0 - .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta); + let bucket_expiry = { + // calc bucket + let (idx, bucket_expiry) = self + .0 + .calc_wheel_index(self.0.elapsed.get().wrapping_add(delta), delta); - self.0.inner.borrow_mut().update_entry(hnd, idx); + inner.update_entry(hnd, idx); - bucket_expiry - }; + bucket_expiry + }; - // Check whether new bucket expire earlier - if bucket_expiry < self.0.next_expiry.get() { - self.0.next_expiry.set(bucket_expiry); - let mut flags = self.0.flags.get(); - if flags.contains(Flags::DRIVER_STARTED) { - flags.insert(Flags::DRIVER_RECALC); - self.0.flags.set(flags); - self.0.driver.wake(); - } else { - TimerDriver::start(self.0.clone()); + // Check whether new bucket expire earlier + if bucket_expiry < self.0.next_expiry.get() { + self.0.next_expiry.set(bucket_expiry); + let mut flags = self.0.flags.get(); + if flags.contains(Flags::DRIVER_STARTED) { + flags.insert(Flags::DRIVER_RECALC); + self.0.flags.set(flags); + self.0.driver.wake(); + } else { + TimerDriver::start(self.0.clone(), inner); + } } - } - } - - fn remove_timer(&self, handle: usize) { - self.0.inner.borrow_mut().remove_timer_bucket(handle, true) + }) } - fn remove_timer_bucket(&self, handle: usize) { - self.0.inner.borrow_mut().remove_timer_bucket(handle, false) - } + // fn remove_timer(&self, handle: usize) { + // self.0.inner.borrow_mut().remove_timer_bucket(handle, true) + // } } impl TimerMod { @@ -424,6 +438,16 @@ impl TimerMod { } impl TimerInner { + fn with_mod(&self, f: F) -> R + where + F: FnOnce(&mut TimerMod) -> R, + { + let mut m = self.inner.take().unwrap(); + let result = f(&mut m); + self.inner.set(Some(m)); + result + } + fn calc_wheel_index(&self, expires: u64, delta: u64) -> (usize, u64) { if delta < lvl_start(1) { Self::calc_index(expires, 0) @@ -481,16 +505,12 @@ impl TimerInner { } } - fn execute_expired_timers(&self) { - self.inner - .borrow_mut() - .execute_expired_timers(self.next_expiry.get()); + fn execute_expired_timers(&self, inner: &mut TimerMod) { + inner.execute_expired_timers(self.next_expiry.get()); } /// Find next expiration bucket - fn next_pending_bucket(&self) -> Option { - let inner = self.inner.borrow_mut(); - + fn next_pending_bucket(&self, inner: &mut TimerMod) -> Option { let mut clk = self.elapsed.get(); let mut next = u64::MAX; @@ -537,7 +557,7 @@ impl TimerInner { fn stop_wheel(&self) { // mark all timers as elapsed - if let Ok(mut inner) = self.inner.try_borrow_mut() { + if let Some(mut inner) = self.inner.take() { let mut buckets = mem::take(&mut inner.buckets); for b in &mut buckets { for no in b.entries.drain() { @@ -555,6 +575,7 @@ impl TimerInner { inner.buckets = buckets; inner.occupied = [0; WHEEL_SIZE]; + self.inner.set(Some(inner)); } } } @@ -604,12 +625,11 @@ impl TimerEntry { struct TimerDriver(Rc); impl TimerDriver { - fn start(timer: Rc) { + fn start(timer: Rc, inner: &mut TimerMod) { let mut flags = timer.flags.get(); flags.insert(Flags::DRIVER_STARTED); timer.flags.set(flags); - timer.inner.borrow_mut().driver_sleep = - Delay::new(Duration::from_millis(timer.next_expiry_ms())); + inner.driver_sleep = Delay::new(Duration::from_millis(timer.next_expiry_ms())); let _ = crate::spawn(TimerDriver(timer)); } @@ -627,54 +647,53 @@ impl Future for TimerDriver { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { self.0.driver.register(cx.waker()); - let mut flags = self.0.flags.get(); - if flags.contains(Flags::DRIVER_RECALC) { - flags.remove(Flags::DRIVER_RECALC); - self.0.flags.set(flags); - - let now = Instant::now(); - let deadline = - if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) { - Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff) - } else { - Duration::from_millis(self.0.next_expiry_ms()) - }; - self.0.inner.borrow_mut().driver_sleep.reset(deadline); - } + self.0.with_mod(|inner| { + let mut flags = self.0.flags.get(); + if flags.contains(Flags::DRIVER_RECALC) { + flags.remove(Flags::DRIVER_RECALC); + self.0.flags.set(flags); - loop { - if Pin::new(&mut self.0.inner.borrow_mut().driver_sleep) - .poll(cx) - .is_ready() - { let now = Instant::now(); - self.0.elapsed.set(self.0.next_expiry.get()); - self.0.elapsed_time.set(Some(now)); - self.0.execute_expired_timers(); - - if let Some(next_expiry) = self.0.next_pending_bucket() { - self.0.next_expiry.set(next_expiry); - let dur = Duration::from_millis(self.0.next_expiry_ms()); - self.0.inner.borrow_mut().driver_sleep.reset(dur); - continue; - } else { - self.0.next_expiry.set(u64::MAX); - self.0.elapsed_time.set(None); + let deadline = + if let Some(diff) = now.checked_duration_since(self.0.elapsed_time()) { + Duration::from_millis(self.0.next_expiry_ms()).saturating_sub(diff) + } else { + Duration::from_millis(self.0.next_expiry_ms()) + }; + inner.driver_sleep.reset(deadline); + } + + loop { + if Pin::new(&mut inner.driver_sleep).poll(cx).is_ready() { + let now = Instant::now(); + self.0.elapsed.set(self.0.next_expiry.get()); + self.0.elapsed_time.set(Some(now)); + self.0.execute_expired_timers(inner); + + if let Some(next_expiry) = self.0.next_pending_bucket(inner) { + self.0.next_expiry.set(next_expiry); + let dur = Duration::from_millis(self.0.next_expiry_ms()); + inner.driver_sleep.reset(dur); + continue; + } else { + self.0.next_expiry.set(u64::MAX); + self.0.elapsed_time.set(None); + } } + return Poll::Pending; } - return Poll::Pending; - } + }) } } struct LowresTimerDriver(Rc); impl LowresTimerDriver { - fn start(timer: Rc) { + fn start(timer: Rc, inner: &mut TimerMod) { let mut flags = timer.flags.get(); flags.insert(Flags::LOWRES_DRIVER); timer.flags.set(flags); - timer.inner.borrow_mut().lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION); + inner.lowres_driver_sleep = Delay::new(LOWRES_RESOLUTION); let _ = crate::spawn(LowresTimerDriver(timer)); } @@ -692,27 +711,22 @@ impl Future for LowresTimerDriver { fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { self.0.lowres_driver.register(cx.waker()); - let mut flags = self.0.flags.get(); - if !flags.contains(Flags::LOWRES_TIMER) { - flags.insert(Flags::LOWRES_TIMER); - self.0.flags.set(flags); - self.0 - .inner - .borrow_mut() - .lowres_driver_sleep - .reset(LOWRES_RESOLUTION); - } + self.0.with_mod(|inner| { + let mut flags = self.0.flags.get(); + if !flags.contains(Flags::LOWRES_TIMER) { + flags.insert(Flags::LOWRES_TIMER); + self.0.flags.set(flags); + inner.lowres_driver_sleep.reset(LOWRES_RESOLUTION); + } - if Pin::new(&mut self.0.inner.borrow_mut().lowres_driver_sleep) - .poll(cx) - .is_ready() - { - self.0.lowres_time.set(None); - self.0.lowres_stime.set(None); - flags.remove(Flags::LOWRES_TIMER); - self.0.flags.set(flags); - } - Poll::Pending + if Pin::new(&mut inner.lowres_driver_sleep).poll(cx).is_ready() { + self.0.lowres_time.set(None); + self.0.lowres_stime.set(None); + flags.remove(Flags::LOWRES_TIMER); + self.0.flags.set(flags); + } + Poll::Pending + }) } }