From ee25febafc64f141eac4304ab97fda4b461fe94a Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Tue, 2 Jan 2024 12:37:47 +0100 Subject: [PATCH] fix: add async API Changes the sequential-storage APIs for queues and maps to be async first. Non-async behavior is achieved using a block_on, optionally combined with an adapter for the flash supporting embedded-storage-async. --- Cargo.toml | 6 +- README.md | 5 + fuzz/Cargo.toml | 1 + fuzz/fuzz_targets/map.rs | 21 +-- fuzz/fuzz_targets/queue.rs | 92 ++++++++--- src/item.rs | 264 +++++++++++++++++------------- src/lib.rs | 136 +++++++++++---- src/map.rs | 327 ++++++++++++++++++++----------------- src/mock_flash.rs | 109 +++++++------ src/queue.rs | 326 ++++++++++++++++++++---------------- 10 files changed, 771 insertions(+), 516 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a635feb..e28131b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,12 +12,14 @@ keywords = ["no_std", "embedded", "flash", "storage"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -embedded-storage = "0.3.0" +embedded-storage-async = "0.4.1" defmt = { version = "0.3", optional = true } +futures = { version = "0.3.30", features = ["executor"], optional = true } [dev-dependencies] approx = "0.5.1" +futures = { version = "0.3.30", features = ["executor"] } [features] defmt = ["dep:defmt"] -_test = [] +_test = ["futures"] diff --git a/README.md b/README.md index 9e62d2d..be75310 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,11 @@ When using peek_many, you can look at all data from oldest to newest. aid with shutdown/cancellation issues. - When the state is corrupted, many issues can now be repaired with the repair functions in the map and queue modules - Made changes to the entire to better survive shutoffs +- *Breaking* Convert API to async first supporting the traits from embedded-storage-async. Flash + drivers supporting `sequential-storage` can be wrapped using + [BlockingAsync](https://docs.embassy.dev/embassy-embedded-hal/git/default/adapter/struct.BlockingAsync.html), and a + simple [blocking executor](https://docs.rs/futures/0.3.30/futures/executor/fn.block_on.html) can be used to call the + API from a non-async function. ### 0.6.2 - 22-12-23 diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index c83a249..48cd8d5 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -13,6 +13,7 @@ sequential-storage = { path = "..", features = ["_test"] } arbitrary = { version = "1.2.2", features = ["derive"] } rand = "0.8.5" rand_pcg = "0.3.1" +futures = { version = "0.3.30", features = ["executor"] } # Prevent this from interfering with workspaces [workspace] diff --git a/fuzz/fuzz_targets/map.rs b/fuzz/fuzz_targets/map.rs index ce8d211..c5d522c 100644 --- a/fuzz/fuzz_targets/map.rs +++ b/fuzz/fuzz_targets/map.rs @@ -1,5 +1,6 @@ #![no_main] +use futures::executor::block_on; use libfuzzer_sys::arbitrary::Arbitrary; use libfuzzer_sys::fuzz_target; use rand::SeedableRng; @@ -117,12 +118,12 @@ fn fuzz(ops: Input) { match op.clone() { Op::Store(op) => { let item = op.into_test_item(&mut rng); - match sequential_storage::map::store_item( + match block_on(sequential_storage::map::store_item( &mut flash, FLASH_RANGE, &mut buf, item.clone(), - ) { + )) { Ok(_) => { map.insert(item.key, item.value); } @@ -131,12 +132,12 @@ fn fuzz(ops: Input) { value: MockFlashError::EarlyShutoff(_), backtrace: _backtrace, }) => { - match sequential_storage::map::fetch_item::( + match block_on(sequential_storage::map::fetch_item::( &mut flash, FLASH_RANGE, &mut buf, item.key, - ) { + )) { Ok(Some(check_item)) if check_item.key == item.key && check_item.value == item.value => @@ -161,11 +162,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while storing! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::map::try_repair::( + block_on(sequential_storage::map::try_repair::( &mut flash, FLASH_RANGE, &mut buf, - ) + )) .unwrap(); corruption_repaired = true; retry = true; @@ -174,12 +175,12 @@ fn fuzz(ops: Input) { } } Op::Fetch(key) => { - match sequential_storage::map::fetch_item::( + match block_on(sequential_storage::map::fetch_item::( &mut flash, FLASH_RANGE, &mut buf, key, - ) { + )) { Ok(Some(fetch_result)) => { let map_value = map .get(&key) @@ -207,11 +208,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while fetching! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::map::try_repair::( + block_on(sequential_storage::map::try_repair::( &mut flash, FLASH_RANGE, &mut buf, - ) + )) .unwrap(); corruption_repaired = true; retry = true; diff --git a/fuzz/fuzz_targets/queue.rs b/fuzz/fuzz_targets/queue.rs index 72a3874..5905c8c 100644 --- a/fuzz/fuzz_targets/queue.rs +++ b/fuzz/fuzz_targets/queue.rs @@ -1,5 +1,6 @@ #![no_main] +use futures::executor::block_on; use libfuzzer_sys::arbitrary::Arbitrary; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -33,6 +34,9 @@ struct PushOp { value_len: u8, } +#[repr(align(4))] +struct AlignedBuf([u8; MAX_VALUE_SIZE + 1]); + fn fuzz(ops: Input) { const PAGES: usize = 4; const WORD_SIZE: usize = 4; @@ -45,7 +49,7 @@ fn fuzz(ops: Input) { const FLASH_RANGE: Range = 0x000..0x1000; let mut order = VecDeque::new(); - let mut buf = [0; MAX_VALUE_SIZE + 1]; + let mut buf = AlignedBuf([0; MAX_VALUE_SIZE + 1]); let mut rng = rand_pcg::Pcg32::seed_from_u64(ops.seed); @@ -66,10 +70,10 @@ fn fuzz(ops: Input) { Op::Push(op) => { let val: Vec = (0..op.value_len as usize % 16).map(|_| rng.gen()).collect(); - let max_fit = match sequential_storage::queue::find_max_fit( + let max_fit = match block_on(sequential_storage::queue::find_max_fit( &mut flash, FLASH_RANGE, - ) { + )) { Ok(val) => val, Err(Error::Corrupted { backtrace: _backtrace, @@ -79,7 +83,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while finding max fit! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; continue; @@ -87,7 +95,13 @@ fn fuzz(ops: Input) { Err(e) => panic!("Error while finding max fit: {e:?}"), }; - match sequential_storage::queue::push(&mut flash, FLASH_RANGE, &val, false) { + buf.0[..val.len()].copy_from_slice(&val); + match block_on(sequential_storage::queue::push( + &mut flash, + FLASH_RANGE, + &buf.0[..val.len()], + false, + )) { Ok(_) => { if let Some(max_fit) = max_fit { if val.len() > max_fit as usize { @@ -128,7 +142,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while pushing! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; } @@ -136,7 +154,11 @@ fn fuzz(ops: Input) { } } Op::Pop => { - match sequential_storage::queue::pop(&mut flash, FLASH_RANGE, &mut buf) { + match block_on(sequential_storage::queue::pop( + &mut flash, + FLASH_RANGE, + &mut buf.0, + )) { Ok(value) => { assert_eq!( value, @@ -168,7 +190,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while popping (single)! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; } @@ -176,10 +202,10 @@ fn fuzz(ops: Input) { } } Op::PopMany(n) => { - let mut popper = match sequential_storage::queue::pop_many( + let mut popper = match block_on(sequential_storage::queue::pop_many( &mut flash, FLASH_RANGE, - ) { + )) { Ok(val) => val, Err(Error::Corrupted { backtrace: _backtrace, @@ -189,7 +215,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while creating popper! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; continue; @@ -198,7 +228,7 @@ fn fuzz(ops: Input) { }; for i in 0..*n { - match popper.next(&mut buf) { + match block_on(popper.next(&mut buf.0)) { Ok(value) => { assert_eq!( value, @@ -234,8 +264,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while popping (many)! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE) - .unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; *n -= i; @@ -246,7 +279,11 @@ fn fuzz(ops: Input) { } } Op::Peek => { - match sequential_storage::queue::peek(&mut flash, FLASH_RANGE, &mut buf) { + match block_on(sequential_storage::queue::peek( + &mut flash, + FLASH_RANGE, + &mut buf.0, + )) { Ok(value) => { assert_eq!( value.map(|b| &b[..]), @@ -261,7 +298,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while peeking (single)! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; } @@ -269,10 +310,10 @@ fn fuzz(ops: Input) { } } Op::PeekMany(n) => { - let mut peeker = match sequential_storage::queue::peek_many( + let mut peeker = match block_on(sequential_storage::queue::peek_many( &mut flash, FLASH_RANGE, - ) { + )) { Ok(val) => val, Err(Error::Corrupted { backtrace: _backtrace, @@ -282,7 +323,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while creating peeker! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; continue; @@ -291,7 +336,7 @@ fn fuzz(ops: Input) { }; for i in 0..*n { - match peeker.next(&mut buf) { + match block_on(peeker.next(&mut buf.0)) { Ok(value) => { assert_eq!( value.map(|b| &b[..]), @@ -309,8 +354,11 @@ fn fuzz(ops: Input) { "### Encountered curruption while peeking (many)! Repairing now. Originated from:\n{_backtrace:#}" ); - sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE) - .unwrap(); + block_on(sequential_storage::queue::try_repair( + &mut flash, + FLASH_RANGE, + )) + .unwrap(); corruption_repaired = true; retry = true; *n -= i; diff --git a/src/item.rs b/src/item.rs index 60d3a5d..dfcf381 100644 --- a/src/item.rs +++ b/src/item.rs @@ -25,12 +25,12 @@ use core::num::NonZeroU32; use core::ops::ControlFlow; use core::ops::Range; -use embedded_storage::nor_flash::{MultiwriteNorFlash, NorFlash}; +use embedded_storage_async::nor_flash::{MultiwriteNorFlash, NorFlash}; use crate::{ calculate_page_address, calculate_page_end_address, get_page_state, round_down_to_alignment, - round_down_to_alignment_usize, round_up_to_alignment, round_up_to_alignment_usize, Error, - NorFlashExt, PageState, MAX_WORD_SIZE, + round_down_to_alignment_usize, round_up_to_alignment, round_up_to_alignment_usize, AlignedBuf, + Error, NorFlashExt, PageState, MAX_WORD_SIZE, }; #[derive(Debug)] @@ -47,7 +47,7 @@ impl ItemHeader { const LENGTH_FIELD: Range = 4..6; const LENGTH_CRC_FIELD: Range = 6..8; - pub fn read_new( + pub async fn read_new( flash: &mut S, address: u32, end_address: u32, @@ -61,6 +61,7 @@ impl ItemHeader { flash .read(address, header_slice) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -94,7 +95,7 @@ impl ItemHeader { })) } - pub fn read_item<'d, S: NorFlash>( + pub async fn read_item<'d, S: NorFlash>( self, flash: &mut S, data_buffer: &'d mut [u8], @@ -115,6 +116,7 @@ impl ItemHeader { flash .read(data_address, &mut data_buffer[..read_len]) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -136,20 +138,21 @@ impl ItemHeader { } } - fn write(&self, flash: &mut S, address: u32) -> Result<(), Error> { - let mut buffer = [0xFF; MAX_WORD_SIZE]; + async fn write(&self, flash: &mut S, address: u32) -> Result<(), Error> { + let mut buffer = AlignedBuf([0xFF; MAX_WORD_SIZE]); - buffer[Self::DATA_CRC_FIELD] + buffer.0[Self::DATA_CRC_FIELD] .copy_from_slice(&self.crc.map(|crc| crc.get()).unwrap_or(0).to_le_bytes()); - buffer[Self::LENGTH_FIELD].copy_from_slice(&self.length.to_le_bytes()); - buffer[Self::LENGTH_CRC_FIELD] + buffer.0[Self::LENGTH_FIELD].copy_from_slice(&self.length.to_le_bytes()); + buffer.0[Self::LENGTH_CRC_FIELD] .copy_from_slice(&crc16(&self.length.to_le_bytes()).to_le_bytes()); flash .write( address, - &buffer[..round_up_to_alignment_usize::(Self::LENGTH)], + &buffer.0[..round_up_to_alignment_usize::(Self::LENGTH)], ) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -157,13 +160,13 @@ impl ItemHeader { }) } - pub fn erase_data( + pub async fn erase_data( mut self, flash: &mut S, address: u32, ) -> Result> { self.crc = None; - self.write(flash, address)?; + self.write(flash, address).await?; Ok(self) } @@ -201,7 +204,7 @@ impl<'d> Item<'d> { (self.header, self.data_buffer) } - pub fn write_new( + pub async fn write_new( flash: &mut S, address: u32, data: &'d [u8], @@ -211,24 +214,25 @@ impl<'d> Item<'d> { crc: Some(adapted_crc32(data)), }; - Self::write_raw(&header, data, flash, address)?; + Self::write_raw(&header, data, flash, address).await?; Ok(header) } - fn write_raw( + async fn write_raw( header: &ItemHeader, data: &[u8], flash: &mut S, address: u32, ) -> Result<(), Error> { - header.write(flash, address)?; + header.write(flash, address).await?; let (data_block, data_left) = data.split_at(round_down_to_alignment_usize::(data.len())); let data_address = ItemHeader::data_address::(address); flash .write(data_address, data_block) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -236,13 +240,14 @@ impl<'d> Item<'d> { })?; if !data_left.is_empty() { - let mut buffer = [0; MAX_WORD_SIZE]; - buffer[..data_left.len()].copy_from_slice(data_left); + let mut buffer = AlignedBuf([0; MAX_WORD_SIZE]); + buffer.0[..data_left.len()].copy_from_slice(data_left); flash .write( data_address + data_block.len() as u32, - &buffer[..round_up_to_alignment_usize::(data_left.len())], + &buffer.0[..round_up_to_alignment_usize::(data_left.len())], ) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -253,8 +258,12 @@ impl<'d> Item<'d> { Ok(()) } - pub fn write(&self, flash: &mut S, address: u32) -> Result<(), Error> { - Self::write_raw(&self.header, self.data(), flash, address) + pub async fn write( + &self, + flash: &mut S, + address: u32, + ) -> Result<(), Error> { + Self::write_raw(&self.header, self.data(), flash, address).await } } @@ -270,95 +279,21 @@ impl<'d> core::fmt::Debug for Item<'d> { } } -/// Reads all item headers between the start and end address. -/// -/// The callback is called with the flash, the found header and the address at which the item starts. -/// The callback can return break with a value to stop the iteration. That value is then returned by this function. -/// -/// The return value also includes the next item address. -pub fn read_item_headers( - flash: &mut S, - start_address: u32, - end_address: u32, - mut callback: impl FnMut(&mut S, ItemHeader, u32) -> ControlFlow, -) -> Result<(Option, u32), Error> { - let mut current_address = start_address; - - loop { - if current_address >= end_address { - return Ok((None, current_address)); - } - - match ItemHeader::read_new(flash, current_address, end_address) { - Ok(Some(header)) => { - let next_address = header.next_item_address::(current_address); - - match callback(flash, header, current_address) { - ControlFlow::Continue(_) => {} - ControlFlow::Break(r) => return Ok((Some(r), next_address)), - } - - current_address = next_address; - } - Ok(None) => return Ok((None, current_address)), - Err(Error::Corrupted { .. }) => { - current_address = ItemHeader::data_address::(current_address); - } - Err(e) => return Err(e), - }; - } -} - -pub fn read_items( - flash: &mut S, - start_address: u32, - end_address: u32, - data_buffer: &mut [u8], - mut callback: impl FnMut(&mut S, Item<'_>, u32) -> ControlFlow, -) -> Result, Error> { - read_item_headers( - flash, - start_address, - end_address, - |flash, header, address| match header.read_item(flash, data_buffer, address, end_address) { - Ok(MaybeItem::Corrupted(_, _)) => { - #[cfg(feature = "defmt")] - defmt::error!( - "Found a corrupted item at {:X}. Skipping...", - current_address - ); - ControlFlow::Continue(()) - } - Ok(MaybeItem::Erased(_)) => ControlFlow::Continue(()), - Ok(MaybeItem::Present(item)) => match callback(flash, item, address) { - ControlFlow::Continue(_) => ControlFlow::Continue(()), - ControlFlow::Break(r) => ControlFlow::Break(Ok(r)), - }, - Err(e) => ControlFlow::Break(Err(e)), - }, - )? - .0 - .transpose() -} - /// Scans through the items to find the first spot that is free to store a new item. /// /// - `end_address` is exclusive. -pub fn find_next_free_item_spot( +pub async fn find_next_free_item_spot( flash: &mut S, start_address: u32, end_address: u32, data_length: u32, ) -> Result, Error> { - let (_, free_item_address) = - read_item_headers(flash, start_address, end_address, |_, _, _| { - ControlFlow::<(), ()>::Continue(()) - })?; - + let mut it = HeaderIter::new(start_address, end_address); + let (_, free_item_address) = it.traverse(flash, |_, _| ControlFlow::Continue(())).await?; if let Some(available) = ItemHeader::available_data_bytes::(end_address - free_item_address) { if available >= data_length { - Ok(Some(free_item_address)) + return Ok(Some(free_item_address)); } else { Ok(None) } @@ -466,7 +401,7 @@ fn crc32_with_initial(data: &[u8], initial: u32) -> u32 { /// /// The page state can optionally be given if it's already known. /// In that case the state will not be checked again. -pub fn is_page_empty( +pub async fn is_page_empty( flash: &mut S, flash_range: Range, page_index: usize, @@ -474,7 +409,7 @@ pub fn is_page_empty( ) -> Result> { let page_state = match page_state { Some(page_state) => page_state, - None => get_page_state::(flash, flash_range.clone(), page_index)?, + None => get_page_state::(flash, flash_range.clone(), page_index).await?, }; match page_state { @@ -485,23 +420,130 @@ pub fn is_page_empty( calculate_page_end_address::(flash_range.clone(), page_index) - S::WORD_SIZE as u32; - Ok(read_item_headers( - flash, - page_data_start_address, - page_data_end_address, - |_, header, _| match header.crc { + let mut it = HeaderIter::new(page_data_start_address, page_data_end_address); + Ok(it + .traverse(flash, |header, _| match header.crc { Some(_) => ControlFlow::Break(()), None => ControlFlow::Continue(()), - }, - )? - .0 - .is_none()) + }) + .await? + .0 + .is_none()) } PageState::PartialOpen => Ok(false), PageState::Open => Ok(true), } } +pub struct ItemIter { + header: HeaderIter, +} + +impl ItemIter { + pub fn new(start_address: u32, end_address: u32) -> Self { + Self { + header: HeaderIter::new(start_address, end_address), + } + } + + pub async fn next<'m, S: NorFlash>( + &mut self, + flash: &mut S, + data_buffer: &'m mut [u8], + ) -> Result, u32)>, Error> { + while let (Some(header), address) = self.header.next(flash).await? { + let data_buffer = unsafe { + core::slice::from_raw_parts_mut(data_buffer.as_mut_ptr(), data_buffer.len()) + }; + + match header + .read_item(flash, data_buffer, address, self.header.end_address) + .await? + { + MaybeItem::Corrupted { .. } => { + #[cfg(feature = "defmt")] + defmt::error!( + "Found a corrupted item at {:X}. Skipping...", + self.header.current_address + ); + continue; + } + MaybeItem::Erased(_) => {} + MaybeItem::Present(item) => { + return Ok(Some((item, address))); + } + } + } + Ok(None) + } +} + +pub struct HeaderIter { + current_address: u32, + end_address: u32, +} + +impl HeaderIter { + pub fn new(start_address: u32, end_address: u32) -> Self { + Self { + current_address: start_address, + end_address, + } + } + + /// Fetch next item + pub async fn next( + &mut self, + flash: &mut S, + ) -> Result<(Option, u32), Error> { + self.traverse(flash, |_, _| ControlFlow::Break(())).await + } + + /// Traverse all headers until the callback determines the next element. + /// + /// If the end of the headers is reached, a `None` item header is returned. + pub async fn traverse( + &mut self, + flash: &mut S, + callback: impl Fn(&ItemHeader, u32) -> ControlFlow<(), ()>, + ) -> Result<(Option, u32), Error> { + if self.current_address >= self.end_address { + return Ok((None, self.current_address)); + } + + loop { + match ItemHeader::read_new(flash, self.current_address, self.end_address).await { + Ok(Some(header)) => { + let next_address = header.next_item_address::(self.current_address); + match callback(&header, self.current_address) { + ControlFlow::Continue(_) => { + self.current_address = next_address; + continue; + } + ControlFlow::Break(_) => { + let current_address = self.current_address; + self.current_address = next_address; + return Ok((Some(header), current_address)); + } + } + } + Ok(None) => { + return Ok((None, self.current_address)); + } + Err(Error::Corrupted { .. }) => { + #[cfg(feature = "defmt")] + defmt::error!( + "Found a corrupted item header at {:X}. Skipping...", + self.current_address + ); + self.current_address += S::WORD_SIZE as u32; + } + Err(e) => return Err(e), + } + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/lib.rs b/src/lib.rs index 2de71bc..2faddc2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ use core::{ fmt::Debug, ops::{ControlFlow, Range}, }; -use embedded_storage::nor_flash::NorFlash; +use embedded_storage_async::nor_flash::NorFlash; mod item; pub mod map; @@ -27,7 +27,7 @@ pub mod mock_flash; /// Many flashes have 4-byte or 1-byte words. const MAX_WORD_SIZE: usize = 32; -fn try_general_repair( +async fn try_general_repair( flash: &mut S, flash_range: Range, ) -> Result<(), Error> { @@ -35,7 +35,7 @@ fn try_general_repair( // the page is likely half-erased. Fix for that is to re-erase again to hopefully finish the job. for page_index in get_pages::(flash_range.clone(), 0) { if matches!( - get_page_state(flash, flash_range.clone(), page_index), + get_page_state(flash, flash_range.clone(), page_index).await, Err(Error::Corrupted { .. }) ) { flash @@ -43,6 +43,7 @@ fn try_general_repair( calculate_page_address::(flash_range.clone(), page_index), calculate_page_end_address::(flash_range.clone(), page_index), ) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -54,17 +55,21 @@ fn try_general_repair( Ok(()) } +/// Align the given buffer to the word size of the flash. +#[repr(align(4))] +struct AlignedBuf([u8; MAX_WORD_SIZE]); + /// Find the first page that is in the given page state. /// /// The search starts at starting_page_index (and wraps around back to 0 if required) -fn find_first_page( +async fn find_first_page( flash: &mut S, flash_range: Range, starting_page_index: usize, page_state: PageState, ) -> Result, Error> { for page_index in get_pages::(flash_range.clone(), starting_page_index) { - if page_state == get_page_state::(flash, flash_range.clone(), page_index)? { + if page_state == get_page_state::(flash, flash_range.clone(), page_index).await? { return Ok(Some(page_index)); } } @@ -121,7 +126,7 @@ const fn calculate_page_index(flash_range: Range, address: u32 const MARKER: u8 = 0; /// Get the state of the page located at the given index -fn get_page_state( +async fn get_page_state( flash: &mut S, flash_range: Range, page_index: usize, @@ -135,6 +140,7 @@ fn get_page_state( let mut buffer = [0; MAX_WORD_SIZE]; flash .read(page_address, &mut buffer[..S::READ_SIZE]) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -151,6 +157,7 @@ fn get_page_state( page_address + (S::ERASE_SIZE - S::READ_SIZE) as u32, &mut buffer[..S::READ_SIZE], ) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -175,24 +182,25 @@ fn get_page_state( } /// Fully closes a page by writing both the start and end marker -fn close_page( +async fn close_page( flash: &mut S, flash_range: Range, page_index: usize, ) -> Result<(), Error> { - let current_state = partial_close_page::(flash, flash_range.clone(), page_index)?; + let current_state = partial_close_page::(flash, flash_range.clone(), page_index).await?; if current_state != PageState::PartialOpen { return Ok(()); } - let buffer = [MARKER; MAX_WORD_SIZE]; + let buffer = AlignedBuf([MARKER; MAX_WORD_SIZE]); // Close the end marker flash .write( calculate_page_end_address::(flash_range, page_index) - S::WORD_SIZE as u32, - &buffer[..S::WORD_SIZE], + &buffer.0[..S::WORD_SIZE], ) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -203,24 +211,25 @@ fn close_page( } /// Partially close a page by writing the start marker -fn partial_close_page( +async fn partial_close_page( flash: &mut S, flash_range: Range, page_index: usize, ) -> Result> { - let current_state = get_page_state::(flash, flash_range.clone(), page_index)?; + let current_state = get_page_state::(flash, flash_range.clone(), page_index).await?; if current_state != PageState::Open { return Ok(current_state); } - let buffer = [MARKER; MAX_WORD_SIZE]; + let buffer = AlignedBuf([MARKER; MAX_WORD_SIZE]); // Close the start marker flash .write( calculate_page_address::(flash_range, page_index), - &buffer[..S::WORD_SIZE], + &buffer.0[..S::WORD_SIZE], ) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] @@ -377,6 +386,7 @@ where #[cfg(test)] mod tests { use super::*; + use futures::executor::block_on; type MockFlash = mock_flash::MockFlashBase<4, 4, 64>; @@ -390,57 +400,123 @@ mod tests { let mut flash = MockFlash::default(); // Page 0 markers - flash.write(0x000, &[MARKER, 0, 0, 0]).unwrap(); - flash.write(0x100 - 4, &[0, 0, 0, MARKER]).unwrap(); + block_on(flash.write_aligned::<256>(0x000, &[MARKER, 0, 0, 0])).unwrap(); + block_on(flash.write_aligned::<256>(0x100 - 4, &[0, 0, 0, MARKER])).unwrap(); // Page 1 markers - flash.write(0x100, &[MARKER, 0, 0, 0]).unwrap(); - flash.write(0x200 - 4, &[0, 0, 0, MARKER]).unwrap(); + block_on(flash.write_aligned::<256>(0x100, &[MARKER, 0, 0, 0])).unwrap(); + block_on(flash.write_aligned::<256>(0x200 - 4, &[0, 0, 0, MARKER])).unwrap(); // Page 2 markers - flash.write(0x200, &[MARKER, 0, 0, 0]).unwrap(); + block_on(flash.write_aligned::<256>(0x200, &[MARKER, 0, 0, 0])).unwrap(); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 0, PageState::Open).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 0, + PageState::Open + )) + .unwrap(), Some(3) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 0, PageState::PartialOpen).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 0, + PageState::PartialOpen + )) + .unwrap(), Some(2) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 1, PageState::PartialOpen).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 1, + PageState::PartialOpen + )) + .unwrap(), Some(2) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 2, PageState::PartialOpen).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 2, + PageState::PartialOpen + )) + .unwrap(), Some(2) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 3, PageState::Open).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 3, + PageState::Open + )) + .unwrap(), Some(3) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x200, 0, PageState::PartialOpen).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x200, + 0, + PageState::PartialOpen + )) + .unwrap(), None ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 0, PageState::Closed).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 0, + PageState::Closed + )) + .unwrap(), Some(0) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 1, PageState::Closed).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 1, + PageState::Closed + )) + .unwrap(), Some(1) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 2, PageState::Closed).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 2, + PageState::Closed + )) + .unwrap(), Some(0) ); assert_eq!( - find_first_page(&mut flash, 0x000..0x400, 3, PageState::Closed).unwrap(), + block_on(find_first_page( + &mut flash, + 0x000..0x400, + 3, + PageState::Closed + )) + .unwrap(), Some(0) ); assert_eq!( - find_first_page(&mut flash, 0x200..0x400, 0, PageState::Closed).unwrap(), + block_on(find_first_page( + &mut flash, + 0x200..0x400, + 0, + PageState::Closed + )) + .unwrap(), None ); } diff --git a/src/map.rs b/src/map.rs index 27ac52b..5f6bb86 100644 --- a/src/map.rs +++ b/src/map.rs @@ -10,6 +10,7 @@ //! ```rust //! # use sequential_storage::map::{store_item, fetch_item, StorageItem}; //! # use mock_flash::MockFlashBase; +//! # use futures::executor::block_on; //! # type Flash = MockFlashBase<10, 1, 4096>; //! # mod mock_flash { //! # include!("mock_flash.rs"); @@ -73,40 +74,38 @@ //! // Nothing is stored in it yet, so it will return None. //! //! assert_eq!( -//! fetch_item::( +//! block_on(fetch_item::( //! &mut flash, //! flash_range.clone(), //! &mut data_buffer, //! 42, -//! ).unwrap(), +//! )).unwrap(), //! None //! ); //! //! // Now we store an item the flash with key 42 //! -//! store_item::( +//! block_on(store_item::( //! &mut flash, //! flash_range.clone(), //! &mut data_buffer, //! MyCustomType { key: 42, data: 104729 }, -//! ).unwrap(); +//! )).unwrap(); //! //! // When we ask for key 42, we not get back a Some with the correct value //! //! assert_eq!( -//! fetch_item::( +//! block_on(fetch_item::( //! &mut flash, //! flash_range.clone(), //! &mut data_buffer, //! 42, -//! ).unwrap(), +//! )).unwrap(), //! Some(MyCustomType { key: 42, data: 104729 }) //! ); //! ``` -use core::ops::ControlFlow; - -use crate::item::{find_next_free_item_spot, read_items, Item, ItemHeader}; +use crate::item::{find_next_free_item_spot, Item, ItemHeader, ItemIter}; use super::*; @@ -120,21 +119,22 @@ use super::*; /// /// *Note: On a given flash range, make sure to use only the same type as [StorageItem] every time /// or types that serialize and deserialize the key in the same way.* -pub fn fetch_item( +pub async fn fetch_item( flash: &mut S, flash_range: Range, data_buffer: &mut [u8], search_key: I::Key, ) -> Result, MapError> { Ok( - fetch_item_with_location(flash, flash_range, data_buffer, search_key)? + fetch_item_with_location(flash, flash_range, data_buffer, search_key) + .await? .map(|(item, _, _)| item), ) } /// Fetch the item, but with the address and header #[allow(clippy::type_complexity)] -fn fetch_item_with_location( +async fn fetch_item_with_location( flash: &mut S, flash_range: Range, data_buffer: &mut [u8], @@ -149,7 +149,7 @@ fn fetch_item_with_location( // We need to find the page we were last using. This should be the only partial open page. let mut last_used_page = - find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen)?; + find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen).await?; #[cfg(feature = "defmt")] defmt::trace!("Fetch item, last used page: {}", last_used_page); @@ -158,10 +158,13 @@ fn fetch_item_with_location( // In the event that all pages are still open or the last used page was just closed, we search for the first open page. // If the page one before that is closed, then that's the last used page. if let Some(first_open_page) = - find_first_page(flash, flash_range.clone(), 0, PageState::Open)? + find_first_page(flash, flash_range.clone(), 0, PageState::Open).await? { let previous_page = previous_page::(flash_range.clone(), first_open_page); - if get_page_state(flash, flash_range.clone(), previous_page)?.is_closed() { + if get_page_state(flash, flash_range.clone(), previous_page) + .await? + .is_closed() + { last_used_page = Some(previous_page); } else { // The page before the open page is not closed, so it must be open. @@ -193,30 +196,15 @@ fn fetch_item_with_location( calculate_page_end_address::(flash_range.clone(), current_page_to_check) - S::WORD_SIZE as u32; - if let Some(e) = read_items( - flash, - page_data_start_address, - page_data_end_address, - data_buffer, - |_, item, address| { - if I::deserialize_key_only(item.data()) - .map_err(MapError::Item) - .to_controlflow()? - == search_key - { - newest_found_item = Some(( - I::deserialize_from(item.data()) - .map_err(MapError::Item) - .to_controlflow()?, - address, - item.header, - )); - } - - ControlFlow::, ()>::Continue(()) - }, - )? { - return Err(e); + let mut it = ItemIter::new(page_data_start_address, page_data_end_address); + while let Some((item, address)) = it.next(flash, data_buffer).await? { + if I::deserialize_key_only(item.data()).map_err(MapError::Item)? == search_key { + newest_found_item = Some(( + I::deserialize_from(item.data()).map_err(MapError::Item)?, + address, + item.header, + )); + } } // We've found the item! We can stop searching @@ -227,7 +215,7 @@ fn fetch_item_with_location( // We have not found the item. We've got to look in the previous page, but only if that page is closed and contains data. let previous_page = previous_page::(flash_range.clone(), current_page_to_check); - if get_page_state(flash, flash_range.clone(), previous_page)? != PageState::Closed { + if get_page_state(flash, flash_range.clone(), previous_page).await? != PageState::Closed { // We've looked through all the pages with data and couldn't find the item return Ok(None); } @@ -246,7 +234,7 @@ fn fetch_item_with_location( /// /// *Note: On a given flash range, make sure to use only the same type as [StorageItem] every time /// or types that serialize and deserialize the key in the same way.* -pub fn store_item( +pub async fn store_item( flash: &mut S, flash_range: Range, data_buffer: &mut [u8], @@ -260,15 +248,8 @@ pub fn store_item( assert!(S::ERASE_SIZE >= S::WORD_SIZE * 3); assert!(S::WORD_SIZE <= MAX_WORD_SIZE); - return store_item_inner::(flash, flash_range, data_buffer, item, 0); - - fn store_item_inner( - flash: &mut S, - flash_range: Range, - data_buffer: &mut [u8], - item: I, - recursion_level: usize, - ) -> Result<(), MapError> { + let mut recursion_level = 0; + loop { #[cfg(feature = "defmt")] defmt::trace!("Store item inner. Recursion: {}", recursion_level); @@ -279,7 +260,7 @@ pub fn store_item( // If there is a partial open page, we try to write in that first if there is enough space let next_page_to_use = if let Some(partial_open_page) = - find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen)? + find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen).await? { #[cfg(feature = "defmt")] defmt::trace!("Partial open page found: {}", partial_open_page); @@ -289,7 +270,8 @@ pub fn store_item( flash, flash_range.clone(), next_page::(flash_range.clone(), partial_open_page), - )? + ) + .await? .is_open() { // Oh oh, the next page which serves as the buffer page is not open. We're corrupt. @@ -318,11 +300,13 @@ pub fn store_item( page_data_start_address, page_data_end_address, item_data_length as u32, - )?; + ) + .await?; match free_spot_address { Some(free_spot_address) => { - Item::write_new(flash, free_spot_address, &data_buffer[..item_data_length])?; + Item::write_new(flash, free_spot_address, &data_buffer[..item_data_length]) + .await?; #[cfg(feature = "defmt")] defmt::trace!("Item has been written ok"); @@ -337,7 +321,7 @@ pub fn store_item( ); // The item doesn't fit here, so we need to close this page and move to the next - close_page(flash, flash_range.clone(), partial_open_page)?; + close_page(flash, flash_range.clone(), partial_open_page).await?; Some(next_page::(flash_range.clone(), partial_open_page)) } } @@ -355,7 +339,8 @@ pub fn store_item( match next_page_to_use { Some(next_page_to_use) => { - let next_page_state = get_page_state(flash, flash_range.clone(), next_page_to_use)?; + let next_page_state = + get_page_state(flash, flash_range.clone(), next_page_to_use).await?; if !next_page_state.is_open() { // What was the previous buffer page was not open... @@ -368,11 +353,11 @@ pub fn store_item( // Since we're gonna write data here, let's already partially close the page // This could be done after moving the data, but this is more robust in the // face of shutdowns and cancellations - partial_close_page(flash, flash_range.clone(), next_page_to_use)?; + partial_close_page(flash, flash_range.clone(), next_page_to_use).await?; let next_buffer_page = next_page::(flash_range.clone(), next_page_to_use); let next_buffer_page_state = - get_page_state(flash, flash_range.clone(), next_buffer_page)?; + get_page_state(flash, flash_range.clone(), next_buffer_page).await?; if !next_buffer_page_state.is_open() { migrate_items::( @@ -381,13 +366,14 @@ pub fn store_item( data_buffer, next_buffer_page, next_page_to_use, - )?; + ) + .await?; } } None => { // There's no partial open page, so we just gotta turn the first open page into a partial open one let first_open_page = - match find_first_page(flash, flash_range.clone(), 0, PageState::Open)? { + match find_first_page(flash, flash_range.clone(), 0, PageState::Open).await? { Some(first_open_page) => first_open_page, None => { #[cfg(feature = "defmt")] @@ -405,12 +391,12 @@ pub fn store_item( } }; - partial_close_page(flash, flash_range.clone(), first_open_page)?; + partial_close_page(flash, flash_range.clone(), first_open_page).await?; } } // If we get here, we just freshly partially closed a new page, so this should succeed - store_item_inner::(flash, flash_range, data_buffer, item, recursion_level + 1) + recursion_level += 1; } } @@ -519,7 +505,7 @@ impl PartialEq for MapError { } } -fn migrate_items( +async fn migrate_items( flash: &mut S, flash_range: Range, data_buffer: &mut [u8], @@ -532,47 +518,35 @@ fn migrate_items( let mut next_page_write_address = calculate_page_address::(flash_range.clone(), target_page) + S::WORD_SIZE as u32; - if let Some(e) = read_items( - flash, + let mut it = ItemIter::new( calculate_page_address::(flash_range.clone(), source_page) + S::WORD_SIZE as u32, calculate_page_end_address::(flash_range.clone(), source_page) - S::WORD_SIZE as u32, - data_buffer, - |flash, item, item_address| { - let key = I::deserialize_key_only(item.data()) - .map_err(MapError::Item) - .to_controlflow()?; - let (item_header, data_buffer) = item.destruct(); - - // Search for the newest item with the key we found - let Some((_, found_address, _)) = - fetch_item_with_location::(flash, flash_range.clone(), data_buffer, key) - .to_controlflow()? - else { - // We couldn't even find our own item? - return ControlFlow::Break(MapError::Corrupted { - #[cfg(feature = "_test")] - backtrace: std::backtrace::Backtrace::capture(), - }); - }; - - if found_address == item_address { - // The newest item with this key is the item we're about to erase - // This means we need to copy it over to the next_page_to_use - let item = item_header - .read_item(flash, data_buffer, item_address, u32::MAX) - .to_controlflow()? - .unwrap() - .to_controlflow()?; - item.write(flash, next_page_write_address) - .to_controlflow()?; - next_page_write_address = - item.header.next_item_address::(next_page_write_address); - } + ); + while let Some((item, item_address)) = it.next(flash, data_buffer).await? { + let key = I::deserialize_key_only(item.data()).map_err(MapError::Item)?; + let (item_header, data_buffer) = item.destruct(); + + // Search for the newest item with the key we found + let Some((_, found_address, _)) = + fetch_item_with_location::(flash, flash_range.clone(), data_buffer, key).await? + else { + // We couldn't even find our own item? + return Err(MapError::Corrupted { + #[cfg(feature = "_test")] + backtrace: std::backtrace::Backtrace::capture(), + }); + }; - ControlFlow::, ()>::Continue(()) - }, - )? { - return Err(e); + if found_address == item_address { + // The newest item with this key is the item we're about to erase + // This means we need to copy it over to the next_page_to_use + let item = item_header + .read_item(flash, data_buffer, item_address, u32::MAX) + .await? + .unwrap()?; + item.write(flash, next_page_write_address).await?; + next_page_write_address = item.header.next_item_address::(next_page_write_address); + } } flash @@ -580,6 +554,7 @@ fn migrate_items( calculate_page_address::(flash_range.clone(), source_page), calculate_page_end_address::(flash_range.clone(), source_page), ) + .await .map_err(|e| MapError::Storage { value: e, #[cfg(feature = "_test")] @@ -599,19 +574,22 @@ fn migrate_items( /// If this function or the function call after this crate returns [Error::Corrupted], then it's unlikely /// that the state can be recovered. To at least make everything function again at the cost of losing the data, /// erase the flash range. -pub fn try_repair( +pub async fn try_repair( flash: &mut S, flash_range: Range, data_buffer: &mut [u8], ) -> Result<(), MapError> { - crate::try_general_repair(flash, flash_range.clone())?; + crate::try_general_repair(flash, flash_range.clone()).await?; // Let's check if we corrupted in the middle of a migration if let Some(partial_open_page) = - find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen)? + find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen).await? { let buffer_page = next_page::(flash_range.clone(), partial_open_page); - if !get_page_state(flash, flash_range.clone(), buffer_page)?.is_open() { + if !get_page_state(flash, flash_range.clone(), buffer_page) + .await? + .is_open() + { // Yes, the migration got interrupted. Let's redo it. // To do that, we erase the partial open page first because it contains incomplete data. flash @@ -619,6 +597,7 @@ pub fn try_repair( calculate_page_address::(flash_range.clone(), partial_open_page), calculate_page_end_address::(flash_range.clone(), partial_open_page), ) + .await .map_err(|e| MapError::Storage { value: e, #[cfg(feature = "_test")] @@ -626,7 +605,7 @@ pub fn try_repair( })?; // Then partially close it again - partial_close_page(flash, flash_range.clone(), partial_open_page)?; + partial_close_page(flash, flash_range.clone(), partial_open_page).await?; migrate_items::( flash, @@ -634,7 +613,8 @@ pub fn try_repair( data_buffer, buffer_page, partial_open_page, - )?; + ) + .await?; } } @@ -644,6 +624,7 @@ pub fn try_repair( #[cfg(test)] mod tests { use super::*; + use futures::executor::block_on; type MockFlashBig = mock_flash::MockFlashBase<4, 4, 256>; type MockFlashTiny = mock_flash::MockFlashBase<2, 1, 32>; @@ -723,26 +704,34 @@ mod tests { let mut data_buffer = [0; 128]; - let item = - fetch_item::(&mut flash, flash_range.clone(), &mut data_buffer, 0) - .unwrap(); + let item = block_on(fetch_item::( + &mut flash, + flash_range.clone(), + &mut data_buffer, + 0, + )) + .unwrap(); assert_eq!(item, None); - let item = - fetch_item::(&mut flash, flash_range.clone(), &mut data_buffer, 60) - .unwrap(); + let item = block_on(fetch_item::( + &mut flash, + flash_range.clone(), + &mut data_buffer, + 60, + )) + .unwrap(); assert_eq!(item, None); - let item = fetch_item::( + let item = block_on(fetch_item::( &mut flash, flash_range.clone(), &mut data_buffer, 0xFF, - ) + )) .unwrap(); assert_eq!(item, None); - store_item::<_, _>( + block_on(store_item::<_, _>( &mut flash, flash_range.clone(), &mut data_buffer, @@ -750,9 +739,9 @@ mod tests { key: 0, value: vec![5], }, - ) + )) .unwrap(); - store_item::<_, _>( + block_on(store_item::<_, _>( &mut flash, flash_range.clone(), &mut data_buffer, @@ -760,17 +749,21 @@ mod tests { key: 0, value: vec![5, 6], }, - ) + )) .unwrap(); - let item = - fetch_item::(&mut flash, flash_range.clone(), &mut data_buffer, 0) - .unwrap() - .unwrap(); + let item = block_on(fetch_item::( + &mut flash, + flash_range.clone(), + &mut data_buffer, + 0, + )) + .unwrap() + .unwrap(); assert_eq!(item.key, 0); assert_eq!(item.value, vec![5, 6]); - store_item::<_, _>( + block_on(store_item::<_, _>( &mut flash, flash_range.clone(), &mut data_buffer, @@ -778,25 +771,33 @@ mod tests { key: 1, value: vec![2, 2, 2, 2, 2, 2], }, - ) + )) .unwrap(); - let item = - fetch_item::(&mut flash, flash_range.clone(), &mut data_buffer, 0) - .unwrap() - .unwrap(); + let item = block_on(fetch_item::( + &mut flash, + flash_range.clone(), + &mut data_buffer, + 0, + )) + .unwrap() + .unwrap(); assert_eq!(item.key, 0); assert_eq!(item.value, vec![5, 6]); - let item = - fetch_item::(&mut flash, flash_range.clone(), &mut data_buffer, 1) - .unwrap() - .unwrap(); + let item = block_on(fetch_item::( + &mut flash, + flash_range.clone(), + &mut data_buffer, + 1, + )) + .unwrap() + .unwrap(); assert_eq!(item.key, 1); assert_eq!(item.value, vec![2, 2, 2, 2, 2, 2]); for index in 0..4000 { - store_item::<_, _>( + block_on(store_item::<_, _>( &mut flash, flash_range.clone(), &mut data_buffer, @@ -804,17 +805,17 @@ mod tests { key: (index % 10) as u8, value: vec![(index % 10) as u8 * 2; index % 10], }, - ) + )) .unwrap(); } for i in 0..10 { - let item = fetch_item::( + let item = block_on(fetch_item::( &mut flash, flash_range.clone(), &mut data_buffer, i, - ) + )) .unwrap() .unwrap(); assert_eq!(item.key, i); @@ -822,7 +823,7 @@ mod tests { } for _ in 0..4000 { - store_item::<_, _>( + block_on(store_item::<_, _>( &mut flash, flash_range.clone(), &mut data_buffer, @@ -830,17 +831,17 @@ mod tests { key: 11, value: vec![0; 10], }, - ) + )) .unwrap(); } for i in 0..10 { - let item = fetch_item::( + let item = block_on(fetch_item::( &mut flash, flash_range.clone(), &mut data_buffer, i, - ) + )) .unwrap() .unwrap(); assert_eq!(item.key, i); @@ -867,11 +868,17 @@ mod tests { }; println!("Storing {item:?}"); - store_item::<_, _>(&mut tiny_flash, 0x00..0x40, &mut data_buffer, item).unwrap(); + block_on(store_item::<_, _>( + &mut tiny_flash, + 0x00..0x40, + &mut data_buffer, + item, + )) + .unwrap(); } assert_eq!( - store_item::<_, _>( + block_on(store_item::<_, _>( &mut tiny_flash, 0x00..0x40, &mut data_buffer, @@ -879,17 +886,17 @@ mod tests { key: UPPER_BOUND, value: vec![0; UPPER_BOUND as usize], }, - ), + )), Err(MapError::FullStorage) ); for i in 0..UPPER_BOUND { - let item = fetch_item::( + let item = block_on(fetch_item::( &mut tiny_flash, 0x00..0x40, &mut data_buffer, i as u8, - ) + )) .unwrap() .unwrap(); @@ -913,11 +920,17 @@ mod tests { }; println!("Storing {item:?}"); - store_item::<_, _>(&mut big_flash, 0x0000..0x1000, &mut data_buffer, item).unwrap(); + block_on(store_item::<_, _>( + &mut big_flash, + 0x0000..0x1000, + &mut data_buffer, + item, + )) + .unwrap(); } assert_eq!( - store_item::<_, _>( + block_on(store_item::<_, _>( &mut big_flash, 0x0000..0x1000, &mut data_buffer, @@ -925,17 +938,17 @@ mod tests { key: UPPER_BOUND, value: vec![0; UPPER_BOUND as usize], }, - ), + )), Err(MapError::FullStorage) ); for i in 0..UPPER_BOUND { - let item = fetch_item::( + let item = block_on(fetch_item::( &mut big_flash, 0x0000..0x1000, &mut data_buffer, i as u8, - ) + )) .unwrap() .unwrap(); @@ -961,17 +974,23 @@ mod tests { value: vec![i as u8; LENGHT_PER_KEY[i]], }; - store_item::<_, _>(&mut flash, 0x0000..0x4000, &mut data_buffer, item).unwrap(); + block_on(store_item::<_, _>( + &mut flash, + 0x0000..0x4000, + &mut data_buffer, + item, + )) + .unwrap(); } } for i in 0..24 { - let item = fetch_item::( + let item = block_on(fetch_item::( &mut flash, 0x0000..0x4000, &mut data_buffer, i as u8, - ) + )) .unwrap() .unwrap(); diff --git a/src/mock_flash.rs b/src/mock_flash.rs index 1f51318..113c6f8 100644 --- a/src/mock_flash.rs +++ b/src/mock_flash.rs @@ -1,5 +1,5 @@ use core::ops::Range; -use embedded_storage::nor_flash::{ +use embedded_storage_async::nor_flash::{ ErrorType, MultiwriteNorFlash, NorFlash, NorFlashError, NorFlashErrorKind, ReadNorFlash, }; @@ -102,10 +102,24 @@ impl } } + pub(crate) async fn write_aligned( + &mut self, + offset: u32, + bytes: &[u8], + ) -> Result<(), MockFlashError> { + #[repr(align(4))] + struct AlignedBuf([u8; SIZE]); + + let mut buf = AlignedBuf([0; SIZE]); + buf.0[..bytes.len()].copy_from_slice(bytes); + self.write(offset, &buf.0[..bytes.len()]).await + } + #[cfg(feature = "_test")] /// Print all items in flash to the returned string pub fn print_items(&mut self) -> String { use crate::NorFlashExt; + use futures::executor::block_on; use std::fmt::Write; let mut buf = [0; 1024 * 16]; @@ -119,7 +133,11 @@ impl writeln!( s, " Page {page_index} ({}):", - match crate::get_page_state(self, Self::FULL_FLASH_RANGE, page_index) { + match block_on(crate::get_page_state( + self, + Self::FULL_FLASH_RANGE, + page_index + )) { Ok(value) => format!("{value:?}"), Err(e) => format!("Error ({e:?})"), } @@ -132,24 +150,20 @@ impl crate::calculate_page_end_address::(Self::FULL_FLASH_RANGE, page_index) - Self::WORD_SIZE as u32; - crate::item::read_item_headers( - self, - page_data_start, - page_data_end, - |flash, header, item_address| { - let next_item_address = header.next_item_address::(item_address); - let maybe_item = header - .read_item(flash, &mut buf, item_address, page_data_end) + let mut it = crate::item::HeaderIter::new(page_data_start, page_data_end); + while let (Some(header), item_address) = + block_on(it.traverse(self, |_, _| core::ops::ControlFlow::Break(()))).unwrap() + { + let next_item_address = header.next_item_address::(item_address); + let maybe_item = + block_on(header.read_item(self, &mut buf, item_address, page_data_end)) .unwrap(); - writeln!( - s, - " Item {maybe_item:?} at {item_address}..{next_item_address}" - ) - .unwrap(); - core::ops::ControlFlow::<(), ()>::Continue(()) - }, - ) - .unwrap(); + writeln!( + s, + " Item {maybe_item:?} at {item_address}..{next_item_address}" + ) + .unwrap(); + } } s @@ -162,7 +176,10 @@ impl /// - If true, the item is present and fine. /// - If false, the item is corrupt or erased. pub fn get_item_presence(&mut self, target_item_address: u32) -> Option { + use core::ops::ControlFlow; + use crate::NorFlashExt; + use futures::executor::block_on; if !Self::FULL_FLASH_RANGE.contains(&target_item_address) { return None; @@ -180,36 +197,32 @@ impl crate::calculate_page_end_address::(Self::FULL_FLASH_RANGE, page_index) - Self::WORD_SIZE as u32; - let found_item = crate::item::read_item_headers( - self, - page_data_start, - page_data_end, - |flash, header, item_address| { - let next_item_address = header.next_item_address::(item_address); + let mut found_item = None; + let mut it = crate::item::HeaderIter::new(page_data_start, page_data_end); + while let (Some(header), item_address) = + block_on(it.traverse(self, |_, _| ControlFlow::Break(()))).unwrap() + { + let next_item_address = header.next_item_address::(item_address); - if (item_address..next_item_address).contains(&target_item_address) { - let maybe_item = header - .read_item(flash, &mut buf, item_address, page_data_end) + if (item_address..next_item_address).contains(&target_item_address) { + let maybe_item = + block_on(header.read_item(self, &mut buf, item_address, page_data_end)) .unwrap(); - match maybe_item { - crate::item::MaybeItem::Corrupted(_, _) - | crate::item::MaybeItem::Erased(_) => { - core::ops::ControlFlow::Break(Some(false)) - } - crate::item::MaybeItem::Present(_) => { - core::ops::ControlFlow::Break(Some(true)) - } + match maybe_item { + crate::item::MaybeItem::Corrupted(_, _) | crate::item::MaybeItem::Erased(_) => { + found_item.replace(false); + break; + } + crate::item::MaybeItem::Present(_) => { + found_item.replace(true); + break; } - } else { - core::ops::ControlFlow::Continue(()) } - }, - ) - .unwrap() - .0; + } + } - found_item.flatten() + found_item } } @@ -224,7 +237,7 @@ impl R { const READ_SIZE: usize = BYTES_PER_WORD; - fn read(&mut self, offset: u32, bytes: &mut [u8]) -> Result<(), Self::Error> { + async fn read(&mut self, offset: u32, bytes: &mut [u8]) -> Result<(), Self::Error> { self.reads += 1; if bytes.len() % Self::READ_SIZE != 0 { @@ -255,7 +268,7 @@ impl N const ERASE_SIZE: usize = Self::PAGE_BYTES; - fn erase(&mut self, from: u32, to: u32) -> Result<(), Self::Error> { + async fn erase(&mut self, from: u32, to: u32) -> Result<(), Self::Error> { self.erases += 1; let from = from as usize; @@ -283,11 +296,15 @@ impl N Ok(()) } - fn write(&mut self, offset: u32, bytes: &[u8]) -> Result<(), Self::Error> { + async fn write(&mut self, offset: u32, bytes: &[u8]) -> Result<(), Self::Error> { self.writes += 1; let range = Self::validate_operation(offset, bytes.len())?; + if bytes.as_ptr() as usize % Self::WRITE_SIZE != 0 { + panic!("write buffer must be aligned to Self::WRITE_SIZE bytes"); + } + if bytes.len() % Self::WRITE_SIZE != 0 { panic!("any write must be a multiple of Self::WRITE_SIZE bytes"); } diff --git a/src/queue.rs b/src/queue.rs index 4b2016d..92fe88d 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -5,6 +5,7 @@ //! ```rust //! # use sequential_storage::queue::{push, peek, pop}; //! # use mock_flash::MockFlashBase; +//! # use futures::executor::block_on; //! # type Flash = MockFlashBase<10, 1, 4096>; //! # mod mock_flash { //! # include!("mock_flash.rs"); @@ -26,35 +27,34 @@ //! let my_data = [10, 47, 29]; //! //! // We can push some data to the queue -//! -//! push(&mut flash, flash_range.clone(), &my_data, false).unwrap(); +//! block_on(push(&mut flash, flash_range.clone(), &my_data, false)).unwrap(); //! //! // We can peek at the oldest data //! //! assert_eq!( -//! &peek(&mut flash, flash_range.clone(), &mut data_buffer).unwrap().unwrap()[..], +//! &block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)).unwrap().unwrap()[..], //! &my_data[..] //! ); //! //! // With popping we get back the oldest data, but that data is now also removed //! //! assert_eq!( -//! &pop(&mut flash, flash_range.clone(), &mut data_buffer).unwrap().unwrap()[..], +//! &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)).unwrap().unwrap()[..], //! &my_data[..] //! ); //! //! // If we pop again, we find there's no data anymore //! //! assert_eq!( -//! pop(&mut flash, flash_range.clone(), &mut data_buffer), +//! block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)), //! Ok(None) //! ); //! ``` -use crate::item::{find_next_free_item_spot, is_page_empty, read_item_headers, Item, ItemHeader}; +use crate::item::{find_next_free_item_spot, is_page_empty, HeaderIter, Item, ItemHeader}; use super::*; -use embedded_storage::nor_flash::MultiwriteNorFlash; +use embedded_storage_async::nor_flash::MultiwriteNorFlash; /// Push data into the queue in the given flash memory with the given range. /// The data can only be taken out with the [pop] function. @@ -64,7 +64,7 @@ use embedded_storage::nor_flash::MultiwriteNorFlash; /// /// *Note: If a page is already used and you push more data than the remaining capacity of the page, /// the entire remaining capacity will go unused because the data is stored on the next page.* -pub fn push( +pub async fn push( flash: &mut S, flash_range: Range, data: &[u8], @@ -84,14 +84,14 @@ pub fn push( return Err(Error::BufferTooBig); } - let current_page = find_youngest_page(flash, flash_range.clone())?; + let current_page = find_youngest_page(flash, flash_range.clone()).await?; let page_data_start_address = calculate_page_address::(flash_range.clone(), current_page) + S::WORD_SIZE as u32; let page_data_end_address = calculate_page_end_address::(flash_range.clone(), current_page) - S::WORD_SIZE as u32; - partial_close_page(flash, flash_range.clone(), current_page)?; + partial_close_page(flash, flash_range.clone(), current_page).await?; // Find the last item on the page so we know where we need to write @@ -100,15 +100,16 @@ pub fn push( page_data_start_address, page_data_end_address, data.len() as u32, - )?; + ) + .await?; if next_address.is_none() { // No cap left on this page, move to the next page let next_page = next_page::(flash_range.clone(), current_page); - match get_page_state(flash, flash_range.clone(), next_page)? { + match get_page_state(flash, flash_range.clone(), next_page).await? { PageState::Open => { - close_page(flash, flash_range.clone(), current_page)?; - partial_close_page(flash, flash_range.clone(), next_page)?; + close_page(flash, flash_range.clone(), current_page).await?; + partial_close_page(flash, flash_range.clone(), next_page).await?; next_address = Some( calculate_page_address::(flash_range.clone(), next_page) + S::WORD_SIZE as u32, @@ -120,7 +121,7 @@ pub fn push( + S::WORD_SIZE as u32; if !allow_overwrite_old_data - && !is_page_empty(flash, flash_range.clone(), next_page, Some(state))? + && !is_page_empty(flash, flash_range.clone(), next_page, Some(state)).await? { return Err(Error::FullStorage); } @@ -130,14 +131,15 @@ pub fn push( calculate_page_address::(flash_range.clone(), next_page), calculate_page_end_address::(flash_range.clone(), next_page), ) + .await .map_err(|e| Error::Storage { value: e, #[cfg(feature = "_test")] backtrace: std::backtrace::Backtrace::capture(), })?; - close_page(flash, flash_range.clone(), current_page)?; - partial_close_page(flash, flash_range.clone(), next_page)?; + close_page(flash, flash_range.clone(), current_page).await?; + partial_close_page(flash, flash_range.clone(), next_page).await?; next_address = Some(next_page_data_start_address); } PageState::PartialOpen => { @@ -152,7 +154,7 @@ pub fn push( } } - Item::write_new(flash, next_address.unwrap(), data)?; + Item::write_new(flash, next_address.unwrap(), data).await?; Ok(()) } @@ -162,12 +164,12 @@ pub fn push( /// If you also want to remove the data use [pop_many]. /// /// Returns an iterator-like type that can be used to peek into the data. -pub fn peek_many( +pub async fn peek_many( flash: &mut S, flash_range: Range, ) -> Result, Error> { Ok(PeekIterator { - iter: QueueIterator::new(flash, flash_range)?, + iter: QueueIterator::new(flash, flash_range).await?, }) } @@ -181,12 +183,12 @@ pub fn peek_many( /// You should not depend on that data. /// /// If the data buffer is not big enough an error is returned. -pub fn peek<'d, S: NorFlash>( +pub async fn peek<'d, S: NorFlash>( flash: &mut S, flash_range: Range, data_buffer: &'d mut [u8], ) -> Result, Error> { - peek_many(flash, flash_range)?.next(data_buffer) + peek_many(flash, flash_range).await?.next(data_buffer).await } /// Pop the data from oldest to newest. @@ -194,12 +196,12 @@ pub fn peek<'d, S: NorFlash>( /// If you don't want to remove the data use [peek_many]. /// /// Returns an iterator-like type that can be used to pop the data. -pub fn pop_many( +pub async fn pop_many( flash: &mut S, flash_range: Range, ) -> Result, Error> { Ok(PopIterator { - iter: QueueIterator::new(flash, flash_range)?, + iter: QueueIterator::new(flash, flash_range).await?, }) } @@ -213,12 +215,12 @@ pub fn pop_many( /// You should not depend on that data. /// /// If the data buffer is not big enough an error is returned. -pub fn pop<'d, S: MultiwriteNorFlash>( +pub async fn pop<'d, S: MultiwriteNorFlash>( flash: &mut S, flash_range: Range, data_buffer: &'d mut [u8], ) -> Result, Error> { - pop_many(flash, flash_range)?.next(data_buffer) + pop_many(flash, flash_range).await?.next(data_buffer).await } /// Iterator for pop'ing elements in the queue. @@ -236,17 +238,17 @@ impl<'d, S: MultiwriteNorFlash> PopIterator<'d, S> { /// You should not depend on that data. /// /// If the data buffer is not big enough an error is returned. - pub fn next<'m>( + pub async fn next<'m>( &mut self, data_buffer: &'m mut [u8], ) -> Result, Error> { let reset_point = self.iter.create_reset_point(); - if let Some((item, item_address)) = self.iter.next(data_buffer)? { + if let Some((item, item_address)) = self.iter.next(data_buffer).await? { let (header, data_buffer) = item.destruct(); let ret = &mut data_buffer[..header.length as usize]; - match header.erase_data(self.iter.flash, item_address) { + match header.erase_data(self.iter.flash, item_address).await { Ok(_) => Ok(Some(ret)), Err(e) => { self.iter.recover_from_reset_point(reset_point); @@ -274,11 +276,11 @@ impl<'d, S: NorFlash> PeekIterator<'d, S> { /// You should not depend on that data. /// /// If the data buffer is not big enough an error is returned. - pub fn next<'m>( + pub async fn next<'m>( &mut self, data_buffer: &'m mut [u8], ) -> Result, Error> { - Ok(self.iter.next(data_buffer)?.map(|(item, _)| { + Ok(self.iter.next(data_buffer).await?.map(|(item, _)| { let (header, data_buffer) = item.destruct(); &mut data_buffer[..header.length as usize] })) @@ -307,7 +309,7 @@ enum CurrentAddress { } impl<'d, S: NorFlash> QueueIterator<'d, S> { - fn new(flash: &'d mut S, flash_range: Range) -> Result> { + async fn new(flash: &'d mut S, flash_range: Range) -> Result> { assert_eq!(flash_range.start % S::ERASE_SIZE as u32, 0); assert_eq!(flash_range.end % S::ERASE_SIZE as u32, 0); @@ -317,7 +319,7 @@ impl<'d, S: NorFlash> QueueIterator<'d, S> { // We start at the start of the oldest page let current_address = calculate_page_address::( flash_range.clone(), - find_oldest_page(flash, flash_range.clone())?, + find_oldest_page(flash, flash_range.clone()).await?, ) + S::WORD_SIZE as u32; Ok(Self { @@ -327,7 +329,7 @@ impl<'d, S: NorFlash> QueueIterator<'d, S> { }) } - fn next<'m>( + async fn next<'m>( &mut self, data_buffer: &'m mut [u8], ) -> Result, u32)>, Error> { @@ -338,8 +340,11 @@ impl<'d, S: NorFlash> QueueIterator<'d, S> { let (current_page, current_address) = match self.current_address { CurrentAddress::PageAfter(previous_page) => { let next_page = next_page::(self.flash_range.clone(), previous_page); - if get_page_state(self.flash, self.flash_range.clone(), next_page)?.is_open() - || next_page == find_oldest_page(self.flash, self.flash_range.clone())? + if get_page_state(self.flash, self.flash_range.clone(), next_page) + .await? + .is_open() + || next_page + == find_oldest_page(self.flash, self.flash_range.clone()).await? { return Ok(None); } @@ -363,50 +368,55 @@ impl<'d, S: NorFlash> QueueIterator<'d, S> { - S::WORD_SIZE as u32; // Search for the first item with data - if let (Some((found_item_header, found_item_address)), _) = read_item_headers( - self.flash, - current_address, - page_data_end_address, - |_, item_header, item_address| { - if item_header.crc.is_some() { - ControlFlow::Break((item_header, item_address)) - } else { - ControlFlow::Continue(()) - } - }, - )? { - let maybe_item = found_item_header.read_item( - self.flash, - data_buffer.take().unwrap(), - found_item_address, - page_data_end_address, - )?; - - match maybe_item { - item::MaybeItem::Corrupted(header, db) => { - let next_address = header.next_item_address::(found_item_address); - self.current_address = if next_address >= page_data_end_address { - CurrentAddress::PageAfter(current_page) + let mut it = HeaderIter::new(current_address, page_data_end_address); + loop { + if let (Some(found_item_header), found_item_address) = it + .traverse(self.flash, |header, _| { + if header.crc.is_some() { + ControlFlow::Break(()) } else { - CurrentAddress::Address(next_address) - }; - data_buffer.replace(db); - continue; - } - item::MaybeItem::Erased(_) => unreachable!("Item is already erased"), - item::MaybeItem::Present(item) => { - let next_address = item.header.next_item_address::(found_item_address); - self.current_address = if next_address >= page_data_end_address { - CurrentAddress::PageAfter(current_page) - } else { - CurrentAddress::Address(next_address) - }; - // Return the item we found - return Ok(Some((item, found_item_address))); + ControlFlow::Continue(()) + } + }) + .await? + { + let maybe_item = found_item_header + .read_item( + self.flash, + data_buffer.take().unwrap(), + found_item_address, + page_data_end_address, + ) + .await?; + + match maybe_item { + item::MaybeItem::Corrupted(header, db) => { + let next_address = header.next_item_address::(found_item_address); + self.current_address = if next_address >= page_data_end_address { + CurrentAddress::PageAfter(current_page) + } else { + CurrentAddress::Address(next_address) + }; + data_buffer.replace(db); + continue; + } + item::MaybeItem::Erased(_) => unreachable!("Item is already erased"), + item::MaybeItem::Present(item) => { + let next_address = + item.header.next_item_address::(found_item_address); + self.current_address = if next_address >= page_data_end_address { + CurrentAddress::PageAfter(current_page) + } else { + CurrentAddress::Address(next_address) + }; + // Return the item we found + return Ok(Some((item, found_item_address))); + } } + } else { + self.current_address = CurrentAddress::PageAfter(current_page); + break; } - } else { - self.current_address = CurrentAddress::PageAfter(current_page); } } } @@ -428,7 +438,7 @@ struct QueueIteratorResetPoint(CurrentAddress); /// data that can be stored, taking alignment requirements of the item into account. /// /// If there is no space left, `None` is returned. -pub fn find_max_fit( +pub async fn find_max_fit( flash: &mut S, flash_range: Range, ) -> Result, Error> { @@ -438,13 +448,13 @@ pub fn find_max_fit( assert!(S::ERASE_SIZE >= S::WORD_SIZE * 4); assert!(S::WORD_SIZE <= MAX_WORD_SIZE); - let current_page = find_youngest_page(flash, flash_range.clone())?; + let current_page = find_youngest_page(flash, flash_range.clone()).await?; // Check if we have space on the next page let next_page = next_page::(flash_range.clone(), current_page); - match get_page_state(flash, flash_range.clone(), next_page)? { + match get_page_state(flash, flash_range.clone(), next_page).await? { state @ PageState::Closed => { - if is_page_empty(flash, flash_range.clone(), next_page, Some(state))? { + if is_page_empty(flash, flash_range.clone(), next_page, Some(state)).await? { return Ok(Some((S::ERASE_SIZE - (2 * S::WORD_SIZE)) as u32)); } } @@ -466,31 +476,29 @@ pub fn find_max_fit( let page_data_end_address = calculate_page_end_address::(flash_range.clone(), current_page) - S::WORD_SIZE as u32; - let next_item_address = read_item_headers( - flash, - page_data_start_address, - page_data_end_address, - |_, _, _| ControlFlow::<(), ()>::Continue(()), - )? - .1; + let next_item_address = HeaderIter::new(page_data_start_address, page_data_end_address) + .traverse(flash, |_, _| ControlFlow::Continue(())) + .await? + .1; Ok(ItemHeader::available_data_bytes::( page_data_end_address - next_item_address, )) } -fn find_youngest_page( +async fn find_youngest_page( flash: &mut S, flash_range: Range, ) -> Result> { - let last_used_page = find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen)?; + let last_used_page = + find_first_page(flash, flash_range.clone(), 0, PageState::PartialOpen).await?; if let Some(last_used_page) = last_used_page { return Ok(last_used_page); } // We have no partial open page. Search for an open page to start in - let first_open_page = find_first_page(flash, flash_range, 0, PageState::Open)?; + let first_open_page = find_first_page(flash, flash_range, 0, PageState::Open).await?; if let Some(first_open_page) = first_open_page { return Ok(first_open_page); @@ -506,14 +514,15 @@ fn find_youngest_page( }) } -fn find_oldest_page( +async fn find_oldest_page( flash: &mut S, flash_range: Range, ) -> Result> { - let youngest_page = find_youngest_page(flash, flash_range.clone())?; + let youngest_page = find_youngest_page(flash, flash_range.clone()).await?; // The oldest page is the first non-open page after the youngest page - let oldest_closed_page = find_first_page(flash, flash_range, youngest_page, PageState::Closed)?; + let oldest_closed_page = + find_first_page(flash, flash_range, youngest_page, PageState::Closed).await?; Ok(oldest_closed_page.unwrap_or(youngest_page)) } @@ -528,11 +537,11 @@ fn find_oldest_page( /// If this function or the function call after this crate returns [Error::Corrupted], then it's unlikely /// that the state can be recovered. To at least make everything function again at the cost of losing the data, /// erase the flash range. -pub fn try_repair( +pub async fn try_repair( flash: &mut S, flash_range: Range, ) -> Result<(), Error> { - crate::try_general_repair(flash, flash_range.clone())?; + crate::try_general_repair(flash, flash_range.clone()).await?; Ok(()) } @@ -542,6 +551,7 @@ mod tests { use crate::mock_flash::WriteCountCheck; use super::*; + use futures::executor::block_on; type MockFlashBig = mock_flash::MockFlashBase<4, 4, 256>; type MockFlashTiny = mock_flash::MockFlashBase<2, 1, 32>; @@ -554,62 +564,86 @@ mod tests { const DATA_SIZE: usize = 22; assert_eq!( - peek(&mut flash, flash_range.clone(), &mut data_buffer).unwrap(), + block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)).unwrap(), None ); - push(&mut flash, flash_range.clone(), &[0xAA; DATA_SIZE], false).unwrap(); + block_on(push( + &mut flash, + flash_range.clone(), + &[0xAA; DATA_SIZE], + false, + )) + .unwrap(); assert_eq!( - &peek(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xAA; DATA_SIZE] ); - push(&mut flash, flash_range.clone(), &[0xBB; DATA_SIZE], false).unwrap(); + block_on(push( + &mut flash, + flash_range.clone(), + &[0xBB; DATA_SIZE], + false, + )) + .unwrap(); assert_eq!( - &peek(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xAA; DATA_SIZE] ); // Flash is full, this should fail - push(&mut flash, flash_range.clone(), &[0xCC; DATA_SIZE], false).unwrap_err(); + block_on(push( + &mut flash, + flash_range.clone(), + &[0xCC; DATA_SIZE], + false, + )) + .unwrap_err(); // Now we allow overwrite, so it should work - push(&mut flash, flash_range.clone(), &[0xDD; DATA_SIZE], true).unwrap(); + block_on(push( + &mut flash, + flash_range.clone(), + &[0xDD; DATA_SIZE], + true, + )) + .unwrap(); assert_eq!( - &peek(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xBB; DATA_SIZE] ); assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xBB; DATA_SIZE] ); assert_eq!( - &peek(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xDD; DATA_SIZE] ); assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xDD; DATA_SIZE] ); assert_eq!( - peek(&mut flash, flash_range.clone(), &mut data_buffer).unwrap(), + block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)).unwrap(), None ); assert_eq!( - pop(&mut flash, flash_range.clone(), &mut data_buffer).unwrap(), + block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)).unwrap(), None ); } @@ -624,23 +658,23 @@ mod tests { println!("{i}"); let data = vec![i as u8; i % 512 + 1]; - push(&mut flash, flash_range.clone(), &data, true).unwrap(); + block_on(push(&mut flash, flash_range.clone(), &data, true)).unwrap(); assert_eq!( - &peek(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &data, "At {i}" ); assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &data, "At {i}" ); assert_eq!( - peek(&mut flash, flash_range.clone(), &mut data_buffer).unwrap(), + block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)).unwrap(), None, "At {i}" ); @@ -653,30 +687,34 @@ mod tests { let flash_range = 0x00..0x40; let mut data_buffer = [0; 1024]; - for i in 0..2000 { + for i in 0..1 { println!("{i}"); let data = vec![i as u8; i % 20 + 1]; - push(&mut flash, flash_range.clone(), &data, true).unwrap(); + println!("PUSH"); + block_on(push(&mut flash, flash_range.clone(), &data, true)).unwrap(); assert_eq!( - &peek(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &data, "At {i}" ); + println!("POP"); assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &data, "At {i}" ); + println!("PEEK"); assert_eq!( - peek(&mut flash, flash_range.clone(), &mut data_buffer).unwrap(), + block_on(peek(&mut flash, flash_range.clone(), &mut data_buffer)).unwrap(), None, "At {i}" ); + println!("DONE"); } } @@ -696,26 +734,26 @@ mod tests { for i in 0..20 { let data = vec![i as u8; 50]; - push(&mut flash, flash_range.clone(), &data, false).unwrap(); + block_on(push(&mut flash, flash_range.clone(), &data, false)).unwrap(); add_ops(&mut flash, &mut push_ops); } - let mut peeker = peek_many(&mut flash, flash_range.clone()).unwrap(); + let mut peeker = block_on(peek_many(&mut flash, flash_range.clone())).unwrap(); for i in 0..5 { let mut data = vec![i as u8; 50]; assert_eq!( - peeker.next(&mut data_buffer).unwrap(), + block_on(peeker.next(&mut data_buffer)).unwrap(), Some(&mut data[..]), "At {i}" ); add_ops(peeker.iter.flash, &mut peek_ops); } - let mut popper = pop_many(&mut flash, flash_range.clone()).unwrap(); + let mut popper = block_on(pop_many(&mut flash, flash_range.clone())).unwrap(); for i in 0..5 { let data = vec![i as u8; 50]; assert_eq!( - &popper.next(&mut data_buffer).unwrap().unwrap()[..], + &block_on(popper.next(&mut data_buffer)).unwrap().unwrap()[..], &data, "At {i}" ); @@ -724,26 +762,26 @@ mod tests { for i in 20..25 { let data = vec![i as u8; 50]; - push(&mut flash, flash_range.clone(), &data, false).unwrap(); + block_on(push(&mut flash, flash_range.clone(), &data, false)).unwrap(); add_ops(&mut flash, &mut push_ops); } - let mut peeker = peek_many(&mut flash, flash_range.clone()).unwrap(); + let mut peeker = block_on(peek_many(&mut flash, flash_range.clone())).unwrap(); for i in 5..25 { let data = vec![i as u8; 50]; assert_eq!( - &peeker.next(&mut data_buffer).unwrap().unwrap()[..], + &block_on(peeker.next(&mut data_buffer)).unwrap().unwrap()[..], &data, "At {i}" ); add_ops(peeker.iter.flash, &mut peek_ops); } - let mut popper = pop_many(&mut flash, flash_range.clone()).unwrap(); + let mut popper = block_on(pop_many(&mut flash, flash_range.clone())).unwrap(); for i in 5..25 { let data = vec![i as u8; 50]; assert_eq!( - &popper.next(&mut data_buffer).unwrap().unwrap()[..], + &block_on(popper.next(&mut data_buffer)).unwrap().unwrap()[..], &data, "At {i}" ); @@ -775,14 +813,14 @@ mod tests { for i in 0..20 { let data = vec![i as u8; 50]; - push(&mut flash, flash_range.clone(), &data, false).unwrap(); + block_on(push(&mut flash, flash_range.clone(), &data, false)).unwrap(); add_ops(&mut flash, &mut push_ops); } for i in 0..5 { let data = vec![i as u8; 50]; assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &data, @@ -793,14 +831,14 @@ mod tests { for i in 20..25 { let data = vec![i as u8; 50]; - push(&mut flash, flash_range.clone(), &data, false).unwrap(); + block_on(push(&mut flash, flash_range.clone(), &data, false)).unwrap(); add_ops(&mut flash, &mut push_ops); } for i in 5..25 { let data = vec![i as u8; 50]; assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &data, @@ -856,19 +894,19 @@ mod tests { let flash_range = 0x00..0x40; let mut data_buffer = [0; 1024]; - push(&mut flash, flash_range.clone(), &[0xAA; 20], false).unwrap(); - push(&mut flash, flash_range.clone(), &[0xBB; 20], false).unwrap(); + block_on(push(&mut flash, flash_range.clone(), &[0xAA; 20], false)).unwrap(); + block_on(push(&mut flash, flash_range.clone(), &[0xBB; 20], false)).unwrap(); // There's now an unused gap at the end of the first page assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xAA; 20] ); assert_eq!( - &pop(&mut flash, flash_range.clone(), &mut data_buffer) + &block_on(pop(&mut flash, flash_range.clone(), &mut data_buffer)) .unwrap() .unwrap()[..], &[0xBB; 20] @@ -881,11 +919,17 @@ mod tests { const FLASH_RANGE: Range = 0x000..0x1000; - close_page(&mut flash, FLASH_RANGE, 0).unwrap(); - close_page(&mut flash, FLASH_RANGE, 1).unwrap(); - partial_close_page(&mut flash, FLASH_RANGE, 2).unwrap(); + block_on(close_page(&mut flash, FLASH_RANGE, 0)).unwrap(); + block_on(close_page(&mut flash, FLASH_RANGE, 1)).unwrap(); + block_on(partial_close_page(&mut flash, FLASH_RANGE, 2)).unwrap(); - assert_eq!(find_youngest_page(&mut flash, FLASH_RANGE).unwrap(), 2); - assert_eq!(find_oldest_page(&mut flash, FLASH_RANGE).unwrap(), 0); + assert_eq!( + block_on(find_youngest_page(&mut flash, FLASH_RANGE)).unwrap(), + 2 + ); + assert_eq!( + block_on(find_oldest_page(&mut flash, FLASH_RANGE)).unwrap(), + 0 + ); } }