diff --git a/trin-state/src/storage.rs b/trin-state/src/storage.rs
index 5cacfbb29..f9d06910f 100644
--- a/trin-state/src/storage.rs
+++ b/trin-state/src/storage.rs
@@ -12,7 +12,7 @@ use keccak_hash::keccak;
 use trin_storage::{
     error::ContentStoreError,
     versioned::{create_store, ContentType, IdIndexedV1Store, IdIndexedV1StoreConfig},
-    ContentId, ContentStore, PortalStorageConfig, ShouldWeStoreContent, BYTES_IN_MB_U64,
+    ContentId, ContentStore, PortalStorageConfig, ShouldWeStoreContent,
 };
 
 /// Storage layer for the state network. Encapsulates state network specific data and logic.
@@ -69,15 +69,7 @@ impl ContentStore for StateStorage {
 impl StateStorage {
     pub fn new(config: PortalStorageConfig) -> Result<Self, ContentStoreError> {
         let sql_connection_pool = config.sql_connection_pool.clone();
-        let config = IdIndexedV1StoreConfig {
-            content_type: ContentType::State,
-            network: ProtocolId::State,
-            node_id: config.node_id,
-            node_data_dir: config.node_data_dir,
-            distance_fn: config.distance_fn,
-            sql_connection_pool: config.sql_connection_pool,
-            storage_capacity_bytes: config.storage_capacity_mb * BYTES_IN_MB_U64,
-        };
+        let config = IdIndexedV1StoreConfig::new(ContentType::State, ProtocolId::State, config);
         Ok(Self {
             store: create_store(ContentType::State, config, sql_connection_pool)?,
         })
diff --git a/trin-storage/src/versioned/id_indexed_v1/config.rs b/trin-storage/src/versioned/id_indexed_v1/config.rs
index 380b6590c..33179799b 100644
--- a/trin-storage/src/versioned/id_indexed_v1/config.rs
+++ b/trin-storage/src/versioned/id_indexed_v1/config.rs
@@ -7,7 +7,7 @@ use r2d2_sqlite::SqliteConnectionManager;
 
 use crate::{
     versioned::{usage_stats::UsageStats, ContentType},
-    DistanceFunction,
+    DistanceFunction, PortalStorageConfig, BYTES_IN_MB_U64,
 };
 
 /// The fraction of the storage capacity that we should aim for when pruning.
@@ -29,6 +29,22 @@ pub struct IdIndexedV1StoreConfig {
 }
 
 impl IdIndexedV1StoreConfig {
+    pub fn new(
+        content_type: ContentType,
+        network: ProtocolId,
+        config: PortalStorageConfig,
+    ) -> Self {
+        Self {
+            content_type,
+            network,
+            node_id: config.node_id,
+            node_data_dir: config.node_data_dir,
+            storage_capacity_bytes: config.storage_capacity_mb * BYTES_IN_MB_U64,
+            sql_connection_pool: config.sql_connection_pool,
+            distance_fn: config.distance_fn,
+        }
+    }
+
     pub fn target_capacity(&self) -> u64 {
         (self.storage_capacity_bytes as f64 * TARGET_CAPACITY_FRACTION).round() as u64
     }
diff --git a/trin-storage/src/versioned/id_indexed_v1/migration.rs b/trin-storage/src/versioned/id_indexed_v1/migration.rs
new file mode 100644
index 000000000..da0499648
--- /dev/null
+++ b/trin-storage/src/versioned/id_indexed_v1/migration.rs
@@ -0,0 +1,161 @@
+use crate::{
+    error::ContentStoreError,
+    versioned::{
+        id_indexed_v1::sql,
+        usage_stats::{update_usage_stats, UsageStats},
+        ContentType,
+    },
+};
+
+use super::IdIndexedV1StoreConfig;
+
+pub fn migrate_legacy_history_store(
+    config: &IdIndexedV1StoreConfig,
+) -> Result<(), ContentStoreError> {
+    if config.content_type != ContentType::History {
+        panic!("Can't migrate LegacyHistory store for non History content type.")
+    }
+    let content_type = &config.content_type;
+
+    // Rename old table and drop old indices (they can't be renamed).
+    config.sql_connection_pool.get()?.execute_batch(&format!(
+        "ALTER TABLE history RENAME TO {};
+        DROP INDEX history_distance_short_idx;
+        DROP INDEX history_content_size_idx;",
+        sql::table_name(content_type)
+    ))?;
+
+    // Update usage stats
+    let conn = config.sql_connection_pool.get()?;
+    let usage_stats = conn.query_row(&sql::entry_count_and_size(content_type), [], |row| {
+        Ok(UsageStats {
+            entry_count: row.get("count")?,
+            total_entry_size_bytes: row.get::<&str, f64>("used_capacity")?.round() as u64,
+        })
+    })?;
+    update_usage_stats(&conn, content_type, &usage_stats)?;
+
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+
+    use anyhow::Result;
+    use ethportal_api::{types::portal_wire::ProtocolId, IdentityContentKey, OverlayContentKey};
+    use rand::Rng;
+
+    use crate::{
+        test_utils::{create_test_portal_storage_config_with_capacity, generate_random_bytes},
+        versioned::{
+            create_store, usage_stats::get_usage_stats, IdIndexedV1Store, LegacyHistoryStore,
+            VersionedContentStore,
+        },
+    };
+
+    use super::*;
+
+    const STORAGE_CAPACITY_MB: u64 = 10;
+
+    fn generate_key_value_with_content_size() -> (IdentityContentKey, Vec<u8>) {
+        let key = IdentityContentKey::random();
+        let value = generate_random_bytes(rand::thread_rng().gen_range(100..200));
+        (key, value)
+    }
+
+    #[test]
+    fn legacy_history_empty() -> Result<()> {
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+
+        // initialize legacy store
+        let legacy_history_store = LegacyHistoryStore::new(config.clone())?;
+        drop(legacy_history_store);
+
+        // migrate
+        let config = IdIndexedV1StoreConfig::new(ContentType::History, ProtocolId::History, config);
+        migrate_legacy_history_store(&config)?;
+
+        // make sure we can initialize new store and that it's empty
+        IdIndexedV1Store::create(ContentType::History, config.clone())?;
+        assert_eq!(
+            get_usage_stats(&config.sql_connection_pool.get()?, &ContentType::History)?,
+            UsageStats {
+                entry_count: 0,
+                total_entry_size_bytes: 0
+            }
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn legacy_history_with_content() -> Result<()> {
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+
+        let mut key_value_map = HashMap::new();
+
+        // initialize legacy store
+        let mut legacy_history_store = LegacyHistoryStore::new(config.clone())?;
+        for _ in 0..10 {
+            let (key, value) = generate_key_value_with_content_size();
+            legacy_history_store.store(&key, &value)?;
+            key_value_map.insert(key, value);
+        }
+        drop(legacy_history_store);
+
+        // migrate
+        let config = IdIndexedV1StoreConfig::new(ContentType::History, ProtocolId::History, config);
+        migrate_legacy_history_store(&config)?;
+
+        // create IdIndexedV1Store and verify content
+        let store = IdIndexedV1Store::create(ContentType::History, config)?;
+        for (key, value) in key_value_map.into_iter() {
+            assert_eq!(
+                store.lookup_content_value(&key.content_id().into())?,
+                Some(value),
+            );
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn legacy_history_using_create_store() -> Result<()> {
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+
+        let mut key_value_map = HashMap::new();
+
+        // initialize legacy store
+        let mut legacy_history_store: LegacyHistoryStore = create_store(
+            ContentType::History,
+            config.clone(),
+            config.sql_connection_pool.clone(),
+        )?;
+        for _ in 0..10 {
+            let (key, value) = generate_key_value_with_content_size();
+            legacy_history_store.store(&key, &value)?;
+            key_value_map.insert(key, value);
+        }
+        drop(legacy_history_store);
+
+        // create IdIndexedV1Store and verify content
+        let config = IdIndexedV1StoreConfig::new(ContentType::History, ProtocolId::History, config);
+        let store: IdIndexedV1Store = create_store(
+            ContentType::History,
+            config.clone(),
+            config.sql_connection_pool.clone(),
+        )?;
+        for (key, value) in key_value_map.into_iter() {
+            assert_eq!(
+                store.lookup_content_value(&key.content_id().into())?,
+                Some(value),
+            );
+        }
+
+        Ok(())
+    }
+}
diff --git a/trin-storage/src/versioned/id_indexed_v1/mod.rs b/trin-storage/src/versioned/id_indexed_v1/mod.rs
index 703e2263c..a1f7db5d0 100644
--- a/trin-storage/src/versioned/id_indexed_v1/mod.rs
+++ b/trin-storage/src/versioned/id_indexed_v1/mod.rs
@@ -1,4 +1,5 @@
 mod config;
+mod migration;
 mod sql;
 mod store;
 
diff --git a/trin-storage/src/versioned/id_indexed_v1/store.rs b/trin-storage/src/versioned/id_indexed_v1/store.rs
index 1c4c81490..7634ea20b 100644
--- a/trin-storage/src/versioned/id_indexed_v1/store.rs
+++ b/trin-storage/src/versioned/id_indexed_v1/store.rs
@@ -5,7 +5,7 @@ use rusqlite::{named_params, types::Type, OptionalExtension};
 use tracing::{debug, error, warn};
 use trin_metrics::storage::StorageMetricsReporter;
 
-use super::{sql, IdIndexedV1StoreConfig};
+use super::{migration::migrate_legacy_history_store, sql, IdIndexedV1StoreConfig};
 use crate::{
     error::ContentStoreError,
     utils::get_total_size_of_directory_in_bytes,
@@ -56,10 +56,13 @@ impl VersionedContentStore for IdIndexedV1Store {
     }
 
     fn migrate_from(
-        _content_type: &ContentType,
+        content_type: &ContentType,
         old_version: StoreVersion,
-        _config: &Self::Config,
+        config: &Self::Config,
     ) -> Result<(), ContentStoreError> {
+        if content_type == &ContentType::History && old_version == StoreVersion::LegacyHistory {
+            return migrate_legacy_history_store(config);
+        }
         Err(ContentStoreError::UnsupportedStoreMigration {
             old_version,
             new_version: Self::version(),
diff --git a/trin-storage/src/versioned/sql.rs b/trin-storage/src/versioned/sql.rs
index 9b3493095..c3b842106 100644
--- a/trin-storage/src/versioned/sql.rs
+++ b/trin-storage/src/versioned/sql.rs
@@ -1,5 +1,7 @@
 use super::ContentType;
 
+// The store_info queries
+
 pub const STORE_INFO_CREATE_TABLE: &str = "
     CREATE TABLE IF NOT EXISTS store_info (
         content_type TEXT PRIMARY KEY,
@@ -16,6 +18,8 @@ pub const STORE_INFO_LOOKUP: &str = "
     WHERE content_type = :content_type
     LIMIT 1";
 
+// The usage_stats queries
+
 pub const USAGE_STATS_CREATE_TABLE: &str = "
     CREATE TABLE IF NOT EXISTS usage_stats (
         content_type TEXT PRIMARY KEY,
@@ -72,3 +76,10 @@ pub fn create_usage_stats_triggers(
     "
     )
 }
+
+// The table management queries
+
+pub const TABLE_EXISTS: &str = "
+    SELECT name
+    FROM sqlite_master
+    WHERE type='table' AND name=:table_name";
diff --git a/trin-storage/src/versioned/utils.rs b/trin-storage/src/versioned/utils.rs
index 575061aea..086a6fc69 100644
--- a/trin-storage/src/versioned/utils.rs
+++ b/trin-storage/src/versioned/utils.rs
@@ -13,7 +13,7 @@ pub fn create_store<S: VersionedContentStore>(
     config: S::Config,
     sql_connection_pool: Pool<SqliteConnectionManager>,
 ) -> Result<S, ContentStoreError> {
-    let old_version = lookup_store_version(&content_type, &sql_connection_pool.get()?)?;
+    let old_version = get_store_version(&content_type, &sql_connection_pool.get()?)?;
 
     match old_version {
         Some(old_version) => {
@@ -31,20 +31,41 @@ pub fn create_store<S: VersionedContentStore>(
     S::create(content_type, config)
 }
 
-fn lookup_store_version(
+fn get_store_version(
     content_type: &ContentType,
     conn: &PooledConnection<SqliteConnectionManager>,
 ) -> Result<Option<StoreVersion>, ContentStoreError> {
-    Ok(conn
+    let version = conn
         .query_row(
            sql::STORE_INFO_LOOKUP,
             named_params! { ":content_type": content_type.as_ref() },
-            |row| {
-                let version: StoreVersion = row.get("version")?;
-                Ok(version)
-            },
+            |row| row.get::<&str, StoreVersion>("version"),
         )
-        .optional()?)
+        .optional()?;
+
+    match version {
+        Some(_) => Ok(version),
+        None => get_default_store_version(content_type, conn),
+    }
+}
+
+fn get_default_store_version(
+    content_type: &ContentType,
+    conn: &PooledConnection<SqliteConnectionManager>,
+) -> Result<Option<StoreVersion>, ContentStoreError> {
+    match content_type {
+        ContentType::History => {
+            let exists = conn
+                .prepare(sql::TABLE_EXISTS)?
+                .exists(named_params! {":table_name": "history"})?;
+            if exists {
+                Ok(Some(StoreVersion::LegacyHistory))
+            } else {
+                Ok(None)
+            }
+        }
+        _ => Ok(None),
+    }
 }
 
 fn update_store_info(
@@ -63,7 +84,6 @@
 }
 
 #[cfg(test)]
-#[allow(clippy::unwrap_used)]
 pub mod test {
     use anyhow::Result;
 
@@ -74,12 +94,28 @@ pub mod test {
     const STORAGE_CAPACITY_MB: u64 = 10;
 
     #[test]
-    fn lookup_no_store_version() -> Result<()> {
+    fn get_store_version_missing() -> Result<()> {
         let (_temp_dir, config) =
             create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
         let conn = config.sql_connection_pool.get()?;
 
-        assert_eq!(lookup_store_version(&ContentType::State, &conn)?, None);
+        assert_eq!(get_store_version(&ContentType::History, &conn)?, None);
+        Ok(())
+    }
+
+    #[test]
+    fn get_store_version_default_history() -> Result<()> {
+        let (_temp_dir, config) =
+            create_test_portal_storage_config_with_capacity(STORAGE_CAPACITY_MB)?;
+        let conn = config.sql_connection_pool.get()?;
+
+        let create_dummy_history_table_sql = "CREATE TABLE history (content_id blob PRIMARY KEY);";
+        conn.execute(create_dummy_history_table_sql, [])?;
+
+        assert_eq!(
+            get_store_version(&ContentType::History, &conn)?,
+            Some(StoreVersion::LegacyHistory)
+        );
 
         Ok(())
     }
@@ -92,7 +128,7 @@
         update_store_info(&ContentType::State, StoreVersion::IdIndexedV1, &conn)?;
 
         assert_eq!(
-            lookup_store_version(&ContentType::State, &conn)?,
+            get_store_version(&ContentType::State, &conn)?,
             Some(StoreVersion::IdIndexedV1)
         );
         Ok(())
@@ -107,14 +143,14 @@
         let conn = config.sql_connection_pool.get()?;
 
         // Set store version
         update_store_info(&ContentType::State, StoreVersion::LegacyHistory, &conn)?;
         assert_eq!(
-            lookup_store_version(&ContentType::State, &conn)?,
+            get_store_version(&ContentType::State, &conn)?,
             Some(StoreVersion::LegacyHistory)
         );
 
         // Update store version
         update_store_info(&ContentType::State, StoreVersion::IdIndexedV1, &conn)?;
         assert_eq!(
-            lookup_store_version(&ContentType::State, &conn)?,
+            get_store_version(&ContentType::State, &conn)?,
             Some(StoreVersion::IdIndexedV1)
         );
@@ -135,7 +171,7 @@
         )?;
 
         assert_eq!(
-            lookup_store_version(&ContentType::State, &sql_connection_pool.get()?)?,
+            get_store_version(&ContentType::State, &sql_connection_pool.get()?)?,
             Some(StoreVersion::IdIndexedV1)
         );
 
@@ -158,7 +194,7 @@
         create_store::<MockContentStore>(ContentType::State, config.clone(), sql_connection_pool)?;
 
         assert_eq!(
-            lookup_store_version(&ContentType::State, &config.sql_connection_pool.get()?)?,
+            get_store_version(&ContentType::State, &config.sql_connection_pool.get()?)?,
             Some(StoreVersion::IdIndexedV1)
         );
 
@@ -173,14 +209,15 @@
         let sql_connection_pool = config.sql_connection_pool.clone();
 
         update_store_info(
-            &ContentType::State,
+            &ContentType::History,
             StoreVersion::LegacyHistory,
             &sql_connection_pool.get().unwrap(),
         )
         .unwrap();
 
         // Should panic - MockContentStore doesn't support migration.
-        create_store::<MockContentStore>(ContentType::State, config, sql_connection_pool).unwrap();
+        create_store::<MockContentStore>(ContentType::History, config, sql_connection_pool)
+            .unwrap();
     }
 
     pub struct MockContentStore;