Skip to content

Commit

Permalink
feat: add pagination to the id_indexed_v1 store (#1226)
Browse files Browse the repository at this point in the history
  • Loading branch information
morph-dev authored Apr 2, 2024
1 parent 36ee0c6 commit 2e59f62
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 6 deletions.
18 changes: 12 additions & 6 deletions trin-state/src/jsonrpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{fmt::Debug, sync::Arc};

use discv5::{enr::NodeId, Enr};
use portalnet::overlay::errors::OverlayRequestError;
Expand Down Expand Up @@ -83,8 +83,8 @@ impl StateRequestHandler {
)
.await
}
StateEndpoint::PaginateLocalContentKeys(_, _) => {
Err("Pagination not implemented for state network".to_string())
StateEndpoint::PaginateLocalContentKeys(offset, limit) => {
paginate(network, offset, limit)
}
};

Expand Down Expand Up @@ -296,8 +296,7 @@ async fn store(
.store
.write()
.put(content_key, content_value.encode())
.map(|_| true)
.map_err(|err| OverlayRequestError::Failure(err.to_string())),
.map(|_| true),
)
}

Expand Down Expand Up @@ -353,9 +352,16 @@ async fn gossip(
}
}

fn paginate(network: Arc<StateNetwork>, offset: u64, limit: u64) -> Result<Value, String> {
to_json_result(
"PaginateLocalContentKeys",
network.overlay.store.read().paginate(offset, limit),
)
}

fn to_json_result(
request: &str,
result: Result<impl Serialize, OverlayRequestError>,
result: Result<impl Serialize, impl Debug>,
) -> Result<Value, String> {
result
.map(|value| json!(value))
Expand Down
15 changes: 15 additions & 0 deletions trin-state/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ethportal_api::{
content_value::state::{ContractBytecode, TrieNode},
distance::Distance,
portal_wire::ProtocolId,
state::PaginateLocalContentInfo,
},
ContentValue, OverlayContentKey, StateContentKey, StateContentValue,
};
Expand Down Expand Up @@ -82,6 +83,20 @@ impl StateStorage {
})
}

/// Returns a paginated list of all locally available content keys, according to the provided
/// offset and limit.
pub fn paginate(
&self,
offset: u64,
limit: u64,
) -> Result<PaginateLocalContentInfo, ContentStoreError> {
let paginate_result = self.store.paginate(offset, limit)?;
Ok(PaginateLocalContentInfo {
content_keys: paginate_result.content_keys,
total_entries: paginate_result.entry_count,
})
}

/// Get a summary of the current state of storage
pub fn get_summary_info(&self) -> String {
self.store.get_summary_info()
Expand Down
10 changes: 10 additions & 0 deletions trin-storage/src/versioned/id_indexed_v1/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ pub fn lookup_farthest(content_type: &ContentType) -> String {
)
}

pub fn paginate(content_type: &ContentType) -> String {
format!(
"SELECT content_key FROM {}
ORDER BY content_key
LIMIT :limit
OFFSET :offset",
table_name(content_type)
)
}

pub fn entry_count_and_size(content_type: &ContentType) -> String {
format!(
"SELECT COUNT(*) as count, TOTAL(content_size) as used_capacity FROM {}",
Expand Down
131 changes: 131 additions & 0 deletions trin-storage/src/versioned/id_indexed_v1/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ struct FarthestQueryResult {
distance_u32: u32,
}

/// The result of the pagination lookup.
#[derive(Debug, PartialEq, Eq)]
pub struct PaginateResult<K> {
/// The content keys of the queried page
pub content_keys: Vec<K>,
/// The total count of entries in the database
pub entry_count: u64,
}

/// The store for storing content key/value pairs.
///
/// Different SQL table is created for each `ContentType`, with content-id as a primary key.
Expand Down Expand Up @@ -289,6 +298,40 @@ impl IdIndexedV1Store {
Ok(())
}

/// Returns a paginated list of all locally available content keys, according to the provided
/// offset and limit.
pub fn paginate<K: ethportal_api::OverlayContentKey>(
&self,
offset: u64,
limit: u64,
) -> Result<PaginateResult<K>, ContentStoreError> {
let timer = self.metrics.start_process_timer("paginate");

let conn = self.config.sql_connection_pool.get()?;
let content_keys: Result<Vec<K>, rusqlite::Error> = conn
.prepare(&sql::paginate(&self.config.content_type))?
.query_map(
named_params! {
":limit": limit,
":offset": offset,
},
|row| {
let bytes = row.get::<&str, Vec<u8>>("content_key")?;
K::try_from(bytes).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(0, Type::Blob, e.into())
})
},
)?
.collect();
let usage_stats = self.get_usage_stats_internal()?;

self.metrics.stop_process_timer(timer);
Ok(PaginateResult {
content_keys: content_keys?,
entry_count: usage_stats.entry_count,
})
}

/// Updates metrics and returns summary.
pub fn get_summary_info(&self) -> String {
// Call `get_usage_stats` to update metrics.
Expand Down Expand Up @@ -855,4 +898,92 @@ mod tests {

Ok(())
}

#[test]
fn pagination_empty() -> Result<()> {
let temp_dir = TempDir::new()?;
let config = create_config(&temp_dir);
let store = IdIndexedV1Store::create(ContentType::State, config)?;

assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 0, /* limit= */ 10)?,
PaginateResult {
content_keys: vec![],
entry_count: 0,
}
);
Ok(())
}

#[test]
fn pagination() -> Result<()> {
let temp_dir = TempDir::new()?;
let config = create_config(&temp_dir);
let mut store = IdIndexedV1Store::create(ContentType::State, config.clone())?;

let entry_count = 12;

let mut content_keys = vec![];
for _ in 0..entry_count {
let (key, value) = generate_key_value(&config, 0);
store.insert(&key, value).unwrap();
content_keys.push(key);
}
content_keys.sort_by_key(|key| key.to_vec());

// Paginate in steps of 4, there should be exactly 3 pages
assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 0, /* limit= */ 4)?,
PaginateResult {
content_keys: content_keys[0..4].into(),
entry_count,
}
);
assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 4, /* limit= */ 4)?,
PaginateResult {
content_keys: content_keys[4..8].into(),
entry_count,
}
);
assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 8, /* limit= */ 4)?,
PaginateResult {
content_keys: content_keys[8..].into(),
entry_count,
}
);
assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 12, /* limit= */ 4)?,
PaginateResult {
content_keys: vec![],
entry_count,
}
);

// Paginate in steps of 5, last page should have only 2
assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 0, /* limit= */ 5)?,
PaginateResult {
content_keys: content_keys[0..5].into(),
entry_count,
}
);
assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 5, /* limit= */ 5)?,
PaginateResult {
content_keys: content_keys[5..10].into(),
entry_count,
}
);
assert_eq!(
store.paginate::<IdentityContentKey>(/* offset= */ 10, /* limit= */ 5)?,
PaginateResult {
content_keys: content_keys[10..].into(),
entry_count,
}
);

Ok(())
}
}

0 comments on commit 2e59f62

Please sign in to comment.