From e4508474dee0cbe80f7075aa6bc5ea98f058115b Mon Sep 17 00:00:00 2001 From: sfauvel Date: Mon, 25 Nov 2024 15:09:18 +0100 Subject: [PATCH] Remove StoreAdapter --- .../src/store/adapter/dumb_adapter.rs | 236 --------- .../src/store/adapter/fail_adapter.rs | 143 ------ .../src/store/adapter/memory_adapter.rs | 240 --------- .../src/store/adapter/mod.rs | 15 - .../src/store/adapter/sqlite_adapter.rs | 481 ------------------ .../src/store/adapter/store_adapter.rs | 64 --- internal/mithril-persistence/src/store/mod.rs | 6 +- .../src/store/store_pruner.rs | 129 ----- .../repository/epoch_settings_store.rs | 9 +- .../repository/signer_registration_store.rs | 16 +- .../database/repository/stake_pool_store.rs | 10 +- .../src/dependency_injection/builder.rs | 2 +- .../src/http_server/routes/signer_routes.rs | 3 +- mithril-aggregator/src/lib.rs | 2 +- 14 files changed, 16 insertions(+), 1340 deletions(-) delete mode 100644 internal/mithril-persistence/src/store/adapter/dumb_adapter.rs delete mode 100644 internal/mithril-persistence/src/store/adapter/fail_adapter.rs delete mode 100644 internal/mithril-persistence/src/store/adapter/memory_adapter.rs delete mode 100644 internal/mithril-persistence/src/store/adapter/mod.rs delete mode 100644 internal/mithril-persistence/src/store/adapter/sqlite_adapter.rs delete mode 100644 internal/mithril-persistence/src/store/adapter/store_adapter.rs delete mode 100644 internal/mithril-persistence/src/store/store_pruner.rs diff --git a/internal/mithril-persistence/src/store/adapter/dumb_adapter.rs b/internal/mithril-persistence/src/store/adapter/dumb_adapter.rs deleted file mode 100644 index a01148c0beb..00000000000 --- a/internal/mithril-persistence/src/store/adapter/dumb_adapter.rs +++ /dev/null @@ -1,236 +0,0 @@ -use super::{AdapterError, StoreAdapter}; -use anyhow::anyhow; -use async_trait::async_trait; - -/// A [StoreAdapter] that store one fixed data record, for testing purpose. -pub struct DumbStoreAdapter { - last_key: Option, - last_value: Option, - error: Option, -} - -impl DumbStoreAdapter { - /// DumbStoreAdapter factory - pub fn new() -> Self { - Self { - last_key: None, - last_value: None, - error: None, - } - } - - /// DumbStoreAdapter factory that returns error when 'get_record' is called. - pub fn new_failing_adapter(error: &str) -> Self { - Self { - error: Some(error.to_string()), - ..Self::new() - } - } -} - -impl Default for DumbStoreAdapter { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl StoreAdapter for DumbStoreAdapter -where - R: Clone + Send + Sync, - K: PartialEq + Clone + Send + Sync, -{ - type Key = K; - type Record = R; - - async fn store_record( - &mut self, - key: &Self::Key, - record: &Self::Record, - ) -> Result<(), AdapterError> { - let key = key.clone(); - let record = record.clone(); - - self.last_key = Some(key); - self.last_value = Some(record); - - Ok(()) - } - - async fn get_record(&self, key: &Self::Key) -> Result, AdapterError> { - match &self.error { - Some(error) => Err(AdapterError::GeneralError(anyhow!(error.clone()))), - None => { - if self.record_exists(key).await? { - Ok(self.last_value.as_ref().cloned()) - } else { - Ok(None) - } - } - } - } - - async fn record_exists(&self, key: &Self::Key) -> Result { - Ok(self.last_key.is_some() && self.last_key.as_ref().unwrap() == key) - } - - async fn get_last_n_records( - &self, - how_many: usize, - ) -> Result, AdapterError> { - if how_many > 0 { - match &self.last_key { - Some(_key) => Ok(vec![( - self.last_key.as_ref().cloned().unwrap(), - self.last_value.as_ref().cloned().unwrap(), - )]), - None => Ok(Vec::new()), - } - } else { - Ok(Vec::new()) - } - } - - async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError> { - if let Some(record) = self.get_record(key).await? { - self.last_key = None; - self.last_value = None; - - Ok(Some(record)) - } else { - Ok(None) - } - } - - async fn get_iter(&self) -> Result + '_>, AdapterError> { - let mut values = vec![]; - if let Some(value) = &self.last_value { - values.push(value.clone()); - } - Ok(Box::new(values.into_iter())) - } -} - -#[cfg(test)] -mod tests { - - use super::*; - - #[tokio::test] - async fn test_with_no_record_exists() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert!(!adapter.record_exists(&1).await.unwrap()); - } - - #[tokio::test] - async fn test_with_no_record_get() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert!(adapter.get_record(&1).await.unwrap().is_none()); - } - - #[tokio::test] - async fn test_write_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert!(adapter - .store_record(&1, &"record".to_string()) - .await - .is_ok()); - assert_eq!( - "record".to_owned(), - adapter.get_record(&1).await.unwrap().unwrap() - ); - } - - #[tokio::test] - async fn test_list_with_no_record() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert_eq!(0, adapter.get_last_n_records(10).await.unwrap().len()); - } - - #[tokio::test] - async fn test_list_with_records() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let list = adapter.get_last_n_records(10).await.unwrap(); - - assert_eq!(1, list.len()); - - let (key, record) = &list[0]; - - assert_eq!(&1, key); - assert_eq!(&("record".to_owned()), record); - } - - #[tokio::test] - async fn test_list_with_last_zero() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let list = adapter.get_last_n_records(0).await.unwrap(); - - assert_eq!(0, list.len()); - } - - #[tokio::test] - async fn test_remove_existing_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let value = adapter.remove(&1).await.unwrap().unwrap(); - - assert_eq!("record".to_string(), value); - assert!(!adapter.record_exists(&1).await.unwrap()); - } - - #[tokio::test] - async fn test_remove_non_existing_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let maybe_record = adapter.remove(&0).await.unwrap(); - - assert!(maybe_record.is_none()); - } - - #[tokio::test] - async fn test_iter_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let records: Vec = adapter.get_iter().await.unwrap().collect(); - - assert_eq!(vec!["record"], records); - } - - #[tokio::test] - async fn test_iter_without_record() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - let records = adapter.get_iter().await.unwrap(); - - assert_eq!(0, records.count()); - } - - #[tokio::test] - async fn test_return_error_calling_get_record() { - let adapter: DumbStoreAdapter = - DumbStoreAdapter::new_failing_adapter("error"); - let result = adapter.get_record(&"key".to_string()).await; - - assert!(result.is_err()); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/fail_adapter.rs b/internal/mithril-persistence/src/store/adapter/fail_adapter.rs deleted file mode 100644 index a8638530a95..00000000000 --- a/internal/mithril-persistence/src/store/adapter/fail_adapter.rs +++ /dev/null @@ -1,143 +0,0 @@ -use super::{AdapterError, StoreAdapter}; -use anyhow::anyhow; -use async_trait::async_trait; -use std::marker::PhantomData; - -/// A [StoreAdapter] which always fails, for testing purpose. -pub struct FailStoreAdapter { - key: PhantomData, - certificate: PhantomData, -} - -impl FailStoreAdapter { - /// FailStoreAdapter factory - pub fn new() -> Self { - Self { - key: PhantomData, - certificate: PhantomData, - } - } -} - -impl Default for FailStoreAdapter { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl StoreAdapter for FailStoreAdapter -where - R: Clone + Send + Sync, - K: PartialEq + Clone + Send + Sync, -{ - type Key = K; - type Record = R; - - async fn store_record( - &mut self, - _key: &Self::Key, - _record: &Self::Record, - ) -> Result<(), AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn get_record(&self, _key: &Self::Key) -> Result, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn record_exists(&self, _key: &Self::Key) -> Result { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn get_last_n_records( - &self, - _how_many: usize, - ) -> Result, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn remove(&mut self, _key: &Self::Key) -> Result, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn get_iter(&self) -> Result + '_>, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } -} - -#[cfg(test)] -mod tests { - - use super::*; - - #[tokio::test] - async fn test_with_no_record_exists() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter.record_exists(&1).await.is_err()); - } - - #[tokio::test] - async fn test_with_no_record_get() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter.get_record(&1).await.is_err()); - } - - #[tokio::test] - async fn test_write_record() { - let mut adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter - .store_record(&1, &"record".to_string()) - .await - .is_err()); - } - - #[tokio::test] - async fn test_list() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter.get_last_n_records(10).await.is_err()); - } - - #[tokio::test] - async fn test_list_with_records() { - let mut adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter - .store_record(&1, &"record".to_string()) - .await - .is_err()); - } - - #[tokio::test] - async fn test_list_with_last_zero() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter.get_last_n_records(0).await.is_err()); - } - - #[tokio::test] - async fn test_remove_existing_record() { - let mut adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter.remove(&0).await.is_err()); - } - - #[tokio::test] - async fn test_get_iter() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter.get_iter().await.is_err()); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/memory_adapter.rs b/internal/mithril-persistence/src/store/adapter/memory_adapter.rs deleted file mode 100644 index 13cbd3cc82c..00000000000 --- a/internal/mithril-persistence/src/store/adapter/memory_adapter.rs +++ /dev/null @@ -1,240 +0,0 @@ -use anyhow::anyhow; -use async_trait::async_trait; -use std::{collections::HashMap, hash::Hash}; - -use super::{AdapterError, StoreAdapter}; - -/// A [StoreAdapter] that store data in memory. -pub struct MemoryAdapter { - index: Vec, - values: HashMap, -} - -impl MemoryAdapter -where - K: Hash + Eq + Send + Sync + Clone, - V: Send + Sync + Clone, -{ - /// MemoryAdapter factory - pub fn new(data: Option>) -> Result { - let data = data.unwrap_or_default(); - let mut values = HashMap::new(); - let mut index = Vec::new(); - - for (idx, elt) in data.into_iter() { - if values.insert(idx.clone(), elt).is_some() { - return Err(AdapterError::InitializationError(anyhow!( - "duplicate key found" - ))); - } - index.push(idx); - } - - Ok(Self { index, values }) - } -} - -#[async_trait] -impl StoreAdapter for MemoryAdapter -where - K: Hash + Eq + Send + Sync + Clone, - V: Send + Sync + Clone, -{ - type Key = K; - type Record = V; - - async fn store_record( - &mut self, - key: &Self::Key, - record: &Self::Record, - ) -> Result<(), AdapterError> { - let key = (*key).clone(); - let record = (*record).clone(); - - if self.values.insert(key.clone(), record).is_none() { - self.index.push(key); - } - - Ok(()) - } - - async fn get_record(&self, key: &Self::Key) -> Result, AdapterError> { - match self.values.get(key) { - Some(val) => Ok(Some(val.clone())), - None => Ok(None), - } - } - - async fn record_exists(&self, key: &Self::Key) -> Result { - Ok(self.values.contains_key(key)) - } - - async fn get_last_n_records( - &self, - how_many: usize, - ) -> Result, AdapterError> { - Ok(self - .index - .iter() - .rev() - .take(how_many) - .map(|k| (k.clone(), self.values.get(k).unwrap().clone())) - .collect()) - } - - async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError> { - self.index.retain(|k| *k != *key); - - Ok(self.values.remove(key)) - } - - async fn get_iter(&self) -> Result + '_>, AdapterError> { - Ok(Box::new( - self.index - .iter() - .rev() - .map(|k| self.values.get(k).unwrap().clone()), - )) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn init_adapter(nb: u64) -> MemoryAdapter { - let mut values: Vec<(u64, String)> = Vec::new(); - if nb > 0 { - for ix in 1..=nb { - values.push((ix, format!("value {ix}"))); - } - - MemoryAdapter::new(Some(values)).unwrap() - } else { - MemoryAdapter::new(None).unwrap() - } - } - - #[tokio::test] - async fn record_exists_existing_key() { - let adapter = init_adapter(2); - - assert!(adapter.record_exists(&1).await.unwrap()); - } - #[tokio::test] - async fn record_exists_non_existing_key() { - let adapter = init_adapter(2); - - assert!(!adapter.record_exists(&0).await.unwrap()); - } - - #[tokio::test] - async fn read_existing_record() { - let adapter = init_adapter(2); - let val = adapter.get_record(&2).await.unwrap(); - - assert!(val.is_some()); - assert_eq!("value 2".to_string(), val.unwrap()); - } - - #[tokio::test] - async fn read_unexisting_record() { - let adapter = init_adapter(2); - let val = adapter.get_record(&0).await.unwrap(); - - assert!(val.is_none()); - } - - #[tokio::test] - async fn get_last_n_values() { - let adapter = init_adapter(2); - let vals = adapter.get_last_n_records(5).await.unwrap(); - - assert_eq!(2, vals.len()); - assert_eq!((2, "value 2".to_string()), vals[0]); - } - - #[tokio::test] - async fn get_last_n_existing_values() { - let adapter = init_adapter(5); - let vals = adapter.get_last_n_records(2).await.unwrap(); - - assert_eq!(2, vals.len()); - assert_eq!((5, "value 5".to_string()), vals[0]); - } - - #[tokio::test] - async fn save_new_values() { - let mut adapter = init_adapter(2); - - assert!(adapter.store_record(&10, &"ten".to_string()).await.is_ok()); - let vals = adapter.get_last_n_records(1).await.unwrap(); - - assert_eq!((10, "ten".to_string()), vals[0]); - } - - #[tokio::test] - async fn update_value() { - let mut adapter = init_adapter(2); - - assert!(adapter.store_record(&1, &"one".to_string()).await.is_ok()); - let vals = adapter.get_last_n_records(2).await.unwrap(); - - assert_eq!( - vec![(2, "value 2".to_string()), (1, "one".to_string())], - vals - ); - } - - #[tokio::test] - async fn remove_existing_value() { - let mut adapter = init_adapter(2); - let record = adapter.remove(&1).await.unwrap().unwrap(); - - assert_eq!("value 1".to_string(), record); - assert!(!adapter.record_exists(&1).await.unwrap()); - assert_eq!(1, adapter.index.len()) - } - - #[tokio::test] - async fn remove_non_existing_value() { - let mut adapter = init_adapter(2); - let maybe_record = adapter.remove(&0).await.unwrap(); - - assert!(maybe_record.is_none()); - assert_eq!(2, adapter.index.len()) - } - - #[tokio::test] - async fn test_iter_record() { - let adapter = init_adapter(5); - let records: Vec = adapter.get_iter().await.unwrap().collect(); - - assert_eq!( - vec!["value 5", "value 4", "value 3", "value 2", "value 1"], - records - ); - } - - #[tokio::test] - async fn test_iter_without_record() { - let adapter = init_adapter(0); - let records = adapter.get_iter().await.unwrap(); - - assert_eq!(0, records.count()); - } - - #[tokio::test] - async fn check_get_last_n_modified_records() { - let mut adapter = init_adapter(3); - adapter - .store_record(&1, &"updated record".to_string()) - .await - .unwrap(); - let values = adapter.get_last_n_records(2).await.unwrap(); - assert_eq!( - vec![(3, "value 3".to_string()), (2, "value 2".to_string())], - values - ); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/mod.rs b/internal/mithril-persistence/src/store/adapter/mod.rs deleted file mode 100644 index 548bee87f00..00000000000 --- a/internal/mithril-persistence/src/store/adapter/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! Define a generic way to store data with the [Store Adapter][store_adapter::StoreAdapter], with -//! an adapter [in memory][MemoryAdapter] and another [sqlite][SQLiteAdapter]. - -mod memory_adapter; -mod sqlite_adapter; -mod store_adapter; - -pub use memory_adapter::MemoryAdapter; -pub use sqlite_adapter::{SQLiteAdapter, SQLiteResultIterator}; -pub use store_adapter::*; - -mod dumb_adapter; -pub use dumb_adapter::DumbStoreAdapter; -mod fail_adapter; -pub use fail_adapter::FailStoreAdapter; diff --git a/internal/mithril-persistence/src/store/adapter/sqlite_adapter.rs b/internal/mithril-persistence/src/store/adapter/sqlite_adapter.rs deleted file mode 100644 index e75b6e4a45a..00000000000 --- a/internal/mithril-persistence/src/store/adapter/sqlite_adapter.rs +++ /dev/null @@ -1,481 +0,0 @@ -use anyhow::anyhow; -use async_trait::async_trait; -use serde::{de::DeserializeOwned, Serialize}; -use sha2::{Digest, Sha256}; -use sqlite::{Connection, State, Statement}; -use std::{marker::PhantomData, sync::Arc, thread::sleep, time::Duration}; - -use super::{AdapterError, StoreAdapter}; -use crate::sqlite::SqliteConnection; - -type Result = std::result::Result; - -const DELAY_MS_ON_LOCK: u32 = 50; -const NB_RETRIES_ON_LOCK: u32 = 3; - -/// Store adapter for SQLite3 -pub struct SQLiteAdapter { - connection: Arc, - table: String, - key: PhantomData, - value: PhantomData, -} - -impl SQLiteAdapter -where - K: Serialize, - V: DeserializeOwned, -{ - /// Create a new SQLiteAdapter instance. - pub fn new(table_name: &str, connection: Arc) -> Result { - { - Self::check_table_exists(&connection, table_name)?; - } - - Ok(Self { - connection, - table: table_name.to_owned(), - key: PhantomData, - value: PhantomData, - }) - } - - fn check_table_exists(connection: &Connection, table_name: &str) -> Result<()> { - let sql = format!( - "select exists(select 1 from sqlite_master where type='table' and name='{table_name}')" - ); - let mut statement = connection - .prepare(sql) - .map_err(|e| AdapterError::OpeningStreamError(e.into()))?; - statement - .next() - .map_err(|e| AdapterError::QueryError(e.into()))?; - let table_exists = statement - .read::(0) - .map_err(|e| AdapterError::ParsingDataError(e.into()))?; - - if table_exists != 1 { - Self::create_table(connection, table_name)?; - } - - Ok(()) - } - - fn create_table(connection: &Connection, table_name: &str) -> Result<()> { - let sql = format!( - "create table {table_name} (key_hash text primary key, key json not null, value json not null)" - ); - connection - .execute(sql) - .map_err(|e| AdapterError::QueryError(e.into()))?; - - Ok(()) - } - - fn get_hash_from_key(&self, key: &K) -> Result { - let mut hasher = Sha256::new(); - hasher.update(self.serialize_key(key)?); - let checksum = hasher.finalize(); - - Ok(hex::encode(checksum)) - } - - fn serialize_key(&self, key: &K) -> Result { - serde_json::to_string(&key).map_err(|e| { - AdapterError::GeneralError( - anyhow!(e).context("SQLite adapter: Serde error while serializing store key"), - ) - }) - } - - // Connection must be locked from the calling function to be able to return - // a Statement that references this connection. - fn get_statement_for_key<'conn>( - &'conn self, - connection: &'conn Connection, - sql: String, - key: &K, - ) -> Result { - let mut statement = connection - .prepare(sql) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((1, self.get_hash_from_key(key)?.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - - Ok(statement) - } - - fn fetch_maybe_one_value(&self, mut statement: Statement) -> Result> { - let mut retries = Some(NB_RETRIES_ON_LOCK); - let mut result = statement.next(); - - while result.is_err() { - // database is probably locked - // wait and retry strategy - retries = retries.filter(|v| v > &1).map(|v| v - 1); - - if retries.is_none() { - return Err(result - .map_err(|e| AdapterError::ParsingDataError(e.into())) - .unwrap_err()); - } - sleep(Duration::from_millis(DELAY_MS_ON_LOCK as u64)); - result = statement.next(); - } - - if State::Done == result.unwrap() { - return Ok(None); - } - let maybe_value: Option = statement - .read::(0) - .map_err(|e| AdapterError::QueryError(e.into())) - .and_then(|v| { - serde_json::from_str(&v).map_err(|e| AdapterError::ParsingDataError(e.into())) - })?; - - Ok(maybe_value) - } -} - -#[async_trait] -impl StoreAdapter for SQLiteAdapter -where - K: Send + Sync + Serialize + DeserializeOwned, - V: Send + Sync + Serialize + DeserializeOwned, -{ - type Key = K; - type Record = V; - - async fn store_record(&mut self, key: &Self::Key, record: &Self::Record) -> Result<()> { - let sql = format!( - "insert into {} (key_hash, key, value) values (?1, ?2, ?3) on conflict (key_hash) do update set value = excluded.value", - self.table - ); - let value = serde_json::to_string(record).map_err(|e| { - AdapterError::GeneralError( - anyhow!(e) - .context("SQLite adapter error: could not serialize value before insertion"), - ) - })?; - let mut statement = self - .connection - .prepare(sql) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((1, self.get_hash_from_key(key)?.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((2, self.serialize_key(key)?.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((3, value.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - let _ = statement - .next() - .map_err(|e| AdapterError::ParsingDataError(e.into()))?; - - Ok(()) - } - - async fn get_record(&self, key: &Self::Key) -> Result> { - let sql = format!("select value from {} where key_hash = ?1", self.table); - let statement = self.get_statement_for_key(&self.connection, sql, key)?; - - self.fetch_maybe_one_value(statement) - } - - async fn record_exists(&self, key: &Self::Key) -> Result { - let sql = format!( - "select exists(select 1 from {} where key_hash = ?1) as record_exists", - self.table - ); - let mut statement = self.get_statement_for_key(&self.connection, sql, key)?; - statement - .next() - .map_err(|e| AdapterError::QueryError(e.into()))?; - - statement - .read::(0) - .map_err(|e| { - AdapterError::GeneralError( - anyhow!(e).context("There should be a result in this case !"), - ) - }) - .map(|res| res == 1) - } - - async fn get_last_n_records(&self, how_many: usize) -> Result> { - let sql = format!( - "select cast(key as text) as key, cast(value as text) as value from {} order by ROWID desc limit ?1", - self.table - ); - let mut statement = self - .connection - .prepare(sql) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((1, how_many as i64)) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - let cursor = statement.iter(); - - let results = cursor - .map(|row| { - let row = row.unwrap(); - let key: K = serde_json::from_str(row.read::<&str, _>(0)).unwrap(); - let value: V = serde_json::from_str(row.read::<&str, _>(1)).unwrap(); - - (key, value) - }) - .collect(); - - Ok(results) - } - - async fn remove(&mut self, key: &Self::Key) -> Result> { - let sql = format!( - "delete from {} where key_hash = ?1 returning value", - self.table - ); - let statement = self.get_statement_for_key(&self.connection, sql, key)?; - - self.fetch_maybe_one_value(statement) - } - - async fn get_iter(&self) -> Result + '_>> { - let iterator = SQLiteResultIterator::new(&self.connection, &self.table)?; - - Ok(Box::new(iterator)) - } -} - -/// Iterator over SQLite adapter results. -/// -/// **important:** For now all the results are loaded in memory, it would be better to -/// consume the cursor but this is a quick solution. -pub struct SQLiteResultIterator { - results: Vec, -} - -impl SQLiteResultIterator -where - V: DeserializeOwned, -{ - /// Create a new instance of the iterator. - pub fn new(connection: &Connection, table_name: &str) -> Result> { - let sql = format!("select value from {table_name} order by ROWID asc"); - - let cursor = connection - .prepare(sql) - .map_err(|e| AdapterError::QueryError(e.into()))? - .into_iter(); - - let results = cursor - .map(|row| { - let row = row.unwrap(); - let res: V = serde_json::from_str(row.read::<&str, _>(0)).unwrap(); - - res - }) - .collect(); - - Ok(Self { results }) - } -} - -impl Iterator for SQLiteResultIterator { - type Item = V; - - fn next(&mut self) -> Option { - self.results.pop() - } -} - -#[cfg(test)] -mod tests { - use mithril_common::test_utils::TempDir; - use sqlite::Value; - use std::path::{Path, PathBuf}; - - use super::*; - - const TABLE_NAME: &str = "key_value_store"; - - fn get_file_path(test_name: &str) -> PathBuf { - TempDir::create("sqlite_adapter", test_name).join("db.sqlite3") - } - - fn init_db(db_path: &Path, tablename: Option<&str>) -> SQLiteAdapter { - let tablename = tablename.unwrap_or(TABLE_NAME); - let connection = Connection::open_thread_safe(db_path).unwrap(); - - SQLiteAdapter::new(tablename, Arc::new(connection)).unwrap() - } - - #[tokio::test] - async fn test_store_record() { - let test_name = "test_store_record"; - let filepath = get_file_path(test_name); - let mut adapter = init_db(&filepath, None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - let connection = Connection::open(&filepath).unwrap_or_else(|_| { - panic!( - "Expecting to be able to open SQLite file '{}'.", - filepath.display() - ) - }); - let mut cursor = connection - .prepare(format!("select key_hash, key, value from {TABLE_NAME}")) - .unwrap() - .into_iter(); - let row = cursor - .try_next() - .unwrap() - .expect("Expecting at least one row in the result set."); - assert_eq!(Value::Integer(1), row[1]); - assert_eq!(Value::String("\"one\"".to_string()), row[2]); - - // We must drop the cursor else the db will be locked - drop(cursor); - - adapter.store_record(&1, &"zwei".to_string()).await.unwrap(); - let mut statement = connection - .prepare(format!("select key_hash, key, value from {TABLE_NAME}")) - .unwrap(); - let mut cursor = statement.iter(); - let row = cursor - .try_next() - .unwrap() - .expect("Expecting at least one row in the result set."); - assert_eq!(Value::String("\"zwei\"".to_string()), row[2]); - } - - #[tokio::test] - async fn test_get_record() { - let test_name = "test_get_record"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - assert_eq!( - Some("one".to_string()), - adapter.get_record(&1).await.unwrap() - ); - assert_eq!( - Some("three".to_string()), - adapter.get_record(&3).await.unwrap() - ); - assert_eq!( - Some("two".to_string()), - adapter.get_record(&2).await.unwrap() - ); - assert_eq!(None, adapter.get_record(&4).await.unwrap()); - } - - #[tokio::test] - async fn test_get_iterator() { - let test_name = "test_get_iterator"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - let collection: Vec<(usize, String)> = - adapter.get_iter().await.unwrap().enumerate().collect(); - assert_eq!( - vec![ - (0, "three".to_string()), - (1, "two".to_string()), - (2, "one".to_string()), - ], - collection - ); - } - - #[tokio::test] - async fn test_record_exists() { - let test_name = "test_record_exists"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - - assert!(adapter.record_exists(&1).await.unwrap()); - assert!(adapter.record_exists(&2).await.unwrap()); - assert!(!adapter.record_exists(&3).await.unwrap()); - } - - #[tokio::test] - async fn test_remove() { - let test_name = "test_remove"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - let record = adapter - .remove(&1) - .await - .expect("removing an existing record should not fail") - .expect("removing an existing record should return the deleted record"); - assert_eq!("one".to_string(), record); - let empty = adapter - .remove(&1) - .await - .expect("removing a non existing record should not fail"); - assert!(empty.is_none()); - } - - #[tokio::test] - async fn test_get_last_n_records() { - let test_name = "test_get_last_n_records"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - assert_eq!( - vec![(3_u64, "three".to_string())], - adapter - .get_last_n_records(1) - .await - .expect("get last N records should not fail") - ); - assert_eq!( - vec![ - (3_u64, "three".to_string()), - (2_u64, "two".to_string()), - (1_u64, "one".to_string()), - ], - adapter - .get_last_n_records(5) - .await - .expect("get last N records should not fail") - ); - } - - #[tokio::test] - async fn check_get_last_n_modified_records() { - let test_name = "check_get_last_n_modified_records"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - adapter - .store_record(&1, &"updated record".to_string()) - .await - .unwrap(); - let values = adapter.get_last_n_records(2).await.unwrap(); - assert_eq!( - vec![(3, "three".to_string()), (2, "two".to_string())], - values - ); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/store_adapter.rs b/internal/mithril-persistence/src/store/adapter/store_adapter.rs deleted file mode 100644 index 8aedfd42dbc..00000000000 --- a/internal/mithril-persistence/src/store/adapter/store_adapter.rs +++ /dev/null @@ -1,64 +0,0 @@ -use async_trait::async_trait; -use mithril_common::StdError; -use thiserror::Error; - -/// [StoreAdapter] related errors -#[derive(Debug, Error)] -pub enum AdapterError { - /// Generic [StoreAdapter] error. - #[error("something wrong happened")] - GeneralError(#[source] StdError), - - /// Error raised when the store initialization fails. - #[error("problem creating the repository")] - InitializationError(#[source] StdError), - - /// Error raised when the opening of a IO stream fails. - #[error("problem opening the IO stream")] - OpeningStreamError(#[source] StdError), - - /// Error raised when the parsing of a IO stream fails. - #[error("problem parsing the IO stream")] - ParsingDataError(#[source] StdError), - - /// Error while querying the subsystem. - #[error("problem when querying the adapter")] - QueryError(#[source] StdError), -} - -/// Represent a way to store Key/Value pair data. -#[async_trait] -pub trait StoreAdapter: Sync + Send { - /// The key type - type Key; - - /// The record type - type Record; - - /// Store the given `record`. - async fn store_record( - &mut self, - key: &Self::Key, - record: &Self::Record, - ) -> Result<(), AdapterError>; - - /// Get the record stored using the given `key`. - async fn get_record(&self, key: &Self::Key) -> Result, AdapterError>; - - /// Check if a record exist for the given `key`. - async fn record_exists(&self, key: &Self::Key) -> Result; - - /// Get the last `n` records in the store - async fn get_last_n_records( - &self, - how_many: usize, - ) -> Result, AdapterError>; - - /// remove values from store - /// - /// if the value exists it is returned by the adapter otherwise None is returned - async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError>; - - /// Get an iterator over the stored values, from the latest to the oldest. - async fn get_iter(&self) -> Result + '_>, AdapterError>; -} diff --git a/internal/mithril-persistence/src/store/mod.rs b/internal/mithril-persistence/src/store/mod.rs index e4805b92811..64b15cf37bd 100644 --- a/internal/mithril-persistence/src/store/mod.rs +++ b/internal/mithril-persistence/src/store/mod.rs @@ -1,9 +1,5 @@ -//! Define a generic way to store data with the [Store Adapters][adapter], and the [StakeStorer] -//! to store stakes. +//! Define traits of [StakeStorer]. -pub mod adapter; mod stake_store; -mod store_pruner; pub use stake_store::StakeStorer; -pub use store_pruner::StorePruner; diff --git a/internal/mithril-persistence/src/store/store_pruner.rs b/internal/mithril-persistence/src/store/store_pruner.rs deleted file mode 100644 index e95c2daa049..00000000000 --- a/internal/mithril-persistence/src/store/store_pruner.rs +++ /dev/null @@ -1,129 +0,0 @@ -use async_trait::async_trait; -use mithril_common::StdResult; -use tokio::sync::RwLock; - -use super::adapter::StoreAdapter; - -/// Implementing this trait will make store able to limit the number of the -/// stored records by pruning them if a limit is set. -#[async_trait] -pub trait StorePruner { - /// The key type - type Key: Sync + Send; - - /// The record type - type Record: Sync + Send; - - /// This trait requires a way to get the internal adapter. - fn get_adapter(&self) - -> &RwLock>>; - - /// Return the maximum number of elements that can exist in this store. If None, there is no limit. - fn get_max_records(&self) -> Option; - - /// Prune elements exceeding the specified limit. - async fn prune(&self) -> StdResult<()> { - let retention_len = self.get_max_records().unwrap_or(usize::MAX); - let lock = self.get_adapter(); - let mut adapter = lock.write().await; - - for (epoch, _record) in adapter - .get_last_n_records(usize::MAX) - .await? - .into_iter() - .skip(retention_len) - { - adapter.remove(&epoch).await?; - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::{cmp::min, sync::Arc}; - - use sqlite::Connection; - - use crate::store::adapter::SQLiteAdapter; - - use super::*; - - struct TestStore { - adapter: RwLock>>, - record_limit: Option, - } - - impl StorePruner for TestStore { - type Key = u64; - type Record = String; - - fn get_adapter( - &self, - ) -> &RwLock>> { - &self.adapter - } - - fn get_max_records(&self) -> Option { - self.record_limit - } - } - - fn get_data(n: u64) -> Vec<(u64, String)> { - let n = min(n, 6); - let words = ["one", "two", "three", "four", "five", "six"]; - let mut values: Vec<(u64, String)> = Vec::new(); - - for index in 0..n { - values.push((index, words[index as usize].to_string())); - } - - values - } - - async fn get_adapter(data_len: u64) -> SQLiteAdapter { - let connection = Connection::open_thread_safe(":memory:").unwrap(); - let mut adapter: SQLiteAdapter = - SQLiteAdapter::new("whatever", Arc::new(connection)).unwrap(); - - for (key, record) in get_data(data_len) { - adapter.store_record(&key, &record).await.unwrap(); - } - - adapter - } - - #[tokio::test] - async fn test_no_prune() { - for data_len in 1_u64..=6 { - let store = TestStore { - adapter: RwLock::new(Box::new(get_adapter(data_len).await)), - record_limit: None, - }; - - store.prune().await.unwrap(); - assert_eq!( - data_len as usize, - store.adapter.read().await.get_iter().await.unwrap().count(), - "test no pruning with dataset length = {data_len}" - ); - } - } - #[tokio::test] - async fn test_with_pruning() { - for data_len in 1_u64..=6 { - let store = TestStore { - adapter: RwLock::new(Box::new(get_adapter(6).await)), - record_limit: Some(data_len as usize), - }; - - store.prune().await.unwrap(); - assert_eq!( - data_len as usize, - store.adapter.read().await.get_iter().await.unwrap().count(), - "test pruning with retention limit = {data_len}" - ); - } - } -} diff --git a/mithril-aggregator/src/database/repository/epoch_settings_store.rs b/mithril-aggregator/src/database/repository/epoch_settings_store.rs index 5101927c088..01161aa83c3 100644 --- a/mithril-aggregator/src/database/repository/epoch_settings_store.rs +++ b/mithril-aggregator/src/database/repository/epoch_settings_store.rs @@ -1,11 +1,11 @@ use std::sync::Arc; +use anyhow::Context; use async_trait::async_trait; use mithril_common::entities::{CardanoTransactionsSigningConfig, Epoch, ProtocolParameters}; use mithril_common::StdResult; use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; -use mithril_persistence::store::adapter::AdapterError; use sqlite::Value; use crate::database::query::{ @@ -66,7 +66,7 @@ impl EpochSettingsStorer for EpochSettingsStore { let epoch_settings_record = self .connection .fetch_first(UpdateEpochSettingsQuery::one(epoch, epoch_settings)) - .map_err(|e| AdapterError::GeneralError(e.context("persist epoch settings failure")))? + .with_context(|| format!("persist epoch settings failure: epoch = {epoch:?}"))? .unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}")); Ok(Some(epoch_settings_record.into())) @@ -83,7 +83,7 @@ impl EpochSettingsStorer for EpochSettingsStore { let mut cursor = self .connection .fetch(GetEpochSettingsQuery::by_epoch(epoch)?) - .map_err(|e| AdapterError::GeneralError(e.context("Could not get epoch settings")))?; + .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?; if let Some(epoch_settings_record) = cursor.next() { return Ok(Some(epoch_settings_record.into())); @@ -104,8 +104,7 @@ impl EpochPruningTask for EpochSettingsStore { self.connection .apply(DeleteEpochSettingsQuery::below_epoch_threshold( epoch - threshold, - )) - .map_err(AdapterError::QueryError)?; + ))?; } Ok(()) } diff --git a/mithril-aggregator/src/database/repository/signer_registration_store.rs b/mithril-aggregator/src/database/repository/signer_registration_store.rs index 0628853732f..851316a7687 100644 --- a/mithril-aggregator/src/database/repository/signer_registration_store.rs +++ b/mithril-aggregator/src/database/repository/signer_registration_store.rs @@ -7,7 +7,6 @@ use async_trait::async_trait; use mithril_common::entities::{Epoch, PartyId, Signer, SignerWithStake}; use mithril_common::StdResult; use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; -use mithril_persistence::store::adapter::AdapterError; use crate::database::query::{ DeleteSignerRegistrationRecordQuery, GetSignerRegistrationRecordQuery, @@ -46,16 +45,14 @@ impl VerificationKeyStorer for SignerRegistrationStore { "Get signer registration record failure with signer_id: '{}', epoch: '{}'", signer.party_id, epoch ) - }) - .map_err(AdapterError::QueryError)?; + })?; let _updated_record = self .connection .fetch_first(InsertOrReplaceSignerRegistrationRecordQuery::one( SignerRegistrationRecord::from_signer_with_stake(signer, epoch), )) - .with_context(|| format!("persist verification key failure, epoch: {epoch}")) - .map_err(AdapterError::GeneralError)?; + .with_context(|| format!("persist verification key failure, epoch: {epoch}"))?; match existing_record { None => Ok(None), @@ -70,8 +67,7 @@ impl VerificationKeyStorer for SignerRegistrationStore { let cursor = self .connection .fetch(GetSignerRegistrationRecordQuery::by_epoch(epoch)?) - .with_context(|| format!("get verification key failure, epoch: {epoch}")) - .map_err(AdapterError::GeneralError)?; + .with_context(|| format!("get verification key failure, epoch: {epoch}"))?; let signer_with_stakes: HashMap = HashMap::from_iter(cursor.map(|record| (record.signer_id.to_owned(), record.into()))); @@ -86,8 +82,7 @@ impl VerificationKeyStorer for SignerRegistrationStore { let cursor = self .connection .fetch(GetSignerRegistrationRecordQuery::by_epoch(epoch)?) - .with_context(|| format!("get verification key failure, epoch: {epoch}")) - .map_err(AdapterError::GeneralError)?; + .with_context(|| format!("get verification key failure, epoch: {epoch}"))?; let signer_with_stakes: Vec = cursor.map(|record| record.into()).collect(); @@ -101,8 +96,7 @@ impl VerificationKeyStorer for SignerRegistrationStore { self.connection .apply(DeleteSignerRegistrationRecordQuery::below_epoch_threshold( max_epoch_to_prune, - )) - .map_err(AdapterError::QueryError)?; + ))?; Ok(()) } diff --git a/mithril-aggregator/src/database/repository/stake_pool_store.rs b/mithril-aggregator/src/database/repository/stake_pool_store.rs index 338d1d006fa..dbb02916616 100644 --- a/mithril-aggregator/src/database/repository/stake_pool_store.rs +++ b/mithril-aggregator/src/database/repository/stake_pool_store.rs @@ -8,7 +8,6 @@ use mithril_common::entities::{Epoch, StakeDistribution}; use mithril_common::signable_builder::StakeDistributionRetriever; use mithril_common::StdResult; use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; -use mithril_persistence::store::adapter::AdapterError; use mithril_persistence::store::StakeStorer; use crate::database::query::{ @@ -51,8 +50,7 @@ impl StakeStorer for StakePoolStore { .map(|(pool_id, stake)| (pool_id, epoch, stake)) .collect(), )) - .with_context(|| format!("persist stakes failure, epoch: {epoch}")) - .map_err(AdapterError::GeneralError)?; + .with_context(|| format!("persist stakes failure, epoch: {epoch}"))?; Ok(Some(StakeDistribution::from_iter( pools.into_iter().map(|p| (p.stake_pool_id, p.stake)), @@ -63,8 +61,7 @@ impl StakeStorer for StakePoolStore { let cursor = self .connection .fetch(GetStakePoolQuery::by_epoch(epoch)?) - .with_context(|| format!("get stakes failure, epoch: {epoch}")) - .map_err(AdapterError::GeneralError)?; + .with_context(|| format!("get stakes failure, epoch: {epoch}"))?; let mut stake_distribution = StakeDistribution::new(); for stake_pool in cursor { @@ -96,8 +93,7 @@ impl EpochPruningTask for StakePoolStore { self.connection .apply(DeleteStakePoolQuery::below_epoch_threshold( epoch - threshold, - )) - .map_err(AdapterError::QueryError)?; + ))?; } Ok(()) } diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 055e58e1ecc..76ff09c12cd 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -509,7 +509,7 @@ impl DependenciesBuilder { ))) } - /// Get a configured [CertificatePendingStore]. + /// Get a configured [CertificatePendingStorer]. pub async fn get_certificate_pending_store( &mut self, ) -> Result> { diff --git a/mithril-aggregator/src/http_server/routes/signer_routes.rs b/mithril-aggregator/src/http_server/routes/signer_routes.rs index eb246fbdacf..5f6c9d9a0b8 100644 --- a/mithril-aggregator/src/http_server/routes/signer_routes.rs +++ b/mithril-aggregator/src/http_server/routes/signer_routes.rs @@ -272,7 +272,6 @@ mod tests { test_utils::MithrilFixtureBuilder, test_utils::{apispec::APISpec, fake_data}, }; - use mithril_persistence::store::adapter::AdapterError; use crate::{ database::{record::SignerRecord, repository::MockSignerGetter}, @@ -617,7 +616,7 @@ mod tests { let mut mock_verification_key_store = MockVerificationKeyStorer::new(); mock_verification_key_store .expect_get_signers() - .return_once(|_| Err(AdapterError::GeneralError(anyhow!("invalid query")).into())); + .return_once(|_| Err(anyhow!("invalid query"))); let mut dependency_manager = initialize_dependencies().await; dependency_manager.verification_key_store = Arc::new(mock_verification_key_store); diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs index 291dcf8f1bd..a608173ee69 100644 --- a/mithril-aggregator/src/lib.rs +++ b/mithril-aggregator/src/lib.rs @@ -54,7 +54,7 @@ pub use snapshotter::{ CompressedArchiveSnapshotter, DumbSnapshotter, SnapshotError, Snapshotter, SnapshotterCompressionAlgorithm, }; -pub use store::{EpochSettingsStorer, VerificationKeyStorer}; +pub use store::{CertificatePendingStorer, EpochSettingsStorer, VerificationKeyStorer}; pub use tools::{ CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever, SingleSignatureAuthenticator,