Skip to content

Commit

Permalink
feat: add async API replacing blocking api
Browse files Browse the repository at this point in the history
Changes the sequential-storage APIs for queues and maps to be async first.

Non-async behavior is achieved using a block_on, optionally combined
with an adapter for the flash supporting embedded-storage-async.
  • Loading branch information
lulf committed Jan 3, 2024
1 parent 51ad715 commit 642aa2a
Show file tree
Hide file tree
Showing 10 changed files with 770 additions and 515 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ keywords = ["no_std", "embedded", "flash", "storage"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
embedded-storage = "0.3.0"
embedded-storage-async = "0.4.1"
defmt = { version = "0.3", optional = true }
futures = { version = "0.3.30", features = ["executor"], optional = true }

[dev-dependencies]
approx = "0.5.1"
futures = { version = "0.3.30", features = ["executor"] }

[features]
defmt = ["dep:defmt"]
_test = []
_test = ["futures"]
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ When using peek_many, you can look at all data from oldest to newest.
aid with shutdown/cancellation issues.
- When the state is corrupted, many issues can now be repaired with the repair functions in the map and queue modules
- Made changes to the entire to better survive shutoffs
- *Breaking* Convert API to async first supporting the traits from embedded-storage-async. Flash
drivers supporting `sequential-storage` can be wrapped using
[BlockingAsync](https://docs.embassy.dev/embassy-embedded-hal/git/default/adapter/struct.BlockingAsync.html), and a
simple [blocking executor](https://docs.rs/futures/0.3.30/futures/executor/fn.block_on.html) can be used to call the
API from a non-async function.

### 0.6.2 - 22-12-23

Expand Down
1 change: 1 addition & 0 deletions fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ sequential-storage = { path = "..", features = ["_test"] }
arbitrary = { version = "1.2.2", features = ["derive"] }
rand = "0.8.5"
rand_pcg = "0.3.1"
futures = { version = "0.3.30", features = ["executor"] }

# Prevent this from interfering with workspaces
[workspace]
Expand Down
21 changes: 11 additions & 10 deletions fuzz/fuzz_targets/map.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![no_main]

use futures::executor::block_on;
use libfuzzer_sys::arbitrary::Arbitrary;
use libfuzzer_sys::fuzz_target;
use rand::SeedableRng;
Expand Down Expand Up @@ -117,12 +118,12 @@ fn fuzz(ops: Input) {
match op.clone() {
Op::Store(op) => {
let item = op.into_test_item(&mut rng);
match sequential_storage::map::store_item(
match block_on(sequential_storage::map::store_item(
&mut flash,
FLASH_RANGE,
&mut buf,
item.clone(),
) {
)) {
Ok(_) => {
map.insert(item.key, item.value);
}
Expand All @@ -131,12 +132,12 @@ fn fuzz(ops: Input) {
value: MockFlashError::EarlyShutoff(_),
backtrace: _backtrace,
}) => {
match sequential_storage::map::fetch_item::<TestItem, _>(
match block_on(sequential_storage::map::fetch_item::<TestItem, _>(
&mut flash,
FLASH_RANGE,
&mut buf,
item.key,
) {
)) {
Ok(Some(check_item))
if check_item.key == item.key
&& check_item.value == item.value =>
Expand All @@ -161,11 +162,11 @@ fn fuzz(ops: Input) {
"### Encountered curruption while storing! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::map::try_repair::<TestItem, _>(
block_on(sequential_storage::map::try_repair::<TestItem, _>(
&mut flash,
FLASH_RANGE,
&mut buf,
)
))
.unwrap();
corruption_repaired = true;
retry = true;
Expand All @@ -174,12 +175,12 @@ fn fuzz(ops: Input) {
}
}
Op::Fetch(key) => {
match sequential_storage::map::fetch_item::<TestItem, _>(
match block_on(sequential_storage::map::fetch_item::<TestItem, _>(
&mut flash,
FLASH_RANGE,
&mut buf,
key,
) {
)) {
Ok(Some(fetch_result)) => {
let map_value = map
.get(&key)
Expand Down Expand Up @@ -207,11 +208,11 @@ fn fuzz(ops: Input) {
"### Encountered curruption while fetching! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::map::try_repair::<TestItem, _>(
block_on(sequential_storage::map::try_repair::<TestItem, _>(
&mut flash,
FLASH_RANGE,
&mut buf,
)
))
.unwrap();
corruption_repaired = true;
retry = true;
Expand Down
92 changes: 70 additions & 22 deletions fuzz/fuzz_targets/queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![no_main]

use futures::executor::block_on;
use libfuzzer_sys::arbitrary::Arbitrary;
use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -33,6 +34,9 @@ struct PushOp {
value_len: u8,
}

#[repr(align(4))]
struct AlignedBuf([u8; MAX_VALUE_SIZE + 1]);

fn fuzz(ops: Input) {
const PAGES: usize = 4;
const WORD_SIZE: usize = 4;
Expand All @@ -45,7 +49,7 @@ fn fuzz(ops: Input) {
const FLASH_RANGE: Range<u32> = 0x000..0x1000;

let mut order = VecDeque::new();
let mut buf = [0; MAX_VALUE_SIZE + 1];
let mut buf = AlignedBuf([0; MAX_VALUE_SIZE + 1]);

let mut rng = rand_pcg::Pcg32::seed_from_u64(ops.seed);

Expand All @@ -66,10 +70,10 @@ fn fuzz(ops: Input) {
Op::Push(op) => {
let val: Vec<u8> = (0..op.value_len as usize % 16).map(|_| rng.gen()).collect();

let max_fit = match sequential_storage::queue::find_max_fit(
let max_fit = match block_on(sequential_storage::queue::find_max_fit(
&mut flash,
FLASH_RANGE,
) {
)) {
Ok(val) => val,
Err(Error::Corrupted {
backtrace: _backtrace,
Expand All @@ -79,15 +83,25 @@ fn fuzz(ops: Input) {
"### Encountered curruption while finding max fit! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
continue;
}
Err(e) => panic!("Error while finding max fit: {e:?}"),
};

match sequential_storage::queue::push(&mut flash, FLASH_RANGE, &val, false) {
buf.0[..val.len()].copy_from_slice(&val);
match block_on(sequential_storage::queue::push(
&mut flash,
FLASH_RANGE,
&buf.0[..val.len()],
false,
)) {
Ok(_) => {
if let Some(max_fit) = max_fit {
if val.len() > max_fit as usize {
Expand Down Expand Up @@ -128,15 +142,23 @@ fn fuzz(ops: Input) {
"### Encountered curruption while pushing! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
}
Err(e) => panic!("Error pushing to queue: {e:?}"),
}
}
Op::Pop => {
match sequential_storage::queue::pop(&mut flash, FLASH_RANGE, &mut buf) {
match block_on(sequential_storage::queue::pop(
&mut flash,
FLASH_RANGE,
&mut buf.0,
)) {
Ok(value) => {
assert_eq!(
value,
Expand Down Expand Up @@ -168,18 +190,22 @@ fn fuzz(ops: Input) {
"### Encountered curruption while popping (single)! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
}
Err(e) => panic!("Error popping (single) from queue: {e:?}"),
}
}
Op::PopMany(n) => {
let mut popper = match sequential_storage::queue::pop_many(
let mut popper = match block_on(sequential_storage::queue::pop_many(
&mut flash,
FLASH_RANGE,
) {
)) {
Ok(val) => val,
Err(Error::Corrupted {
backtrace: _backtrace,
Expand All @@ -189,7 +215,11 @@ fn fuzz(ops: Input) {
"### Encountered curruption while creating popper! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
continue;
Expand All @@ -198,7 +228,7 @@ fn fuzz(ops: Input) {
};

for i in 0..*n {
match popper.next(&mut buf) {
match block_on(popper.next(&mut buf.0)) {
Ok(value) => {
assert_eq!(
value,
Expand Down Expand Up @@ -234,8 +264,11 @@ fn fuzz(ops: Input) {
"### Encountered curruption while popping (many)! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE)
.unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
*n -= i;
Expand All @@ -246,7 +279,11 @@ fn fuzz(ops: Input) {
}
}
Op::Peek => {
match sequential_storage::queue::peek(&mut flash, FLASH_RANGE, &mut buf) {
match block_on(sequential_storage::queue::peek(
&mut flash,
FLASH_RANGE,
&mut buf.0,
)) {
Ok(value) => {
assert_eq!(
value.map(|b| &b[..]),
Expand All @@ -261,18 +298,22 @@ fn fuzz(ops: Input) {
"### Encountered curruption while peeking (single)! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
}
Err(e) => panic!("Error popping (single) from queue: {e:?}"),
}
}
Op::PeekMany(n) => {
let mut peeker = match sequential_storage::queue::peek_many(
let mut peeker = match block_on(sequential_storage::queue::peek_many(
&mut flash,
FLASH_RANGE,
) {
)) {
Ok(val) => val,
Err(Error::Corrupted {
backtrace: _backtrace,
Expand All @@ -282,7 +323,11 @@ fn fuzz(ops: Input) {
"### Encountered curruption while creating peeker! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE).unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
continue;
Expand All @@ -291,7 +336,7 @@ fn fuzz(ops: Input) {
};

for i in 0..*n {
match peeker.next(&mut buf) {
match block_on(peeker.next(&mut buf.0)) {
Ok(value) => {
assert_eq!(
value.map(|b| &b[..]),
Expand All @@ -309,8 +354,11 @@ fn fuzz(ops: Input) {
"### Encountered curruption while peeking (many)! Repairing now. Originated from:\n{_backtrace:#}"
);

sequential_storage::queue::try_repair(&mut flash, FLASH_RANGE)
.unwrap();
block_on(sequential_storage::queue::try_repair(
&mut flash,
FLASH_RANGE,
))
.unwrap();
corruption_repaired = true;
retry = true;
*n -= i;
Expand Down
Loading

0 comments on commit 642aa2a

Please sign in to comment.