Skip to content

Commit

Permalink
feat: add stream mode for e2store files (#1373)
Browse files Browse the repository at this point in the history
  • Loading branch information
KolbyML authored Aug 13, 2024
1 parent 8c19453 commit abf05b0
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 172 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions e2store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ surf = { version = "2.3.2", default-features = false, features = ["h1-client-rus

[dev-dependencies]
rstest = "0.18.2"
tempfile = "3.3.0"

8 changes: 6 additions & 2 deletions e2store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ E2store is a format originally developed by Nimbus as a framework for building o
## What is era?
era is a format for storing beacon chain data. More information can be found here: https://github.com/status-im/nimbus-eth2/blob/stable/docs/e2store.md#era-files

## What is era1
## What is era1?

era1 is a format for storing all of Ethereum's post merge blocks. It contains block headers, block bodies, and receipts for pre-merge block history which ranges block 0-15537394
era1 is a format for storing all of Ethereum's pre-merge blocks. It contains block headers, block bodies, and receipts for the pre-merge block history, which ranges from block 0 to block 15537394.

## What is the difference between `e2store/memory.rs` and `e2store/stream.rs`?

`e2store/memory.rs` provides an API to load a full e2store file, such as `.era`/`.era1`, and manipulate it in memory. For smaller e2store files this approach works well. The problem arises with much larger e2store files, where loading the whole file into memory at once often isn't possible. This is where `e2store/stream.rs` comes in: it lets you stream the data you need from an e2store file as you need it. This will be required for `.era2`, a format for storing full flat state snapshots.
125 changes: 8 additions & 117 deletions e2store/src/e2s.rs → e2store/src/e2store/memory.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
use anyhow::anyhow;
use ssz_derive::{Decode, Encode};

const _SLOTS_PER_HISTORICAL_ROOT: usize = 8192;
const HEADER_SIZE: u16 = 8;
const VALUE_SIZE_LIMIT: usize = 1024 * 1024 * 50; // 50 MB
use super::types::{Entry, Header};

pub struct E2StoreFile {
pub struct E2StoreMemory {
pub entries: Vec<Entry>,
}

impl TryFrom<E2StoreFile> for Vec<u8> {
impl TryFrom<E2StoreMemory> for Vec<u8> {
type Error = anyhow::Error;
fn try_from(e2store_file: E2StoreFile) -> Result<Vec<u8>, Self::Error> {
fn try_from(e2store_file: E2StoreMemory) -> Result<Vec<u8>, Self::Error> {
e2store_file.serialize()
}
}

#[allow(dead_code)]
impl E2StoreFile {
impl E2StoreMemory {
/// Serialize to a byte vector.
fn serialize(&self) -> anyhow::Result<Vec<u8>> {
let length = self.entries.iter().map(|e| e.length() as u32).sum::<u32>() as usize;
Expand Down Expand Up @@ -51,7 +47,7 @@ impl E2StoreFile {
bytes[offset + 3],
bytes[offset + 4],
bytes[offset + 5],
]) + HEADER_SIZE as u32;
]) + Header::SERIALIZED_SIZE as u32;
let terminating_entry_index = offset + entry_length as usize;
if bytes.len() < terminating_entry_index {
return Err(anyhow!(
Expand All @@ -68,111 +64,6 @@ impl E2StoreFile {
}
}

/// Represents an e2store `Entry`: a fixed-size [`Header`] followed by the value bytes whose
/// count is recorded in `header.length`.
#[derive(Default, Debug, Eq, PartialEq, Clone)]
pub struct Entry {
    /// Type and length metadata for `value`.
    pub header: Header,
    /// Raw payload bytes of the entry.
    pub value: Vec<u8>,
}

#[allow(dead_code)]
impl Entry {
    /// Creates an entry of the given type; the header length is taken from `value.len()` and
    /// the reserved field is set to zero.
    pub fn new(type_: u16, value: Vec<u8>) -> Self {
        Self {
            header: Header {
                type_,
                // NOTE(review): `as u32` silently truncates for values longer than `u32::MAX`
                // bytes; the signature is infallible, so this is left as-is.
                length: value.len() as u32,
                reserved: 0,
            },
            value,
        }
    }

    /// Total serialized size in bytes: the header size plus the length recorded in the header.
    pub fn length(&self) -> usize {
        HEADER_SIZE as usize + self.header.length as usize
    }

    /// Serialize to a byte vector.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`Entry::write`].
    fn serialize(&self) -> anyhow::Result<Vec<u8>> {
        let mut buf = vec![0; self.length()];
        self.write(&mut buf)?;
        Ok(buf)
    }

    /// Write to a byte slice.
    ///
    /// # Errors
    ///
    /// Fails if `buf` is not exactly [`Entry::length`] bytes long, if that length exceeds
    /// `VALUE_SIZE_LIMIT`, or if `value` holds a different number of bytes than
    /// `header.length` declares.
    fn write(&self, buf: &mut [u8]) -> anyhow::Result<()> {
        if self.length() != buf.len() {
            return Err(anyhow!(
                "found invalid buf length for entry: {} - expected {}",
                buf.len(),
                self.length()
            ));
        }
        if self.length() > VALUE_SIZE_LIMIT {
            return Err(anyhow!(
                "entry value size limit exceeded: {} - {}",
                self.length(),
                VALUE_SIZE_LIMIT
            ));
        }
        // `header` and `value` are independent public fields, so they can disagree; report
        // that as an error instead of letting `copy_from_slice` panic on a length mismatch.
        if self.value.len() != self.header.length as usize {
            return Err(anyhow!(
                "entry value length {} does not match header length {}",
                self.value.len(),
                self.header.length
            ));
        }
        let (header_buf, value_buf) = buf.split_at_mut(HEADER_SIZE as usize);
        self.header.write(header_buf);
        value_buf.copy_from_slice(&self.value);
        Ok(())
    }

    /// Deserialize from a byte slice.
    ///
    /// # Errors
    ///
    /// Fails if `bytes` is shorter than a header, if the header is invalid, or if the length
    /// declared by the header does not account for exactly the remaining bytes.
    pub fn deserialize(bytes: &[u8]) -> anyhow::Result<Self> {
        let header_size = HEADER_SIZE as usize;
        // Check the length before slicing: indexing a too-short input would panic.
        if bytes.len() < header_size {
            return Err(anyhow!(
                "found invalid buf length for entry: {} - expected at least {}",
                bytes.len(),
                header_size
            ));
        }
        let (header_bytes, value) = bytes.split_at(header_size);
        let header = Header::deserialize(header_bytes)?;
        let expected_len = header.length as usize + header_size;
        if expected_len != bytes.len() {
            return Err(anyhow!(
                "found invalid buf length for entry: {} - expected {}",
                bytes.len(),
                expected_len
            ));
        }
        Ok(Self {
            header,
            value: value.to_vec(),
        })
    }
}

/// Represents the header of an e2store `Entry`
#[derive(Clone, Debug, Decode, Encode, Default, Eq, PartialEq)]
pub struct Header {
pub type_: u16,
pub length: u32,
pub reserved: u16,
}

impl Header {
/// Write to a byte slice.
fn write(&self, buf: &mut [u8]) {
buf[0..2].copy_from_slice(&self.type_.to_le_bytes());
buf[2..6].copy_from_slice(&self.length.to_le_bytes());
buf[6..8].copy_from_slice(&self.reserved.to_le_bytes());
}

/// Deserialize from a byte slice.
fn deserialize(bytes: &[u8]) -> anyhow::Result<Self> {
if bytes.len() != HEADER_SIZE as usize {
return Err(anyhow!("invalid header size: {}", bytes.len()));
}
let type_ = u16::from_le_bytes([bytes[0], bytes[1]]);
let length = u32::from_le_bytes([bytes[2], bytes[3], bytes[4], bytes[5]]);
let reserved = u16::from_le_bytes([bytes[6], bytes[7]]);
if reserved != 0 {
return Err(anyhow!("invalid reserved value: {} - expected 0", reserved));
}
Ok(Self {
type_,
length,
reserved,
})
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -207,7 +98,7 @@ mod test {
#[test]
fn test_entry_multiple() {
let expected = "0x2a00020000000000beef0900040000000000abcdabcd";
let file = E2StoreFile::deserialize(&hex_decode(expected).unwrap()).unwrap();
let file = E2StoreMemory::deserialize(&hex_decode(expected).unwrap()).unwrap();
assert_eq!(file.entries.len(), 2);
assert_eq!(file.entries[0].header.type_, 0x2a); // 42
assert_eq!(file.entries[0].header.length, 2);
Expand All @@ -226,6 +117,6 @@ mod test {
#[case("0xbeef010000000000")] // length exceeds buffer
fn test_entry_invalid_decoding(#[case] input: &str) {
let buf = hex_decode(input).unwrap();
assert!(E2StoreFile::deserialize(&buf).is_err());
assert!(E2StoreMemory::deserialize(&buf).is_err());
}
}
3 changes: 3 additions & 0 deletions e2store/src/e2store/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod memory;
pub mod stream;
pub mod types;
87 changes: 87 additions & 0 deletions e2store/src/e2store/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{
fs::File,
io::{Read, Write},
path::PathBuf,
};

use super::types::{Entry, Header};

/// Streaming access to an e2store file.
///
/// `e2store/memory.rs` was built to load full `.era`/`.era1` files into memory and provide a
/// simple API to access the data. For larger files this isn't feasible, as the entire file
/// would need to be loaded into memory. This is where `e2store/stream.rs` comes in: it provides
/// a way to read and write e2store files in a streaming fashion, one entry at a time.
pub struct E2StoreStream {
/// Underlying file handle that entries are read from or written to.
pub e2store_file: File,
}

impl E2StoreStream {
    /// Opens an existing e2store file for streaming reads.
    ///
    /// The handle comes from [`File::open`], which opens the file read-only, so
    /// [`E2StoreStream::append_entry`] will fail on a stream created this way.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be opened.
    pub fn open(e2store_path: &PathBuf) -> anyhow::Result<Self> {
        let e2store_file = File::open(e2store_path)?;
        Ok(Self { e2store_file })
    }

    /// Creates a new e2store file for streaming writes.
    ///
    /// The handle comes from [`File::create`], which truncates an existing file and opens it
    /// write-only, so [`E2StoreStream::next_entry`] will fail on a stream created this way.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the file cannot be created.
    pub fn create(e2store_path: &PathBuf) -> anyhow::Result<Self> {
        let e2store_file = File::create(e2store_path)?;
        Ok(Self { e2store_file })
    }

    /// Reads the next entry from the file's current position.
    ///
    /// # Errors
    ///
    /// Fails if the header or value cannot be read in full (this includes reaching the end of
    /// the file) or if the header is invalid.
    pub fn next_entry(&mut self) -> anyhow::Result<Entry> {
        // Fixed-size stack buffer sized from the shared header constant rather than a
        // heap-allocated `Vec` with a magic length.
        let mut header_buf = [0u8; Header::SERIALIZED_SIZE as usize];
        self.e2store_file.read_exact(&mut header_buf)?;
        let header = Header::deserialize(&header_buf)?;
        // NOTE(review): `header.length` comes straight from the file and is not bounded here,
        // so a corrupt header can request an allocation of up to `u32::MAX` bytes — consider
        // enforcing the entry size limit before allocating.
        let mut value = vec![0; header.length as usize];
        self.e2store_file.read_exact(&mut value)?;
        Ok(Entry { header, value })
    }

    /// Append an entry to the e2store file.
    ///
    /// The serialized entry is written at the file's current position.
    ///
    /// # Errors
    ///
    /// Fails if the entry cannot be serialized or if the write fails.
    pub fn append_entry(&mut self, entry: &Entry) -> anyhow::Result<()> {
        let buf = entry.serialize()?;
        self.e2store_file.write_all(&buf)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use rand::Rng;
    use tempfile::TempDir;

    use crate::e2store::types::VersionEntry;

    use super::*;

    /// Writes a version entry and a random-valued entry to a temporary e2store file, then
    /// reads both back and checks that they round-trip unchanged.
    #[test]
    fn test_e2store_stream_write_and_read() -> anyhow::Result<()> {
        // Setup: a uniquely named file inside a fresh temporary directory.
        let mut rng = rand::thread_rng();
        let tmp_dir = TempDir::new()?;
        let random_number: u16 = rng.gen();
        let file_name = format!("{}.e2store_stream_test", random_number);
        let tmp_path = tmp_dir.path().join(file_name);

        // Write phase: the stream lives in its own scope so the file is closed before reading.
        let version = VersionEntry::default();
        let entry;
        {
            let mut writer = E2StoreStream::create(&tmp_path)?;

            let version_entry: Entry = version.clone().into();
            writer.append_entry(&version_entry)?;

            let payload: Vec<u8> = std::iter::repeat_with(|| rng.gen_range(0..20))
                .take(100)
                .collect();
            entry = Entry::new(0, payload);
            writer.append_entry(&entry)?;
        }

        // Read phase: entries must come back in the order they were written.
        let mut reader = E2StoreStream::open(&tmp_path)?;
        let first = reader.next_entry()?;
        let read_version_entry = VersionEntry::try_from(&first)?;
        assert_eq!(version, read_version_entry);
        let second = reader.next_entry()?;
        assert_eq!(entry, second);

        // Cleanup.
        tmp_dir.close()?;
        Ok(())
    }
}
Loading

0 comments on commit abf05b0

Please sign in to comment.