diff --git a/Cargo.lock b/Cargo.lock index 21fe46d2ef1..673d3f019bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3588,7 +3588,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.5.114" +version = "0.5.115" dependencies = [ "anyhow", "async-trait", @@ -3859,7 +3859,7 @@ dependencies = [ [[package]] name = "mithril-persistence" -version = "0.2.36" +version = "0.2.37" dependencies = [ "anyhow", "async-trait", diff --git a/internal/mithril-persistence/Cargo.toml b/internal/mithril-persistence/Cargo.toml index 0543efe65ad..e1d1a96b44a 100644 --- a/internal/mithril-persistence/Cargo.toml +++ b/internal/mithril-persistence/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-persistence" -version = "0.2.36" +version = "0.2.37" description = "Common types, interfaces, and utilities to persist data for Mithril nodes." authors = { workspace = true } edition = { workspace = true } diff --git a/internal/mithril-persistence/src/store/adapter/dumb_adapter.rs b/internal/mithril-persistence/src/store/adapter/dumb_adapter.rs deleted file mode 100644 index a01148c0beb..00000000000 --- a/internal/mithril-persistence/src/store/adapter/dumb_adapter.rs +++ /dev/null @@ -1,236 +0,0 @@ -use super::{AdapterError, StoreAdapter}; -use anyhow::anyhow; -use async_trait::async_trait; - -/// A [StoreAdapter] that store one fixed data record, for testing purpose. -pub struct DumbStoreAdapter { - last_key: Option, - last_value: Option, - error: Option, -} - -impl DumbStoreAdapter { - /// DumbStoreAdapter factory - pub fn new() -> Self { - Self { - last_key: None, - last_value: None, - error: None, - } - } - - /// DumbStoreAdapter factory that returns error when 'get_record' is called. - pub fn new_failing_adapter(error: &str) -> Self { - Self { - error: Some(error.to_string()), - ..Self::new() - } - } -} - -impl Default for DumbStoreAdapter { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl StoreAdapter for DumbStoreAdapter -where - R: Clone + Send + Sync, - K: PartialEq + Clone + Send + Sync, -{ - type Key = K; - type Record = R; - - async fn store_record( - &mut self, - key: &Self::Key, - record: &Self::Record, - ) -> Result<(), AdapterError> { - let key = key.clone(); - let record = record.clone(); - - self.last_key = Some(key); - self.last_value = Some(record); - - Ok(()) - } - - async fn get_record(&self, key: &Self::Key) -> Result, AdapterError> { - match &self.error { - Some(error) => Err(AdapterError::GeneralError(anyhow!(error.clone()))), - None => { - if self.record_exists(key).await? { - Ok(self.last_value.as_ref().cloned()) - } else { - Ok(None) - } - } - } - } - - async fn record_exists(&self, key: &Self::Key) -> Result { - Ok(self.last_key.is_some() && self.last_key.as_ref().unwrap() == key) - } - - async fn get_last_n_records( - &self, - how_many: usize, - ) -> Result, AdapterError> { - if how_many > 0 { - match &self.last_key { - Some(_key) => Ok(vec![( - self.last_key.as_ref().cloned().unwrap(), - self.last_value.as_ref().cloned().unwrap(), - )]), - None => Ok(Vec::new()), - } - } else { - Ok(Vec::new()) - } - } - - async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError> { - if let Some(record) = self.get_record(key).await? 
{ - self.last_key = None; - self.last_value = None; - - Ok(Some(record)) - } else { - Ok(None) - } - } - - async fn get_iter(&self) -> Result + '_>, AdapterError> { - let mut values = vec![]; - if let Some(value) = &self.last_value { - values.push(value.clone()); - } - Ok(Box::new(values.into_iter())) - } -} - -#[cfg(test)] -mod tests { - - use super::*; - - #[tokio::test] - async fn test_with_no_record_exists() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert!(!adapter.record_exists(&1).await.unwrap()); - } - - #[tokio::test] - async fn test_with_no_record_get() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert!(adapter.get_record(&1).await.unwrap().is_none()); - } - - #[tokio::test] - async fn test_write_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert!(adapter - .store_record(&1, &"record".to_string()) - .await - .is_ok()); - assert_eq!( - "record".to_owned(), - adapter.get_record(&1).await.unwrap().unwrap() - ); - } - - #[tokio::test] - async fn test_list_with_no_record() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - - assert_eq!(0, adapter.get_last_n_records(10).await.unwrap().len()); - } - - #[tokio::test] - async fn test_list_with_records() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let list = adapter.get_last_n_records(10).await.unwrap(); - - assert_eq!(1, list.len()); - - let (key, record) = &list[0]; - - assert_eq!(&1, key); - assert_eq!(&("record".to_owned()), record); - } - - #[tokio::test] - async fn test_list_with_last_zero() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let list = adapter.get_last_n_records(0).await.unwrap(); - - assert_eq!(0, list.len()); - } - - #[tokio::test] - async fn test_remove_existing_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let value = adapter.remove(&1).await.unwrap().unwrap(); - - assert_eq!("record".to_string(), value); - assert!(!adapter.record_exists(&1).await.unwrap()); - } - - #[tokio::test] - async fn test_remove_non_existing_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let maybe_record = adapter.remove(&0).await.unwrap(); - - assert!(maybe_record.is_none()); - } - - #[tokio::test] - async fn test_iter_record() { - let mut adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - adapter - .store_record(&1, &"record".to_string()) - .await - .unwrap(); - let records: Vec = adapter.get_iter().await.unwrap().collect(); - - assert_eq!(vec!["record"], records); - } - - #[tokio::test] - async fn test_iter_without_record() { - let adapter: DumbStoreAdapter = DumbStoreAdapter::new(); - let records = adapter.get_iter().await.unwrap(); - - assert_eq!(0, records.count()); - } - - #[tokio::test] - async fn test_return_error_calling_get_record() { - let adapter: DumbStoreAdapter = - DumbStoreAdapter::new_failing_adapter("error"); - let result = adapter.get_record(&"key".to_string()).await; - - assert!(result.is_err()); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/fail_adapter.rs b/internal/mithril-persistence/src/store/adapter/fail_adapter.rs deleted file mode 100644 index a8638530a95..00000000000 --- 
a/internal/mithril-persistence/src/store/adapter/fail_adapter.rs +++ /dev/null @@ -1,143 +0,0 @@ -use super::{AdapterError, StoreAdapter}; -use anyhow::anyhow; -use async_trait::async_trait; -use std::marker::PhantomData; - -/// A [StoreAdapter] which always fails, for testing purpose. -pub struct FailStoreAdapter { - key: PhantomData, - certificate: PhantomData, -} - -impl FailStoreAdapter { - /// FailStoreAdapter factory - pub fn new() -> Self { - Self { - key: PhantomData, - certificate: PhantomData, - } - } -} - -impl Default for FailStoreAdapter { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl StoreAdapter for FailStoreAdapter -where - R: Clone + Send + Sync, - K: PartialEq + Clone + Send + Sync, -{ - type Key = K; - type Record = R; - - async fn store_record( - &mut self, - _key: &Self::Key, - _record: &Self::Record, - ) -> Result<(), AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn get_record(&self, _key: &Self::Key) -> Result, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn record_exists(&self, _key: &Self::Key) -> Result { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn get_last_n_records( - &self, - _how_many: usize, - ) -> Result, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn remove(&mut self, _key: &Self::Key) -> Result, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } - - async fn get_iter(&self) -> Result + '_>, AdapterError> { - Err(AdapterError::GeneralError(anyhow!( - "Fail adapter always fails" - ))) - } -} - -#[cfg(test)] -mod tests { - - use super::*; - - #[tokio::test] - async fn test_with_no_record_exists() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter.record_exists(&1).await.is_err()); - } - - #[tokio::test] - async fn test_with_no_record_get() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter.get_record(&1).await.is_err()); - } - - #[tokio::test] - async fn test_write_record() { - let mut adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter - .store_record(&1, &"record".to_string()) - .await - .is_err()); - } - - #[tokio::test] - async fn test_list() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - - assert!(adapter.get_last_n_records(10).await.is_err()); - } - - #[tokio::test] - async fn test_list_with_records() { - let mut adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter - .store_record(&1, &"record".to_string()) - .await - .is_err()); - } - - #[tokio::test] - async fn test_list_with_last_zero() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter.get_last_n_records(0).await.is_err()); - } - - #[tokio::test] - async fn test_remove_existing_record() { - let mut adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter.remove(&0).await.is_err()); - } - - #[tokio::test] - async fn test_get_iter() { - let adapter: FailStoreAdapter = FailStoreAdapter::new(); - assert!(adapter.get_iter().await.is_err()); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/memory_adapter.rs b/internal/mithril-persistence/src/store/adapter/memory_adapter.rs deleted file mode 100644 index 13cbd3cc82c..00000000000 --- a/internal/mithril-persistence/src/store/adapter/memory_adapter.rs +++ /dev/null @@ -1,240 +0,0 @@ 
-use anyhow::anyhow; -use async_trait::async_trait; -use std::{collections::HashMap, hash::Hash}; - -use super::{AdapterError, StoreAdapter}; - -/// A [StoreAdapter] that store data in memory. -pub struct MemoryAdapter { - index: Vec, - values: HashMap, -} - -impl MemoryAdapter -where - K: Hash + Eq + Send + Sync + Clone, - V: Send + Sync + Clone, -{ - /// MemoryAdapter factory - pub fn new(data: Option>) -> Result { - let data = data.unwrap_or_default(); - let mut values = HashMap::new(); - let mut index = Vec::new(); - - for (idx, elt) in data.into_iter() { - if values.insert(idx.clone(), elt).is_some() { - return Err(AdapterError::InitializationError(anyhow!( - "duplicate key found" - ))); - } - index.push(idx); - } - - Ok(Self { index, values }) - } -} - -#[async_trait] -impl StoreAdapter for MemoryAdapter -where - K: Hash + Eq + Send + Sync + Clone, - V: Send + Sync + Clone, -{ - type Key = K; - type Record = V; - - async fn store_record( - &mut self, - key: &Self::Key, - record: &Self::Record, - ) -> Result<(), AdapterError> { - let key = (*key).clone(); - let record = (*record).clone(); - - if self.values.insert(key.clone(), record).is_none() { - self.index.push(key); - } - - Ok(()) - } - - async fn get_record(&self, key: &Self::Key) -> Result, AdapterError> { - match self.values.get(key) { - Some(val) => Ok(Some(val.clone())), - None => Ok(None), - } - } - - async fn record_exists(&self, key: &Self::Key) -> Result { - Ok(self.values.contains_key(key)) - } - - async fn get_last_n_records( - &self, - how_many: usize, - ) -> Result, AdapterError> { - Ok(self - .index - .iter() - .rev() - .take(how_many) - .map(|k| (k.clone(), self.values.get(k).unwrap().clone())) - .collect()) - } - - async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError> { - self.index.retain(|k| *k != *key); - - Ok(self.values.remove(key)) - } - - async fn get_iter(&self) -> Result + '_>, AdapterError> { - Ok(Box::new( - self.index - .iter() - .rev() - .map(|k| self.values.get(k).unwrap().clone()), - )) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - fn init_adapter(nb: u64) -> MemoryAdapter { - let mut values: Vec<(u64, String)> = Vec::new(); - if nb > 0 { - for ix in 1..=nb { - values.push((ix, format!("value {ix}"))); - } - - MemoryAdapter::new(Some(values)).unwrap() - } else { - MemoryAdapter::new(None).unwrap() - } - } - - #[tokio::test] - async fn record_exists_existing_key() { - let adapter = init_adapter(2); - - assert!(adapter.record_exists(&1).await.unwrap()); - } - #[tokio::test] - async fn record_exists_non_existing_key() { - let adapter = init_adapter(2); - - assert!(!adapter.record_exists(&0).await.unwrap()); - } - - #[tokio::test] - async fn read_existing_record() { - let adapter = init_adapter(2); - let val = adapter.get_record(&2).await.unwrap(); - - assert!(val.is_some()); - assert_eq!("value 2".to_string(), val.unwrap()); - } - - #[tokio::test] - async fn read_unexisting_record() { - let adapter = init_adapter(2); - let val = adapter.get_record(&0).await.unwrap(); - - assert!(val.is_none()); - } - - #[tokio::test] - async fn get_last_n_values() { - let adapter = init_adapter(2); - let vals = adapter.get_last_n_records(5).await.unwrap(); - - assert_eq!(2, vals.len()); - assert_eq!((2, "value 2".to_string()), vals[0]); - } - - #[tokio::test] - async fn get_last_n_existing_values() { - let adapter = init_adapter(5); - let vals = adapter.get_last_n_records(2).await.unwrap(); - - assert_eq!(2, vals.len()); - assert_eq!((5, "value 5".to_string()), vals[0]); - } - - #[tokio::test] - 
async fn save_new_values() { - let mut adapter = init_adapter(2); - - assert!(adapter.store_record(&10, &"ten".to_string()).await.is_ok()); - let vals = adapter.get_last_n_records(1).await.unwrap(); - - assert_eq!((10, "ten".to_string()), vals[0]); - } - - #[tokio::test] - async fn update_value() { - let mut adapter = init_adapter(2); - - assert!(adapter.store_record(&1, &"one".to_string()).await.is_ok()); - let vals = adapter.get_last_n_records(2).await.unwrap(); - - assert_eq!( - vec![(2, "value 2".to_string()), (1, "one".to_string())], - vals - ); - } - - #[tokio::test] - async fn remove_existing_value() { - let mut adapter = init_adapter(2); - let record = adapter.remove(&1).await.unwrap().unwrap(); - - assert_eq!("value 1".to_string(), record); - assert!(!adapter.record_exists(&1).await.unwrap()); - assert_eq!(1, adapter.index.len()) - } - - #[tokio::test] - async fn remove_non_existing_value() { - let mut adapter = init_adapter(2); - let maybe_record = adapter.remove(&0).await.unwrap(); - - assert!(maybe_record.is_none()); - assert_eq!(2, adapter.index.len()) - } - - #[tokio::test] - async fn test_iter_record() { - let adapter = init_adapter(5); - let records: Vec = adapter.get_iter().await.unwrap().collect(); - - assert_eq!( - vec!["value 5", "value 4", "value 3", "value 2", "value 1"], - records - ); - } - - #[tokio::test] - async fn test_iter_without_record() { - let adapter = init_adapter(0); - let records = adapter.get_iter().await.unwrap(); - - assert_eq!(0, records.count()); - } - - #[tokio::test] - async fn check_get_last_n_modified_records() { - let mut adapter = init_adapter(3); - adapter - .store_record(&1, &"updated record".to_string()) - .await - .unwrap(); - let values = adapter.get_last_n_records(2).await.unwrap(); - assert_eq!( - vec![(3, "value 3".to_string()), (2, "value 2".to_string())], - values - ); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/mod.rs b/internal/mithril-persistence/src/store/adapter/mod.rs deleted file mode 100644 index 548bee87f00..00000000000 --- a/internal/mithril-persistence/src/store/adapter/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! Define a generic way to store data with the [Store Adapter][store_adapter::StoreAdapter], with -//! an adapter [in memory][MemoryAdapter] and another [sqlite][SQLiteAdapter]. 
- -mod memory_adapter; -mod sqlite_adapter; -mod store_adapter; - -pub use memory_adapter::MemoryAdapter; -pub use sqlite_adapter::{SQLiteAdapter, SQLiteResultIterator}; -pub use store_adapter::*; - -mod dumb_adapter; -pub use dumb_adapter::DumbStoreAdapter; -mod fail_adapter; -pub use fail_adapter::FailStoreAdapter; diff --git a/internal/mithril-persistence/src/store/adapter/sqlite_adapter.rs b/internal/mithril-persistence/src/store/adapter/sqlite_adapter.rs deleted file mode 100644 index e75b6e4a45a..00000000000 --- a/internal/mithril-persistence/src/store/adapter/sqlite_adapter.rs +++ /dev/null @@ -1,481 +0,0 @@ -use anyhow::anyhow; -use async_trait::async_trait; -use serde::{de::DeserializeOwned, Serialize}; -use sha2::{Digest, Sha256}; -use sqlite::{Connection, State, Statement}; -use std::{marker::PhantomData, sync::Arc, thread::sleep, time::Duration}; - -use super::{AdapterError, StoreAdapter}; -use crate::sqlite::SqliteConnection; - -type Result = std::result::Result; - -const DELAY_MS_ON_LOCK: u32 = 50; -const NB_RETRIES_ON_LOCK: u32 = 3; - -/// Store adapter for SQLite3 -pub struct SQLiteAdapter { - connection: Arc, - table: String, - key: PhantomData, - value: PhantomData, -} - -impl SQLiteAdapter -where - K: Serialize, - V: DeserializeOwned, -{ - /// Create a new SQLiteAdapter instance. - pub fn new(table_name: &str, connection: Arc) -> Result { - { - Self::check_table_exists(&connection, table_name)?; - } - - Ok(Self { - connection, - table: table_name.to_owned(), - key: PhantomData, - value: PhantomData, - }) - } - - fn check_table_exists(connection: &Connection, table_name: &str) -> Result<()> { - let sql = format!( - "select exists(select 1 from sqlite_master where type='table' and name='{table_name}')" - ); - let mut statement = connection - .prepare(sql) - .map_err(|e| AdapterError::OpeningStreamError(e.into()))?; - statement - .next() - .map_err(|e| AdapterError::QueryError(e.into()))?; - let table_exists = statement - .read::(0) - .map_err(|e| AdapterError::ParsingDataError(e.into()))?; - - if table_exists != 1 { - Self::create_table(connection, table_name)?; - } - - Ok(()) - } - - fn create_table(connection: &Connection, table_name: &str) -> Result<()> { - let sql = format!( - "create table {table_name} (key_hash text primary key, key json not null, value json not null)" - ); - connection - .execute(sql) - .map_err(|e| AdapterError::QueryError(e.into()))?; - - Ok(()) - } - - fn get_hash_from_key(&self, key: &K) -> Result { - let mut hasher = Sha256::new(); - hasher.update(self.serialize_key(key)?); - let checksum = hasher.finalize(); - - Ok(hex::encode(checksum)) - } - - fn serialize_key(&self, key: &K) -> Result { - serde_json::to_string(&key).map_err(|e| { - AdapterError::GeneralError( - anyhow!(e).context("SQLite adapter: Serde error while serializing store key"), - ) - }) - } - - // Connection must be locked from the calling function to be able to return - // a Statement that references this connection. 
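For reference, the removed adapter's `get_hash_from_key` above derives the `key_hash` primary key as the hex-encoded SHA-256 of the JSON-serialized key. A minimal standalone sketch of that derivation, assuming the same `serde_json`, `sha2`, and `hex` crates the deleted file imports:

```rust
use sha2::{Digest, Sha256};

/// Hex-encoded SHA-256 of the JSON serialization of a key, mirroring the
/// removed SQLiteAdapter's `get_hash_from_key`.
fn key_hash<K: serde::Serialize>(key: &K) -> String {
    let json = serde_json::to_string(key).expect("key should serialize");
    let mut hasher = Sha256::new();
    hasher.update(json);
    hex::encode(hasher.finalize())
}

fn main() {
    // For key `1u64`, the `key` column stores the JSON text "1" and
    // `key_hash` stores its digest; all lookups go through the hash.
    println!("{}", key_hash(&1u64));
}
```

This hash-keyed layout is why migration 33 further down has to recover the epoch from the JSON `value` column rather than from the key itself.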
- fn get_statement_for_key<'conn>( - &'conn self, - connection: &'conn Connection, - sql: String, - key: &K, - ) -> Result { - let mut statement = connection - .prepare(sql) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((1, self.get_hash_from_key(key)?.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - - Ok(statement) - } - - fn fetch_maybe_one_value(&self, mut statement: Statement) -> Result> { - let mut retries = Some(NB_RETRIES_ON_LOCK); - let mut result = statement.next(); - - while result.is_err() { - // database is probably locked - // wait and retry strategy - retries = retries.filter(|v| v > &1).map(|v| v - 1); - - if retries.is_none() { - return Err(result - .map_err(|e| AdapterError::ParsingDataError(e.into())) - .unwrap_err()); - } - sleep(Duration::from_millis(DELAY_MS_ON_LOCK as u64)); - result = statement.next(); - } - - if State::Done == result.unwrap() { - return Ok(None); - } - let maybe_value: Option = statement - .read::(0) - .map_err(|e| AdapterError::QueryError(e.into())) - .and_then(|v| { - serde_json::from_str(&v).map_err(|e| AdapterError::ParsingDataError(e.into())) - })?; - - Ok(maybe_value) - } -} - -#[async_trait] -impl StoreAdapter for SQLiteAdapter -where - K: Send + Sync + Serialize + DeserializeOwned, - V: Send + Sync + Serialize + DeserializeOwned, -{ - type Key = K; - type Record = V; - - async fn store_record(&mut self, key: &Self::Key, record: &Self::Record) -> Result<()> { - let sql = format!( - "insert into {} (key_hash, key, value) values (?1, ?2, ?3) on conflict (key_hash) do update set value = excluded.value", - self.table - ); - let value = serde_json::to_string(record).map_err(|e| { - AdapterError::GeneralError( - anyhow!(e) - .context("SQLite adapter error: could not serialize value before insertion"), - ) - })?; - let mut statement = self - .connection - .prepare(sql) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((1, self.get_hash_from_key(key)?.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((2, self.serialize_key(key)?.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((3, value.as_str())) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - let _ = statement - .next() - .map_err(|e| AdapterError::ParsingDataError(e.into()))?; - - Ok(()) - } - - async fn get_record(&self, key: &Self::Key) -> Result> { - let sql = format!("select value from {} where key_hash = ?1", self.table); - let statement = self.get_statement_for_key(&self.connection, sql, key)?; - - self.fetch_maybe_one_value(statement) - } - - async fn record_exists(&self, key: &Self::Key) -> Result { - let sql = format!( - "select exists(select 1 from {} where key_hash = ?1) as record_exists", - self.table - ); - let mut statement = self.get_statement_for_key(&self.connection, sql, key)?; - statement - .next() - .map_err(|e| AdapterError::QueryError(e.into()))?; - - statement - .read::(0) - .map_err(|e| { - AdapterError::GeneralError( - anyhow!(e).context("There should be a result in this case !"), - ) - }) - .map(|res| res == 1) - } - - async fn get_last_n_records(&self, how_many: usize) -> Result> { - let sql = format!( - "select cast(key as text) as key, cast(value as text) as value from {} order by ROWID desc limit ?1", - self.table - ); - let mut statement = self - .connection - .prepare(sql) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - statement - .bind((1, how_many as 
i64)) - .map_err(|e| AdapterError::InitializationError(e.into()))?; - let cursor = statement.iter(); - - let results = cursor - .map(|row| { - let row = row.unwrap(); - let key: K = serde_json::from_str(row.read::<&str, _>(0)).unwrap(); - let value: V = serde_json::from_str(row.read::<&str, _>(1)).unwrap(); - - (key, value) - }) - .collect(); - - Ok(results) - } - - async fn remove(&mut self, key: &Self::Key) -> Result> { - let sql = format!( - "delete from {} where key_hash = ?1 returning value", - self.table - ); - let statement = self.get_statement_for_key(&self.connection, sql, key)?; - - self.fetch_maybe_one_value(statement) - } - - async fn get_iter(&self) -> Result + '_>> { - let iterator = SQLiteResultIterator::new(&self.connection, &self.table)?; - - Ok(Box::new(iterator)) - } -} - -/// Iterator over SQLite adapter results. -/// -/// **important:** For now all the results are loaded in memory, it would be better to -/// consume the cursor but this is a quick solution. -pub struct SQLiteResultIterator { - results: Vec, -} - -impl SQLiteResultIterator -where - V: DeserializeOwned, -{ - /// Create a new instance of the iterator. - pub fn new(connection: &Connection, table_name: &str) -> Result> { - let sql = format!("select value from {table_name} order by ROWID asc"); - - let cursor = connection - .prepare(sql) - .map_err(|e| AdapterError::QueryError(e.into()))? - .into_iter(); - - let results = cursor - .map(|row| { - let row = row.unwrap(); - let res: V = serde_json::from_str(row.read::<&str, _>(0)).unwrap(); - - res - }) - .collect(); - - Ok(Self { results }) - } -} - -impl Iterator for SQLiteResultIterator { - type Item = V; - - fn next(&mut self) -> Option { - self.results.pop() - } -} - -#[cfg(test)] -mod tests { - use mithril_common::test_utils::TempDir; - use sqlite::Value; - use std::path::{Path, PathBuf}; - - use super::*; - - const TABLE_NAME: &str = "key_value_store"; - - fn get_file_path(test_name: &str) -> PathBuf { - TempDir::create("sqlite_adapter", test_name).join("db.sqlite3") - } - - fn init_db(db_path: &Path, tablename: Option<&str>) -> SQLiteAdapter { - let tablename = tablename.unwrap_or(TABLE_NAME); - let connection = Connection::open_thread_safe(db_path).unwrap(); - - SQLiteAdapter::new(tablename, Arc::new(connection)).unwrap() - } - - #[tokio::test] - async fn test_store_record() { - let test_name = "test_store_record"; - let filepath = get_file_path(test_name); - let mut adapter = init_db(&filepath, None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - let connection = Connection::open(&filepath).unwrap_or_else(|_| { - panic!( - "Expecting to be able to open SQLite file '{}'.", - filepath.display() - ) - }); - let mut cursor = connection - .prepare(format!("select key_hash, key, value from {TABLE_NAME}")) - .unwrap() - .into_iter(); - let row = cursor - .try_next() - .unwrap() - .expect("Expecting at least one row in the result set."); - assert_eq!(Value::Integer(1), row[1]); - assert_eq!(Value::String("\"one\"".to_string()), row[2]); - - // We must drop the cursor else the db will be locked - drop(cursor); - - adapter.store_record(&1, &"zwei".to_string()).await.unwrap(); - let mut statement = connection - .prepare(format!("select key_hash, key, value from {TABLE_NAME}")) - .unwrap(); - let mut cursor = statement.iter(); - let row = cursor - .try_next() - .unwrap() - .expect("Expecting at least one row in the result set."); - assert_eq!(Value::String("\"zwei\"".to_string()), row[2]); - } - - #[tokio::test] - async fn test_get_record() { - 
let test_name = "test_get_record"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - assert_eq!( - Some("one".to_string()), - adapter.get_record(&1).await.unwrap() - ); - assert_eq!( - Some("three".to_string()), - adapter.get_record(&3).await.unwrap() - ); - assert_eq!( - Some("two".to_string()), - adapter.get_record(&2).await.unwrap() - ); - assert_eq!(None, adapter.get_record(&4).await.unwrap()); - } - - #[tokio::test] - async fn test_get_iterator() { - let test_name = "test_get_iterator"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - let collection: Vec<(usize, String)> = - adapter.get_iter().await.unwrap().enumerate().collect(); - assert_eq!( - vec![ - (0, "three".to_string()), - (1, "two".to_string()), - (2, "one".to_string()), - ], - collection - ); - } - - #[tokio::test] - async fn test_record_exists() { - let test_name = "test_record_exists"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - - assert!(adapter.record_exists(&1).await.unwrap()); - assert!(adapter.record_exists(&2).await.unwrap()); - assert!(!adapter.record_exists(&3).await.unwrap()); - } - - #[tokio::test] - async fn test_remove() { - let test_name = "test_remove"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - let record = adapter - .remove(&1) - .await - .expect("removing an existing record should not fail") - .expect("removing an existing record should return the deleted record"); - assert_eq!("one".to_string(), record); - let empty = adapter - .remove(&1) - .await - .expect("removing a non existing record should not fail"); - assert!(empty.is_none()); - } - - #[tokio::test] - async fn test_get_last_n_records() { - let test_name = "test_get_last_n_records"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - assert_eq!( - vec![(3_u64, "three".to_string())], - adapter - .get_last_n_records(1) - .await - .expect("get last N records should not fail") - ); - assert_eq!( - vec![ - (3_u64, "three".to_string()), - (2_u64, "two".to_string()), - (1_u64, "one".to_string()), - ], - adapter - .get_last_n_records(5) - .await - .expect("get last N records should not fail") - ); - } - - #[tokio::test] - async fn check_get_last_n_modified_records() { - let test_name = "check_get_last_n_modified_records"; - let mut adapter = init_db(&get_file_path(test_name), None); - adapter.store_record(&1, &"one".to_string()).await.unwrap(); - adapter.store_record(&2, &"two".to_string()).await.unwrap(); - adapter - .store_record(&3, &"three".to_string()) - .await - .unwrap(); - adapter - .store_record(&1, &"updated record".to_string()) - .await - .unwrap(); - let values = adapter.get_last_n_records(2).await.unwrap(); - assert_eq!( - 
vec![(3, "three".to_string()), (2, "two".to_string())], - values - ); - } -} diff --git a/internal/mithril-persistence/src/store/adapter/store_adapter.rs b/internal/mithril-persistence/src/store/adapter/store_adapter.rs deleted file mode 100644 index 8aedfd42dbc..00000000000 --- a/internal/mithril-persistence/src/store/adapter/store_adapter.rs +++ /dev/null @@ -1,64 +0,0 @@ -use async_trait::async_trait; -use mithril_common::StdError; -use thiserror::Error; - -/// [StoreAdapter] related errors -#[derive(Debug, Error)] -pub enum AdapterError { - /// Generic [StoreAdapter] error. - #[error("something wrong happened")] - GeneralError(#[source] StdError), - - /// Error raised when the store initialization fails. - #[error("problem creating the repository")] - InitializationError(#[source] StdError), - - /// Error raised when the opening of a IO stream fails. - #[error("problem opening the IO stream")] - OpeningStreamError(#[source] StdError), - - /// Error raised when the parsing of a IO stream fails. - #[error("problem parsing the IO stream")] - ParsingDataError(#[source] StdError), - - /// Error while querying the subsystem. - #[error("problem when querying the adapter")] - QueryError(#[source] StdError), -} - -/// Represent a way to store Key/Value pair data. -#[async_trait] -pub trait StoreAdapter: Sync + Send { - /// The key type - type Key; - - /// The record type - type Record; - - /// Store the given `record`. - async fn store_record( - &mut self, - key: &Self::Key, - record: &Self::Record, - ) -> Result<(), AdapterError>; - - /// Get the record stored using the given `key`. - async fn get_record(&self, key: &Self::Key) -> Result, AdapterError>; - - /// Check if a record exist for the given `key`. - async fn record_exists(&self, key: &Self::Key) -> Result; - - /// Get the last `n` records in the store - async fn get_last_n_records( - &self, - how_many: usize, - ) -> Result, AdapterError>; - - /// remove values from store - /// - /// if the value exists it is returned by the adapter otherwise None is returned - async fn remove(&mut self, key: &Self::Key) -> Result, AdapterError>; - - /// Get an iterator over the stored values, from the latest to the oldest. - async fn get_iter(&self) -> Result + '_>, AdapterError>; -} diff --git a/internal/mithril-persistence/src/store/mod.rs b/internal/mithril-persistence/src/store/mod.rs index e4805b92811..64b15cf37bd 100644 --- a/internal/mithril-persistence/src/store/mod.rs +++ b/internal/mithril-persistence/src/store/mod.rs @@ -1,9 +1,5 @@ -//! Define a generic way to store data with the [Store Adapters][adapter], and the [StakeStorer] -//! to store stakes. +//! Define traits of [StakeStorer]. -pub mod adapter; mod stake_store; -mod store_pruner; pub use stake_store::StakeStorer; -pub use store_pruner::StorePruner; diff --git a/internal/mithril-persistence/src/store/store_pruner.rs b/internal/mithril-persistence/src/store/store_pruner.rs deleted file mode 100644 index e95c2daa049..00000000000 --- a/internal/mithril-persistence/src/store/store_pruner.rs +++ /dev/null @@ -1,129 +0,0 @@ -use async_trait::async_trait; -use mithril_common::StdResult; -use tokio::sync::RwLock; - -use super::adapter::StoreAdapter; - -/// Implementing this trait will make store able to limit the number of the -/// stored records by pruning them if a limit is set. -#[async_trait] -pub trait StorePruner { - /// The key type - type Key: Sync + Send; - - /// The record type - type Record: Sync + Send; - - /// This trait requires a way to get the internal adapter. 
-    fn get_adapter(&self)
-        -> &RwLock<Box<dyn StoreAdapter<Key = Self::Key, Record = Self::Record>>>;
-
-    /// Return the maximum number of elements that can exist in this store. If None, there is no limit.
-    fn get_max_records(&self) -> Option<usize>;
-
-    /// Prune elements exceeding the specified limit.
-    async fn prune(&self) -> StdResult<()> {
-        let retention_len = self.get_max_records().unwrap_or(usize::MAX);
-        let lock = self.get_adapter();
-        let mut adapter = lock.write().await;
-
-        for (epoch, _record) in adapter
-            .get_last_n_records(usize::MAX)
-            .await?
-            .into_iter()
-            .skip(retention_len)
-        {
-            adapter.remove(&epoch).await?;
-        }
-
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::{cmp::min, sync::Arc};
-
-    use sqlite::Connection;
-
-    use crate::store::adapter::SQLiteAdapter;
-
-    use super::*;
-
-    struct TestStore {
-        adapter: RwLock<Box<dyn StoreAdapter<Key = u64, Record = String>>>,
-        record_limit: Option<usize>,
-    }
-
-    impl StorePruner for TestStore {
-        type Key = u64;
-        type Record = String;
-
-        fn get_adapter(
-            &self,
-        ) -> &RwLock<Box<dyn StoreAdapter<Key = u64, Record = String>>> {
-            &self.adapter
-        }
-
-        fn get_max_records(&self) -> Option<usize> {
-            self.record_limit
-        }
-    }
-
-    fn get_data(n: u64) -> Vec<(u64, String)> {
-        let n = min(n, 6);
-        let words = ["one", "two", "three", "four", "five", "six"];
-        let mut values: Vec<(u64, String)> = Vec::new();
-
-        for index in 0..n {
-            values.push((index, words[index as usize].to_string()));
-        }
-
-        values
-    }
-
-    async fn get_adapter(data_len: u64) -> SQLiteAdapter<u64, String> {
-        let connection = Connection::open_thread_safe(":memory:").unwrap();
-        let mut adapter: SQLiteAdapter<u64, String> =
-            SQLiteAdapter::new("whatever", Arc::new(connection)).unwrap();
-
-        for (key, record) in get_data(data_len) {
-            adapter.store_record(&key, &record).await.unwrap();
-        }
-
-        adapter
-    }
-
-    #[tokio::test]
-    async fn test_no_prune() {
-        for data_len in 1_u64..=6 {
-            let store = TestStore {
-                adapter: RwLock::new(Box::new(get_adapter(data_len).await)),
-                record_limit: None,
-            };
-
-            store.prune().await.unwrap();
-            assert_eq!(
-                data_len as usize,
-                store.adapter.read().await.get_iter().await.unwrap().count(),
-                "test no pruning with dataset length = {data_len}"
-            );
-        }
-    }
-    #[tokio::test]
-    async fn test_with_pruning() {
-        for data_len in 1_u64..=6 {
-            let store = TestStore {
-                adapter: RwLock::new(Box::new(get_adapter(6).await)),
-                record_limit: Some(data_len as usize),
-            };
-
-            store.prune().await.unwrap();
-            assert_eq!(
-                data_len as usize,
-                store.adapter.read().await.get_iter().await.unwrap().count(),
-                "test pruning with retention limit = {data_len}"
-            );
-        }
-    }
-}
diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml
index a84f1ad4764..b7356b9e7cf 100644
--- a/mithril-aggregator/Cargo.toml
+++ b/mithril-aggregator/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "mithril-aggregator"
-version = "0.5.114"
+version = "0.5.115"
 description = "A Mithril Aggregator server"
 authors = { workspace = true }
 edition = { workspace = true }
diff --git a/mithril-aggregator/src/database/migration.rs b/mithril-aggregator/src/database/migration.rs
index 8a1a6b9bd8c..c1fd946a622 100644
--- a/mithril-aggregator/src/database/migration.rs
+++ b/mithril-aggregator/src/database/migration.rs
@@ -854,7 +854,31 @@ update certificate
             r#"
 insert into signed_entity_type (signed_entity_type_id, name)
     values (4, 'Cardano Database');
-"#,
+    "#,
+        ),
+        // Migration 33
+        // Add the `pending_certificate` table and migrate data from the previous
+        // `pending_certificate` key/value JSON format.
+ SqlMigration::new( + 33, + r#" +create table new_pending_certificate ( + epoch integer not null, + pending_certificate text not null, + created_at text not null, + primary key (epoch) +); +create table if not exists pending_certificate (key_hash text primary key, key json not null, value json not null); +insert into new_pending_certificate (epoch, pending_certificate, created_at) + select + json_extract(pending_certificate.value, '$.epoch') as epoch, + pending_certificate.value, + strftime('%Y-%m-%dT%H:%M:%fZ', current_timestamp) + from pending_certificate; + +drop table pending_certificate; +alter table new_pending_certificate rename to pending_certificate; + "#, ), ] } diff --git a/mithril-aggregator/src/database/query/mod.rs b/mithril-aggregator/src/database/query/mod.rs index 0af0fe26a14..c30bcbedcdf 100644 --- a/mithril-aggregator/src/database/query/mod.rs +++ b/mithril-aggregator/src/database/query/mod.rs @@ -3,6 +3,7 @@ mod buffered_single_signature; mod certificate; mod epoch_settings; mod open_message; +mod pending_certificate; mod signed_entity; mod signer; mod signer_registration; @@ -13,6 +14,7 @@ pub use buffered_single_signature::*; pub use certificate::*; pub use epoch_settings::*; pub use open_message::*; +pub use pending_certificate::*; pub use signed_entity::*; pub use signer::*; pub use signer_registration::*; diff --git a/mithril-aggregator/src/database/query/pending_certificate/delete_pending_certificate.rs b/mithril-aggregator/src/database/query/pending_certificate/delete_pending_certificate.rs new file mode 100644 index 00000000000..142493e1f8a --- /dev/null +++ b/mithril-aggregator/src/database/query/pending_certificate/delete_pending_certificate.rs @@ -0,0 +1,31 @@ +use mithril_persistence::sqlite::{Query, WhereCondition}; + +use crate::database::record::CertificatePendingRecord; + +/// Query to delete old [CertificatePendingRecord] from the sqlite database +pub struct DeletePendingCertificateRecordQuery { + condition: WhereCondition, +} + +impl DeletePendingCertificateRecordQuery { + pub fn get() -> Self { + Self { + condition: WhereCondition::default(), + } + } +} + +impl Query for DeletePendingCertificateRecordQuery { + type Entity = CertificatePendingRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + let projection = Self::Entity::expand_projection("pending_certificate"); + format!("delete from pending_certificate where {condition} returning {projection}") + } +} diff --git a/mithril-aggregator/src/database/query/pending_certificate/get_pending_certificate.rs b/mithril-aggregator/src/database/query/pending_certificate/get_pending_certificate.rs new file mode 100644 index 00000000000..c2ee5362df7 --- /dev/null +++ b/mithril-aggregator/src/database/query/pending_certificate/get_pending_certificate.rs @@ -0,0 +1,31 @@ +use mithril_persistence::sqlite::{Query, WhereCondition}; + +use crate::database::record::CertificatePendingRecord; + +/// Simple queries to retrieve [CertificatePendingRecord] from the sqlite database. 
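For reference, the data-copy step of migration 33 above leans on SQLite's `json_extract` to lift `$.epoch` out of the legacy JSON `value` column. A standalone reproduction using the `sqlite` crate this diff already uses elsewhere; the sample payload is illustrative only, and `json_extract` assumes a SQLite build with the JSON1 functions (bundled builds include them):

```rust
// Reproduces the migration-33 data copy against an in-memory database:
// the legacy key/value table stores the whole pending certificate as JSON,
// and json_extract pulls `$.epoch` into a typed column of the new table.
fn main() -> Result<(), sqlite::Error> {
    let connection = sqlite::open(":memory:")?;
    connection.execute(
        "create table pending_certificate (key_hash text primary key, key json not null, value json not null);",
    )?;
    // Illustrative legacy row: key "Certificate", value = serialized entity.
    connection.execute(
        r#"insert into pending_certificate values ('some_hash', '"Certificate"', '{"epoch": 42}');"#,
    )?;
    connection.execute(
        "create table new_pending_certificate (
            epoch integer not null,
            pending_certificate text not null,
            created_at text not null,
            primary key (epoch)
        );
        insert into new_pending_certificate (epoch, pending_certificate, created_at)
            select
                json_extract(value, '$.epoch'),
                value,
                strftime('%Y-%m-%dT%H:%M:%fZ', current_timestamp)
            from pending_certificate;
        drop table pending_certificate;
        alter table new_pending_certificate rename to pending_certificate;",
    )?;
    Ok(())
}
```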
+pub struct GetPendingCertificateRecordQuery { + condition: WhereCondition, +} + +impl GetPendingCertificateRecordQuery { + pub fn get() -> Self { + Self { + condition: WhereCondition::default(), + } + } +} + +impl Query for GetPendingCertificateRecordQuery { + type Entity = CertificatePendingRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + let projection = Self::Entity::expand_projection("pending_certificate"); + format!( + "select {projection} from pending_certificate where {condition} order by ROWID desc" + ) + } +} diff --git a/mithril-aggregator/src/database/query/pending_certificate/mod.rs b/mithril-aggregator/src/database/query/pending_certificate/mod.rs new file mode 100644 index 00000000000..842e1b32498 --- /dev/null +++ b/mithril-aggregator/src/database/query/pending_certificate/mod.rs @@ -0,0 +1,7 @@ +mod delete_pending_certificate; +mod get_pending_certificate; +mod save_pending_certificate; + +pub use delete_pending_certificate::*; +pub use get_pending_certificate::*; +pub use save_pending_certificate::*; diff --git a/mithril-aggregator/src/database/query/pending_certificate/save_pending_certificate.rs b/mithril-aggregator/src/database/query/pending_certificate/save_pending_certificate.rs new file mode 100644 index 00000000000..50984c4f737 --- /dev/null +++ b/mithril-aggregator/src/database/query/pending_certificate/save_pending_certificate.rs @@ -0,0 +1,42 @@ +use chrono::Utc; +use sqlite::Value; + +use mithril_persistence::sqlite::{Query, WhereCondition}; + +use crate::database::record::CertificatePendingRecord; + +/// Query to save [CertificatePendingRecord] in the sqlite database +pub struct SavePendingCertificateRecordQuery { + condition: WhereCondition, +} + +impl SavePendingCertificateRecordQuery { + pub fn save(pending_certificate_record: CertificatePendingRecord) -> Self { + let condition = WhereCondition::new( + "(epoch, pending_certificate, created_at) values (?*, ?*, ?*)", + vec![ + Value::Integer(*pending_certificate_record.epoch as i64), + Value::String(pending_certificate_record.pending_certificate), + Value::String(Utc::now().to_rfc3339()), + ], + ); + + Self { condition } + } +} + +impl Query for SavePendingCertificateRecordQuery { + type Entity = CertificatePendingRecord; + + fn filters(&self) -> WhereCondition { + self.condition.clone() + } + + fn get_definition(&self, condition: &str) -> String { + // it is important to alias the fields with the same name as the table + // since the table cannot be aliased in a RETURNING statement in SQLite. + + let projection = Self::Entity::expand_projection("pending_certificate"); + format!("insert or replace into pending_certificate {condition} returning {projection}") + } +} diff --git a/mithril-aggregator/src/database/record/certificate_pending.rs b/mithril-aggregator/src/database/record/certificate_pending.rs new file mode 100644 index 00000000000..2022dd315f3 --- /dev/null +++ b/mithril-aggregator/src/database/record/certificate_pending.rs @@ -0,0 +1,98 @@ +use chrono::{DateTime, Utc}; +use mithril_common::{ + entities::{CertificatePending, Epoch}, + StdError, +}; +use mithril_persistence::sqlite::{HydrationError, Projection, SourceAlias, SqLiteEntity}; + +/// CertificatePending record is the representation of a stored pending certificate. 
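For reference, the record type that follows persists the whole `CertificatePending` as one JSON string and denormalizes only `epoch` into a typed column, which is what allows migration 33 to backfill that column with `json_extract`. A toy round-trip of that scheme; `Pending` is a hypothetical stand-in for the real entity, and `serde` with the `derive` feature is assumed:

```rust
// Toy model of CertificatePendingRecord's storage scheme: full entity as a
// JSON payload plus a typed epoch column. `Pending` is a stand-in type.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Pending {
    epoch: u64,
    message: String,
}

fn main() -> Result<(), serde_json::Error> {
    let entity = Pending { epoch: 42, message: "to be signed".into() };

    // To the record: serialize everything, copy the epoch into its column.
    let payload = serde_json::to_string(&entity)?;
    let epoch_column = entity.epoch;

    // From the record: parse the payload, take the epoch from the column.
    let mut restored: Pending = serde_json::from_str(&payload)?;
    restored.epoch = epoch_column;

    assert_eq!(restored, entity);
    Ok(())
}
```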
+pub struct CertificatePendingRecord {
+    /// Current Epoch
+    pub epoch: Epoch,
+
+    /// Pending certificate serialization as json
+    pub pending_certificate: String,
+
+    /// Date and time when the pending certificate was created
+    pub created_at: DateTime<Utc>,
+}
+
+impl CertificatePendingRecord {
+    /// Construct a [Projection] that allows hydrating this `CertificatePendingRecord` and expands the table alias.
+    pub fn expand_projection(table: &str) -> String {
+        let aliases = SourceAlias::new(&[("{:pending_certificate:}", table)]);
+        Self::get_projection().expand(aliases)
+    }
+}
+
+impl SqLiteEntity for CertificatePendingRecord {
+    fn hydrate(row: sqlite::Row) -> Result<Self, HydrationError>
+    where
+        Self: Sized,
+    {
+        let epoch_int = row.read::<i64, _>(0);
+        let pending_certificate_json = row.read::<&str, _>(1);
+        let created_at = &row.read::<&str, _>(2);
+
+        let record = Self {
+            pending_certificate: pending_certificate_json.to_string(),
+            created_at: DateTime::parse_from_rfc3339(created_at)
+                .map_err(|e| {
+                    HydrationError::InvalidData(format!(
+                        "Could not turn string '{created_at}' to rfc3339 Datetime. Error: {e}"
+                    ))
+                })?
+                .with_timezone(&Utc),
+            epoch: Epoch(epoch_int.try_into().map_err(|e| {
+                HydrationError::InvalidData(format!(
+                    "Could not cast i64 ({epoch_int}) to u64. Error: '{e}'"
+                ))
+            })?),
+        };
+
+        Ok(record)
+    }
+
+    fn get_projection() -> Projection {
+        let mut projection = Projection::default();
+
+        projection.add_field("epoch", "{:pending_certificate:}.epoch", "integer");
+        projection.add_field(
+            "pending_certificate",
+            "{:pending_certificate:}.pending_certificate",
+            "text",
+        );
+        projection.add_field("created_at", "{:pending_certificate:}.created_at", "text");
+
+        projection
+    }
+}
+
+impl TryFrom<CertificatePending> for CertificatePendingRecord {
+    type Error = StdError;
+
+    fn try_from(value: CertificatePending) -> Result<Self, Self::Error> {
+        let record = Self {
+            epoch: value.epoch,
+            pending_certificate: serde_json::to_string(&value)?,
+            created_at: Utc::now(),
+        };
+        Ok(record)
+    }
+}
+
+impl TryFrom<CertificatePendingRecord> for CertificatePending {
+    type Error = StdError;
+    fn try_from(record: CertificatePendingRecord) -> Result<Self, Self::Error> {
+        let c: CertificatePending = serde_json::from_str(&record.pending_certificate)?;
+        let pending_certificate = Self {
+            epoch: record.epoch,
+            signed_entity_type: c.signed_entity_type,
+            protocol_parameters: c.protocol_parameters,
+            next_protocol_parameters: c.next_protocol_parameters,
+            signers: c.signers,
+            next_signers: c.next_signers,
+        };
+        Ok(pending_certificate)
+    }
+}
diff --git a/mithril-aggregator/src/database/record/mod.rs b/mithril-aggregator/src/database/record/mod.rs
index 272732d476b..a05f1d8474b 100644
--- a/mithril-aggregator/src/database/record/mod.rs
+++ b/mithril-aggregator/src/database/record/mod.rs
@@ -2,6 +2,7 @@
 mod buffered_single_signature_record;
 mod certificate;
+mod certificate_pending;
 mod epoch_settings;
 mod open_message;
 mod open_message_with_single_signatures;
@@ -13,6 +14,7 @@ mod stake_pool;
 pub use buffered_single_signature_record::*;
 pub use certificate::*;
+pub use certificate_pending::*;
 pub use epoch_settings::*;
 pub use open_message::*;
 pub use open_message_with_single_signatures::*;
diff --git a/mithril-aggregator/src/database/repository/epoch_settings_store.rs b/mithril-aggregator/src/database/repository/epoch_settings_store.rs
index 5101927c088..35ba3b70aa9 100644
--- a/mithril-aggregator/src/database/repository/epoch_settings_store.rs
+++ b/mithril-aggregator/src/database/repository/epoch_settings_store.rs
@@ -1,11 +1,11 @@
 use std::sync::Arc;
+use anyhow::Context;
 use
async_trait::async_trait; use mithril_common::entities::{CardanoTransactionsSigningConfig, Epoch, ProtocolParameters}; use mithril_common::StdResult; use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; -use mithril_persistence::store::adapter::AdapterError; use sqlite::Value; use crate::database::query::{ @@ -66,7 +66,7 @@ impl EpochSettingsStorer for EpochSettingsStore { let epoch_settings_record = self .connection .fetch_first(UpdateEpochSettingsQuery::one(epoch, epoch_settings)) - .map_err(|e| AdapterError::GeneralError(e.context("persist epoch settings failure")))? + .with_context(|| format!("persist epoch settings failure for epoch {epoch:?}"))? .unwrap_or_else(|| panic!("No entity returned by the persister, epoch = {epoch:?}")); Ok(Some(epoch_settings_record.into())) @@ -83,7 +83,7 @@ impl EpochSettingsStorer for EpochSettingsStore { let mut cursor = self .connection .fetch(GetEpochSettingsQuery::by_epoch(epoch)?) - .map_err(|e| AdapterError::GeneralError(e.context("Could not get epoch settings")))?; + .with_context(|| format!("Could not get epoch settings: epoch = {epoch:?}"))?; if let Some(epoch_settings_record) = cursor.next() { return Ok(Some(epoch_settings_record.into())); @@ -104,8 +104,7 @@ impl EpochPruningTask for EpochSettingsStore { self.connection .apply(DeleteEpochSettingsQuery::below_epoch_threshold( epoch - threshold, - )) - .map_err(AdapterError::QueryError)?; + ))?; } Ok(()) } diff --git a/mithril-aggregator/src/database/repository/mod.rs b/mithril-aggregator/src/database/repository/mod.rs index df1bfce3998..b5882a2595f 100644 --- a/mithril-aggregator/src/database/repository/mod.rs +++ b/mithril-aggregator/src/database/repository/mod.rs @@ -4,6 +4,7 @@ mod cardano_transaction_repository; mod certificate_repository; mod epoch_settings_store; mod open_message_repository; +mod pending_certificate_repository; mod signed_entity_store; mod signer_registration_store; mod signer_store; @@ -14,6 +15,7 @@ pub use buffered_single_signature_repository::*; pub use certificate_repository::*; pub use epoch_settings_store::*; pub use open_message_repository::*; +pub use pending_certificate_repository::*; pub use signed_entity_store::*; pub use signer_registration_store::*; pub use signer_store::*; diff --git a/mithril-aggregator/src/database/repository/pending_certificate_repository.rs b/mithril-aggregator/src/database/repository/pending_certificate_repository.rs new file mode 100644 index 00000000000..b393a43fa88 --- /dev/null +++ b/mithril-aggregator/src/database/repository/pending_certificate_repository.rs @@ -0,0 +1,224 @@ +use async_trait::async_trait; +use std::sync::Arc; + +use mithril_common::{entities::CertificatePending, StdResult}; +use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; + +use crate::{ + database::query::{ + DeletePendingCertificateRecordQuery, GetPendingCertificateRecordQuery, + SavePendingCertificateRecordQuery, + }, + store::CertificatePendingStorer, +}; + +/// Pending certificate repository +pub struct CertificatePendingRepository { + connection: Arc, +} + +impl CertificatePendingRepository { + /// Create a new CertificatePendingRepository service + pub fn new(connection: Arc) -> Self { + Self { connection } + } +} + +#[async_trait] +impl CertificatePendingStorer for CertificatePendingRepository { + /// Fetch the current [CertificatePending] if any. + async fn get(&self) -> StdResult> { + self.connection + .fetch_first(GetPendingCertificateRecordQuery::get())? 
+ .map(TryInto::try_into) + .transpose() + } + + /// Save the given [CertificatePending]. + async fn save(&self, pending_certificate: CertificatePending) -> StdResult<()> { + self.connection + .apply(SavePendingCertificateRecordQuery::save( + pending_certificate.try_into()?, + ))?; + + Ok(()) + } + + /// Remove the current [CertificatePending] if any. + async fn remove(&self) -> StdResult> { + self.connection + .fetch_first(DeletePendingCertificateRecordQuery::get())? + .map(TryInto::try_into) + .transpose() + } +} + +#[cfg(test)] +mod test { + + use crate::database::test_helper::{main_db_connection, FakeStoreAdapter}; + + use super::*; + + use mithril_common::entities::{Epoch, SignedEntityType}; + use mithril_common::test_utils::fake_data; + use mithril_persistence::sqlite::ConnectionBuilder; + + async fn get_certificate_pending_store(is_populated: bool) -> CertificatePendingRepository { + let connection = Arc::new(main_db_connection().unwrap()); + + let store = CertificatePendingRepository::new(connection); + if is_populated { + let certificate_pending = CertificatePending::new( + Epoch(0), + SignedEntityType::dummy(), + fake_data::protocol_parameters(), + fake_data::protocol_parameters(), + fake_data::signers(4), + fake_data::signers(5), + ); + + store.save(certificate_pending).await.unwrap(); + } + store + } + + #[tokio::test] + async fn get_certificate_pending_with_existing_certificate() { + let store = get_certificate_pending_store(true).await; + let result = store.get().await.unwrap(); + + assert!(result.is_some()); + } + + #[tokio::test] + async fn get_certificate_pending_with_no_existing_certificate() { + let store = get_certificate_pending_store(false).await; + let result = store.get().await.unwrap(); + + assert!(result.is_none()); + } + + #[tokio::test] + async fn save_certificate_pending_once() { + let store = get_certificate_pending_store(false).await; + let signed_entity_type = SignedEntityType::dummy(); + let certificate_pending = CertificatePending::new( + Epoch(2), + signed_entity_type, + fake_data::protocol_parameters(), + fake_data::protocol_parameters(), + fake_data::signers(1), + fake_data::signers(2), + ); + + assert!(store.save(certificate_pending).await.is_ok()); + assert!(store.get().await.unwrap().is_some()); + } + + #[tokio::test] + async fn update_certificate_pending() { + let store = get_certificate_pending_store(true).await; + let old_certificate_pending = CertificatePending::new( + Epoch(2), + SignedEntityType::MithrilStakeDistribution(Epoch(2)), + fake_data::protocol_parameters(), + fake_data::protocol_parameters(), + fake_data::signers(1), + fake_data::signers(2), + ); + assert!(store.save(old_certificate_pending).await.is_ok()); + + let new_certificate_pending = CertificatePending::new( + Epoch(4), + SignedEntityType::MithrilStakeDistribution(Epoch(4)), + fake_data::protocol_parameters(), + fake_data::protocol_parameters(), + fake_data::signers(3), + fake_data::signers(1), + ); + + assert!(store.save(new_certificate_pending.clone()).await.is_ok()); + + let certificate_pending = store.get().await.unwrap().unwrap(); + assert_eq!(new_certificate_pending, certificate_pending); + } + + #[tokio::test] + async fn remove_certificate_pending() { + let store = get_certificate_pending_store(true).await; + let epoch = Epoch(0); + let certificate_pending = store.remove().await.unwrap().unwrap(); + + assert_eq!(epoch, certificate_pending.epoch); + assert!(store.get().await.unwrap().is_none()); + } + + #[tokio::test] + async fn should_migrate_data_from_adapter() { + let 
certificate_pending = CertificatePending::new( + Epoch(0), + SignedEntityType::dummy(), + fake_data::protocol_parameters(), + fake_data::protocol_parameters(), + fake_data::signers(4), + fake_data::signers(5), + ); + + let migrations = crate::database::migration::get_migrations(); + + let connection = Arc::new(ConnectionBuilder::open_memory().build().unwrap()); + let pending_certificate_adapter = + FakeStoreAdapter::new(connection.clone(), "pending_certificate"); + pending_certificate_adapter.create_table(); + + ConnectionBuilder::open_memory() + .apply_migrations( + &connection, + migrations + .clone() + .into_iter() + .filter(|m| m.version < 33) + .collect::>(), + ) + .unwrap(); + + assert!(connection + .prepare("select key_hash from pending_certificate;") + .is_ok()); + + // Here we can add some data with the old schema. + pending_certificate_adapter + .store_record( + "Certificate", + &"certificate_pending".to_string(), + &certificate_pending, + ) + .unwrap(); + + assert!(pending_certificate_adapter.has_key_hash("Certificate")); + + // We finish the migration + ConnectionBuilder::open_memory() + .apply_migrations(&connection, migrations) + .unwrap(); + + assert!(connection + .prepare("select key_hash from certificate_pending;") + .is_err()); + assert!(connection + .prepare("select * from pending_certificate;") + .is_ok()); + + let value: i64 = connection + .query_single_cell("select count(*) from pending_certificate", &[]) + .unwrap(); + assert_eq!(value, 1); + + // We can check that data are migrated. + let store = CertificatePendingRepository::new(connection); + let pending_certificate = store.get().await.unwrap(); + + assert_eq!(pending_certificate, Some(certificate_pending)); + } +} diff --git a/mithril-aggregator/src/database/repository/signer_registration_store.rs b/mithril-aggregator/src/database/repository/signer_registration_store.rs index 6b8ee183de5..08af7d66aea 100644 --- a/mithril-aggregator/src/database/repository/signer_registration_store.rs +++ b/mithril-aggregator/src/database/repository/signer_registration_store.rs @@ -7,7 +7,6 @@ use async_trait::async_trait; use mithril_common::entities::{Epoch, PartyId, Signer, SignerWithStake}; use mithril_common::StdResult; use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection}; -use mithril_persistence::store::adapter::AdapterError; use crate::database::query::{ DeleteSignerRegistrationRecordQuery, GetSignerRegistrationRecordQuery, @@ -46,16 +45,14 @@ impl VerificationKeyStorer for SignerRegistrationStore { "Get signer registration record failure with signer_id: '{}', epoch: '{}'", signer.party_id, epoch ) - }) - .map_err(AdapterError::QueryError)?; + })?; let _updated_record = self .connection .fetch_first(InsertOrReplaceSignerRegistrationRecordQuery::one( SignerRegistrationRecord::from_signer_with_stake(signer, epoch), )) - .with_context(|| format!("persist verification key failure, epoch: {epoch}")) - .map_err(AdapterError::GeneralError)?; + .with_context(|| format!("persist verification key failure, epoch: {epoch}"))?; match existing_record { None => Ok(None), @@ -70,8 +67,7 @@ impl VerificationKeyStorer for SignerRegistrationStore { let cursor = self .connection .fetch(GetSignerRegistrationRecordQuery::by_epoch(epoch)?) 
-            .with_context(|| format!("get verification key failure, epoch: {epoch}"))
-            .map_err(AdapterError::GeneralError)?;
+            .with_context(|| format!("get verification key failure, epoch: {epoch}"))?;
 
         let signer_with_stakes: HashMap<PartyId, Signer> =
             HashMap::from_iter(cursor.map(|record| (record.signer_id.to_owned(), record.into())));
@@ -86,8 +82,7 @@ impl VerificationKeyStorer for SignerRegistrationStore {
         let cursor = self
             .connection
             .fetch(GetSignerRegistrationRecordQuery::by_epoch(epoch)?)
-            .with_context(|| format!("get verification key failure, epoch: {epoch}"))
-            .map_err(AdapterError::GeneralError)?;
+            .with_context(|| format!("get verification key failure, epoch: {epoch}"))?;
 
         let signer_with_stakes: Vec<SignerWithStake> = cursor.map(|record| record.into()).collect();
@@ -98,13 +93,10 @@ impl VerificationKeyStorer for SignerRegistrationStore {
     }
 
     async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> StdResult<()> {
-        let _deleted_records = self
-            .connection
-            .fetch_first(
-                // we want to prune including the given epoch (+1)
-                DeleteSignerRegistrationRecordQuery::below_epoch_threshold(max_epoch_to_prune + 1),
-            )
-            .map_err(AdapterError::QueryError)?;
+        self.connection
+            .apply(DeleteSignerRegistrationRecordQuery::below_epoch_threshold(
+                max_epoch_to_prune,
+            ))?;
 
         Ok(())
     }
@@ -113,10 +105,50 @@ impl VerificationKeyStorer for SignerRegistrationStore {
 #[cfg(test)]
 mod tests {
     use crate::database::test_helper::{insert_signer_registrations, main_db_connection};
-    use crate::store::test_verification_key_storer;
+
+    use mithril_common::entities::{Epoch, PartyId, Signer, SignerWithStake};
+    use mithril_common::test_utils::fake_keys;
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use crate::VerificationKeyStorer;
 
     use super::*;
 
+    /// Build simple fake signers with stakes.
+    /// It could be done with `fake_data::signers_with_stakes`, which produces verification keys
+    /// dynamically but takes longer.
+    fn build_fake_signers_with_stakes(nb: u64) -> Vec<SignerWithStake> {
+        let verification_keys = fake_keys::signer_verification_key();
+        let nb_keys = verification_keys.len() as u64;
+        (1..=nb)
+            .map(|party_idx| SignerWithStake {
+                party_id: format!("party_id:{party_idx}"),
+                verification_key: verification_keys[(party_idx % nb_keys) as usize]
+                    .try_into()
+                    .unwrap(),
+                verification_key_signature: None,
+                operational_certificate: None,
+                kes_period: None,
+                stake: 10,
+            })
+            .collect()
+    }
+
+    fn build_signers(
+        nb_epoch: u64,
+        signers_per_epoch: usize,
+    ) -> HashMap<Epoch, Vec<SignerWithStake>> {
+        (1..=nb_epoch)
+            .map(|epoch| {
+                (
+                    Epoch(epoch),
+                    build_fake_signers_with_stakes(signers_per_epoch as u64),
+                )
+            })
+            .collect()
+    }
+
     fn insert_golden_signer_registration(connection: &SqliteConnection) {
         connection
             .execute(
@@ -151,20 +183,183 @@ mod tests {
     }
 
     pub fn init_signer_registration_store(
-        initial_data: Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>,
-    ) -> Arc<dyn VerificationKeyStorer> {
+        initial_data: HashMap<Epoch, Vec<SignerWithStake>>,
+    ) -> Arc<SignerRegistrationStore> {
         let connection = main_db_connection().unwrap();
-        let initial_data: Vec<(Epoch, Vec<SignerWithStake>)> = initial_data
-            .into_iter()
-            .map(|(e, signers)| (e, signers.into_values().collect::<Vec<_>>()))
-            .collect();
+
+        let initial_data = initial_data.into_iter().collect();
         insert_signer_registrations(&connection, initial_data).unwrap();
 
         Arc::new(SignerRegistrationStore::new(Arc::new(connection)))
     }
 
-    test_verification_key_storer!(
-        test_signer_registration_store =>
-        crate::database::repository::signer_registration_store::tests::init_signer_registration_store
-    );
+    #[tokio::test]
+    pub async fn save_key_in_empty_store() {
+        let signers = build_signers(0, 0);
+        let store = init_signer_registration_store(signers);
+        let res = store
+            .save_verification_key(
+                Epoch(0),
+                SignerWithStake {
+                    party_id: "0".to_string(),
+                    verification_key: fake_keys::signer_verification_key()[0].try_into().unwrap(),
+                    verification_key_signature: None,
+                    operational_certificate: None,
+                    kes_period: None,
+                    stake: 10,
+                },
+            )
+            .await
+            .unwrap();
+
+        assert!(res.is_none());
+    }
+
+    #[tokio::test]
+    pub async fn update_signer_in_store() {
+        let signers = build_signers(1, 1);
+        let signers_on_epoch = signers.get(&Epoch(1)).unwrap().clone();
+        let first_signer = signers_on_epoch.first().unwrap();
+
+        let store = init_signer_registration_store(signers);
+        let res = store
+            .save_verification_key(
+                Epoch(1),
+                SignerWithStake {
+                    party_id: first_signer.party_id.clone(),
+                    verification_key: fake_keys::signer_verification_key()[2].try_into().unwrap(),
+                    verification_key_signature: None,
+                    operational_certificate: None,
+                    kes_period: None,
+                    stake: 10,
+                },
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(
+            Some(SignerWithStake {
+                party_id: first_signer.party_id.clone(),
+                verification_key: fake_keys::signer_verification_key()[2].try_into().unwrap(),
+                verification_key_signature: None,
+                operational_certificate: None,
+                kes_period: None,
+                stake: 10,
+            }),
+            res,
+        );
+    }
+
+    #[tokio::test]
+    pub async fn get_verification_keys_for_empty_epoch() {
+        let signers = build_signers(2, 1);
+        let store = init_signer_registration_store(signers);
+        let res = store.get_verification_keys(Epoch(0)).await.unwrap();
+
+        assert!(res.is_none());
+    }
+
+    #[tokio::test]
+    pub async fn get_signers_for_empty_epoch() {
+        let signers = build_signers(2, 1);
+        let store = init_signer_registration_store(signers);
+        let res = store.get_signers(Epoch(0)).await.unwrap();
+
+        assert!(res.is_none());
+    }
+
+    #[tokio::test]
+    pub async fn get_verification_keys_for_existing_epoch() {
+        let signers = build_signers(2, 2);
+        let store = init_signer_registration_store(signers.clone());
+
+        let epoch = Epoch(1);
+        let expected_signers = signers
+            .get(&epoch)
+            .unwrap()
+            .iter()
+            .map(|s| (s.party_id.clone(), Signer::from(s.clone())))
+            .collect::<HashMap<PartyId, Signer>>();
+
+        let res = store.get_verification_keys(epoch).await.unwrap().unwrap();
+
+        assert_eq!(expected_signers, res);
+    }
+
+    #[tokio::test]
+    pub async fn get_signers_for_existing_epoch() {
+        let signers = build_signers(2, 2);
+        let store = init_signer_registration_store(signers.clone());
+
+        let epoch = Epoch(1);
+        let mut expected_signers = signers.get(&epoch).unwrap().clone();
+        expected_signers.sort_by(|a, b| a.party_id.cmp(&b.party_id));
+
+        let mut res = store.get_signers(epoch).await.unwrap().unwrap();
+        res.sort_by(|a, b| a.party_id.cmp(&b.party_id));
+
+        assert_eq!(expected_signers, res);
+    }
+
+    #[tokio::test]
+    pub async fn can_prune_keys_from_given_epoch_retention_limit() {
+        let signers = build_signers(6, 2);
+        let store = init_signer_registration_store(signers);
+
+        for epoch in 1..6 {
+            assert!(
+                store
+                    .get_verification_keys(Epoch(epoch))
+                    .await
+                    .unwrap()
+                    .is_some(),
+                "Keys should exist before pruning"
+            );
+            store
+                .prune_verification_keys(Epoch(epoch) + 1)
+                .await
+                .expect("Pruning should not fail");
+
+            let pruned_epoch_keys = store.get_verification_keys(Epoch(epoch)).await.unwrap();
+            assert_eq!(None, pruned_epoch_keys);
+        }
+    }
+
+    async fn get_epochs_in_database_until(
+        store: &SignerRegistrationStore,
+        until_epoch: Epoch,
+    ) -> Vec<Epoch> {
+        let mut epochs_in_database = vec![];
+        for epoch_number in 1..=(*until_epoch) {
+            let current_epoch = Epoch(epoch_number);
+            if store
+                .get_verification_keys(current_epoch)
+                .await
+                .unwrap()
+                .is_some()
+            {
+                epochs_in_database.push(current_epoch);
+            }
+        }
+
+        epochs_in_database
+    }
+
+    #[tokio::test]
+    async fn prune_epoch_older_than_threshold() {
+        let signers = build_signers(5, 2);
+        let store = init_signer_registration_store(signers);
+
+        assert_eq!(
+            vec!(Epoch(1), Epoch(2), Epoch(3), Epoch(4), Epoch(5)),
+            get_epochs_in_database_until(&store, Epoch(8)).await
+        );
+
+        store.prune_verification_keys(Epoch(4)).await.unwrap();
+
+        assert_eq!(
+            vec!(Epoch(4), Epoch(5)),
+            get_epochs_in_database_until(&store, Epoch(8)).await
+        );
+    }
 }
diff --git a/mithril-aggregator/src/database/repository/stake_pool_store.rs b/mithril-aggregator/src/database/repository/stake_pool_store.rs
index 338d1d006fa..dbb02916616 100644
--- a/mithril-aggregator/src/database/repository/stake_pool_store.rs
+++ b/mithril-aggregator/src/database/repository/stake_pool_store.rs
@@ -8,7 +8,6 @@ use mithril_common::entities::{Epoch, StakeDistribution};
 use mithril_common::signable_builder::StakeDistributionRetriever;
 use mithril_common::StdResult;
 use mithril_persistence::sqlite::{ConnectionExtensions, SqliteConnection};
-use mithril_persistence::store::adapter::AdapterError;
 use mithril_persistence::store::StakeStorer;
 
 use crate::database::query::{
@@ -51,8 +50,7 @@ impl StakeStorer for StakePoolStore {
                     .map(|(pool_id, stake)| (pool_id, epoch, stake))
                     .collect(),
             ))
-            .with_context(|| format!("persist stakes failure, epoch: {epoch}"))
-            .map_err(AdapterError::GeneralError)?;
+            .with_context(|| format!("persist stakes failure, epoch: {epoch}"))?;
 
         Ok(Some(StakeDistribution::from_iter(
             pools.into_iter().map(|p| (p.stake_pool_id, p.stake)),
@@ -63,8 +61,7 @@ impl StakeStorer for StakePoolStore {
         let cursor = self
             .connection
            .fetch(GetStakePoolQuery::by_epoch(epoch)?)
-            .with_context(|| format!("get stakes failure, epoch: {epoch}"))
-            .map_err(AdapterError::GeneralError)?;
+            .with_context(|| format!("get stakes failure, epoch: {epoch}"))?;
 
         let mut stake_distribution = StakeDistribution::new();
 
         for stake_pool in cursor {
@@ -96,8 +93,7 @@ impl EpochPruningTask for StakePoolStore {
             self.connection
                 .apply(DeleteStakePoolQuery::below_epoch_threshold(
                     epoch - threshold,
-                ))
-                .map_err(AdapterError::QueryError)?;
+                ))?;
         }
         Ok(())
     }
diff --git a/mithril-aggregator/src/database/test_helper.rs b/mithril-aggregator/src/database/test_helper.rs
index 7bba7fec546..f5bff30ce2c 100644
--- a/mithril-aggregator/src/database/test_helper.rs
+++ b/mithril-aggregator/src/database/test_helper.rs
@@ -1,6 +1,8 @@
 use chrono::Utc;
+use serde::Serialize;
 use sqlite::{ConnectionThreadSafe, Value};
 use std::path::Path;
+use std::sync::Arc;
 use uuid::Uuid;
 
 use mithril_common::entities::{
@@ -456,3 +458,52 @@ pub fn insert_stake_pool(
 
     Ok(())
 }
+
+/// A simple struct that helps initialize a database with the old adapter behavior for testing purposes.
+pub struct FakeStoreAdapter {
+    connection: Arc<ConnectionThreadSafe>,
+    table: &'static str,
+}
+
+impl FakeStoreAdapter {
+    pub fn new(connection: Arc<ConnectionThreadSafe>, table: &'static str) -> Self {
+        Self { connection, table }
+    }
+
+    pub fn create_table(&self) {
+        let sql = format!(
+            "create table {} (key_hash text primary key, key json not null, value json not null)",
+            self.table
+        );
+        self.connection.execute(sql).unwrap();
+    }
+
+    pub fn has_key_hash(&self, key_hash: &str) -> bool {
+        let sql = format!(
+            "select exists(select 1 from {} where key_hash = ?1) as record_exists",
+            self.table
+        );
+        let parameters = [Value::String(key_hash.to_string())];
+        let result: i64 = self.connection.query_single_cell(sql, &parameters).unwrap();
+        result == 1
+    }
+
+    pub fn store_record<K: Serialize, V: Serialize>(
+        &self,
+        key_hash: &str,
+        key: &K,
+        record: &V,
+    ) -> StdResult<()> {
+        let sql = format!(
+            "insert into {} (key_hash, key, value) values (?1, ?2, ?3) on conflict (key_hash) do update set value = excluded.value",
+            self.table
+        );
+        let mut statement = self.connection.prepare(sql)?;
+        statement.bind((1, key_hash))?;
+        statement.bind((2, serde_json::to_string(&key)?.as_str()))?;
+        statement.bind((3, serde_json::to_string(record)?.as_str()))?;
+        let _ = statement.next()?;
+
+        Ok(())
+    }
+}
diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs
index 9395bf86010..3576fdafdf4 100644
--- a/mithril-aggregator/src/dependency_injection/builder.rs
+++ b/mithril-aggregator/src/dependency_injection/builder.rs
@@ -29,7 +29,7 @@ use mithril_common::{
         CardanoImmutableDigester, DumbImmutableFileObserver, ImmutableDigester,
         ImmutableFileObserver, ImmutableFileSystemObserver,
     },
-    entities::{CertificatePending, CompressionAlgorithm, Epoch, SignedEntityTypeDiscriminants},
+    entities::{CompressionAlgorithm, Epoch, SignedEntityTypeDiscriminants},
     era::{
         adapters::{EraReaderAdapterBuilder, EraReaderDummyAdapter},
         EraChecker, EraMarker, EraReader, EraReaderAdapter, SupportedEra,
     },
@@ -46,7 +46,6 @@ use mithril_common::{
 use mithril_persistence::{
     database::{repository::CardanoTransactionRepository, ApplicationNodeType, SqlMigration},
     sqlite::{ConnectionBuilder, ConnectionOptions, SqliteConnection, SqliteConnectionPool},
-    store::adapter::{MemoryAdapter, SQLiteAdapter, StoreAdapter},
 };
 
 use super::{DependenciesBuilderError, EpochServiceWrapper, Result};
@@ -57,9 +56,9 @@ use crate::{
     },
     configuration::ExecutionEnvironment,
     database::repository::{
-        BufferedSingleSignatureRepository, CertificateRepository, EpochSettingsStore,
-        OpenMessageRepository, SignedEntityStore, SignedEntityStorer, SignerRegistrationStore,
-        SignerStore, SingleSignatureRepository, StakePoolStore,
+        BufferedSingleSignatureRepository, CertificatePendingRepository, CertificateRepository,
+        EpochSettingsStore, OpenMessageRepository, SignedEntityStore, SignedEntityStorer,
+        SignerRegistrationStore, SignerStore, SingleSignatureRepository, StakePoolStore,
     },
     entities::AggregatorEpochSettings,
     event_store::{EventMessage, EventStore, TransmitterService},
@@ -71,13 +70,13 @@ use crate::{
         MithrilSignedEntityService, MithrilStakeDistributionService, ProverService,
         SignedEntityService, StakeDistributionService, UpkeepService, UsageReporter,
     },
+    store::CertificatePendingStorer,
     tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter},
-    AggregatorConfig, AggregatorRunner, AggregatorRuntime, CertificatePendingStore,
-    CompressedArchiveSnapshotter, Configuration, DependencyContainer, DumbSnapshotUploader,
-    DumbSnapshotter, EpochSettingsStorer, LocalSnapshotUploader, MetricsService,
-    MithrilSignerRegisterer, MultiSigner, MultiSignerImpl, RemoteSnapshotUploader,
-    SingleSignatureAuthenticator, SnapshotUploader, SnapshotUploaderType, Snapshotter,
-    SnapshotterCompressionAlgorithm, VerificationKeyStorer,
+    AggregatorConfig, AggregatorRunner, AggregatorRuntime, CompressedArchiveSnapshotter,
+    Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter, EpochSettingsStorer,
+    LocalSnapshotUploader, MetricsService, MithrilSignerRegisterer, MultiSigner, MultiSignerImpl,
+    RemoteSnapshotUploader, SingleSignatureAuthenticator, SnapshotUploader, SnapshotUploaderType,
+    Snapshotter, SnapshotterCompressionAlgorithm, VerificationKeyStorer,
 };
 
 const SQLITE_FILE: &str = "aggregator.sqlite3";
@@ -122,7 +121,7 @@ pub struct DependenciesBuilder {
     pub multi_signer: Option<Arc<dyn MultiSigner>>,
 
     /// Certificate pending store.
-    pub certificate_pending_store: Option<Arc<CertificatePendingStore>>,
+    pub certificate_pending_store: Option<Arc<dyn CertificatePendingStorer>>,
 
     /// Certificate repository.
     pub certificate_repository: Option<Arc<CertificateRepository>>,
@@ -502,41 +501,20 @@ impl DependenciesBuilder {
         Ok(self.multi_signer.as_ref().cloned().unwrap())
     }
 
-    async fn build_certificate_pending_store(&mut self) -> Result<Arc<CertificatePendingStore>> {
-        let adapter: Box<dyn StoreAdapter<Key = String, Record = CertificatePending>> = match self
-            .configuration
-            .environment
-        {
-            ExecutionEnvironment::Production => {
-                let adapter =
-                    SQLiteAdapter::new("pending_certificate", self.get_sqlite_connection().await?)
-                        .map_err(|e| DependenciesBuilderError::Initialization {
-                            message: "Cannot create SQLite adapter for PendingCertificate Store."
-                                .to_string(),
-                            error: Some(e.into()),
-                        })?;
-
-                Box::new(adapter)
-            }
-            _ => {
-                let adapter = MemoryAdapter::new(None).map_err(|e| {
-                    DependenciesBuilderError::Initialization {
-                        message: "Cannot create Memory adapter for PendingCertificate Store."
-                            .to_string(),
-                        error: Some(e.into()),
-                    }
-                })?;
-                Box::new(adapter)
-            }
-        };
-
-        Ok(Arc::new(CertificatePendingStore::new(adapter)))
+    async fn build_certificate_pending_storer(
+        &mut self,
+    ) -> Result<Arc<dyn CertificatePendingStorer>> {
+        Ok(Arc::new(CertificatePendingRepository::new(
+            self.get_sqlite_connection().await?,
+        )))
     }
 
-    /// Get a configured [CertificatePendingStore].
-    pub async fn get_certificate_pending_store(&mut self) -> Result<Arc<CertificatePendingStore>> {
+    /// Get a configured [CertificatePendingStorer].
+    pub async fn get_certificate_pending_storer(
+        &mut self,
+    ) -> Result<Arc<dyn CertificatePendingStorer>> {
         if self.certificate_pending_store.is_none() {
-            self.certificate_pending_store = Some(self.build_certificate_pending_store().await?);
+            self.certificate_pending_store = Some(self.build_certificate_pending_storer().await?);
         }
 
         Ok(self.certificate_pending_store.as_ref().cloned().unwrap())
@@ -1337,6 +1315,7 @@ impl DependenciesBuilder {
     async fn build_upkeep_service(&mut self) -> Result<Arc<dyn UpkeepService>> {
         let stake_pool_pruning_task = self.get_stake_store().await?;
         let epoch_settings_pruning_task = self.get_epoch_settings_store().await?;
+        let mithril_registerer_pruning_task = self.get_mithril_registerer().await?;
 
         let upkeep_service = Arc::new(AggregatorUpkeepService::new(
             self.get_sqlite_connection().await?,
             self.get_sqlite_connection_cardano_transaction_pool()
                 .await?,
             self.get_event_store_sqlite_connection().await?,
             self.get_signed_entity_lock().await?,
-            vec![stake_pool_pruning_task, epoch_settings_pruning_task],
+            vec![
+                stake_pool_pruning_task,
+                epoch_settings_pruning_task,
+                mithril_registerer_pruning_task,
+            ],
             self.root_logger(),
         ));
 
@@ -1431,7 +1414,7 @@ impl DependenciesBuilder {
             stake_store: self.get_stake_store().await?,
             snapshot_uploader: self.get_snapshot_uploader().await?,
             multi_signer: self.get_multi_signer().await?,
-            certificate_pending_store: self.get_certificate_pending_store().await?,
+            certificate_pending_store: self.get_certificate_pending_storer().await?,
             certificate_repository: self.get_certificate_repository().await?,
             open_message_repository: self.get_open_message_repository().await?,
             verification_key_store: self.get_verification_key_store().await?,
diff --git a/mithril-aggregator/src/dependency_injection/containers.rs b/mithril-aggregator/src/dependency_injection/containers.rs
index 69f27a55a04..5ea4294073a 100644
--- a/mithril-aggregator/src/dependency_injection/containers.rs
+++ b/mithril-aggregator/src/dependency_injection/containers.rs
@@ -39,9 +39,9 @@ use crate::{
     },
     signer_registerer::SignerRecorder,
     snapshot_uploaders::SnapshotUploader,
-    CertificatePendingStore, EpochSettingsStorer, MetricsService, SignerRegisterer,
-    SignerRegistrationRoundOpener, SingleSignatureAuthenticator, Snapshotter,
-    VerificationKeyStorer,
+    store::CertificatePendingStorer,
+    EpochSettingsStorer, MetricsService, SignerRegisterer, SignerRegistrationRoundOpener,
+    SingleSignatureAuthenticator, Snapshotter, VerificationKeyStorer,
 };
 
 /// EpochServiceWrapper wraps a [EpochService]
@@ -77,7 +77,7 @@ pub struct DependencyContainer {
     pub multi_signer: Arc<dyn MultiSigner>,
 
     /// Certificate pending store.
-    pub certificate_pending_store: Arc<CertificatePendingStore>,
+    pub certificate_pending_store: Arc<dyn CertificatePendingStorer>,
 
     /// Certificate store.
     pub certificate_repository: Arc<CertificateRepository>,
diff --git a/mithril-aggregator/src/http_server/routes/certificate_routes.rs b/mithril-aggregator/src/http_server/routes/certificate_routes.rs
index 7f9b72550a5..0004fe4e3db 100644
--- a/mithril-aggregator/src/http_server/routes/certificate_routes.rs
+++ b/mithril-aggregator/src/http_server/routes/certificate_routes.rs
@@ -49,10 +49,10 @@ fn certificate_certificate_hash(
 }
 
 mod handlers {
+    use crate::store::CertificatePendingStorer;
     use crate::MetricsService;
     use crate::{
-        http_server::routes::reply, services::MessageService, CertificatePendingStore,
-        ToCertificatePendingMessageAdapter,
+        http_server::routes::reply, services::MessageService, ToCertificatePendingMessageAdapter,
     };
 
     use mithril_common::CardanoNetwork;
@@ -67,7 +67,7 @@ mod handlers {
     pub async fn certificate_pending(
         logger: Logger,
         network: CardanoNetwork,
-        certificate_pending_store: Arc<CertificatePendingStore>,
+        certificate_pending_store: Arc<dyn CertificatePendingStorer>,
     ) -> Result<impl warp::Reply, Infallible> {
         match certificate_pending_store.get().await {
             Ok(Some(certificate_pending)) => Ok(reply::json(
@@ -126,12 +126,16 @@ mod handlers {
 #[cfg(test)]
 mod tests {
+    use super::*;
+    use crate::store::MockCertificatePendingStorer;
+    use crate::{
+        http_server::SERVER_BASE_PATH, initialize_dependencies, services::MockMessageService,
+    };
     use anyhow::anyhow;
     use mithril_common::{
         entities::CertificatePending,
         test_utils::{apispec::APISpec, fake_data},
     };
-    use mithril_persistence::store::adapter::DumbStoreAdapter;
     use serde_json::Value::Null;
     use std::sync::Arc;
     use warp::{
@@ -139,13 +143,6 @@ mod tests {
         test::request,
     };
 
-    use crate::{
-        http_server::SERVER_BASE_PATH, initialize_dependencies, services::MockMessageService,
-        CertificatePendingStore,
-    };
-
-    use super::*;
-
     fn setup_router(
         state: RouterState,
     ) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
@@ -230,8 +227,11 @@ mod tests {
         let path = "/certificate-pending";
 
         let mut dependency_manager = initialize_dependencies().await;
-        let certificate_pending_store_store =
-            CertificatePendingStore::new(Box::new(DumbStoreAdapter::new_failing_adapter("error")));
+        let mut certificate_pending_store_store = MockCertificatePendingStorer::new();
+        certificate_pending_store_store
+            .expect_get()
+            .returning(|| Err(anyhow!("error")));
+
         dependency_manager.certificate_pending_store = Arc::new(certificate_pending_store_store);
 
         let response = request()
diff --git a/mithril-aggregator/src/http_server/routes/middlewares.rs b/mithril-aggregator/src/http_server/routes/middlewares.rs
index 29b25679ccd..be126401610 100644
--- a/mithril-aggregator/src/http_server/routes/middlewares.rs
+++ b/mithril-aggregator/src/http_server/routes/middlewares.rs
@@ -11,9 +11,9 @@ use crate::event_store::{EventMessage, TransmitterService};
 use crate::http_server::routes::http_server_child_logger;
 use crate::http_server::routes::router::{RouterConfig, RouterState};
 use crate::services::{CertifierService, MessageService, ProverService, SignedEntityService};
+use crate::store::CertificatePendingStorer;
 use crate::{
-    CertificatePendingStore, MetricsService, SignerRegisterer, SingleSignatureAuthenticator,
-    VerificationKeyStorer,
+    MetricsService, SignerRegisterer, SingleSignatureAuthenticator, VerificationKeyStorer,
 };
 
 /// Extract a value from the configuration
@@ -54,7 +54,7 @@ pub(crate) fn log_route_call(
 /// With certificate pending store
 pub(crate) fn with_certificate_pending_store(
     router_state: &RouterState,
-) -> impl Filter<Extract = (Arc<CertificatePendingStore>,), Error = Infallible> + Clone {
+) -> impl Filter<Extract = (Arc<dyn CertificatePendingStorer>,), Error = Infallible> + Clone {
     let certificate_pending_store =
         router_state.dependencies.certificate_pending_store.clone();
     warp::any().map(move || certificate_pending_store.clone())
 }
diff --git a/mithril-aggregator/src/http_server/routes/signer_routes.rs b/mithril-aggregator/src/http_server/routes/signer_routes.rs
index eb246fbdacf..5f6c9d9a0b8 100644
--- a/mithril-aggregator/src/http_server/routes/signer_routes.rs
+++ b/mithril-aggregator/src/http_server/routes/signer_routes.rs
@@ -272,7 +272,6 @@ mod tests {
         test_utils::MithrilFixtureBuilder,
         test_utils::{apispec::APISpec, fake_data},
     };
-    use mithril_persistence::store::adapter::AdapterError;
 
     use crate::{
         database::{record::SignerRecord, repository::MockSignerGetter},
@@ -617,7 +616,7 @@ mod tests {
         let mut mock_verification_key_store = MockVerificationKeyStorer::new();
         mock_verification_key_store
             .expect_get_signers()
-            .return_once(|_| Err(AdapterError::GeneralError(anyhow!("invalid query")).into()));
+            .return_once(|_| Err(anyhow!("invalid query")));
 
         let mut dependency_manager = initialize_dependencies().await;
         dependency_manager.verification_key_store = Arc::new(mock_verification_key_store);
diff --git a/mithril-aggregator/src/lib.rs b/mithril-aggregator/src/lib.rs
index b948593208c..a608173ee69 100644
--- a/mithril-aggregator/src/lib.rs
+++ b/mithril-aggregator/src/lib.rs
@@ -54,9 +54,7 @@ pub use snapshotter::{
     CompressedArchiveSnapshotter, DumbSnapshotter, SnapshotError, Snapshotter,
     SnapshotterCompressionAlgorithm,
 };
-pub use store::{
-    CertificatePendingStore, EpochSettingsStorer, VerificationKeyStore, VerificationKeyStorer,
-};
+pub use store::{CertificatePendingStorer, EpochSettingsStorer, VerificationKeyStorer};
 pub use tools::{
     CExplorerSignerRetriever, SignersImporter, SignersImporterPersister, SignersImporterRetriever,
     SingleSignatureAuthenticator,
diff --git a/mithril-aggregator/src/services/epoch_service.rs b/mithril-aggregator/src/services/epoch_service.rs
index 8dff89e6e3d..e4a27c02ee4 100644
--- a/mithril-aggregator/src/services/epoch_service.rs
+++ b/mithril-aggregator/src/services/epoch_service.rs
@@ -817,19 +817,17 @@ impl EpochService for FakeEpochService {
 mod tests {
     use mithril_common::chain_observer::FakeObserver;
     use mithril_common::entities::{
-        BlockNumber, CardanoTransactionsSigningConfig, PartyId, Stake, StakeDistribution,
+        BlockNumber, CardanoTransactionsSigningConfig, Stake, StakeDistribution,
     };
     use mithril_common::era::SupportedEra;
     use mithril_common::test_utils::{
         fake_data, MithrilFixture, MithrilFixtureBuilder, StakeDistributionGenerationMethod,
     };
-    use mithril_persistence::store::adapter::MemoryAdapter;
-    use std::collections::{BTreeSet, HashMap};
+    use mockall::predicate::eq;
 
     use crate::services::MockStakeDistributionService;
-    use crate::store::FakeEpochSettingsStorer;
+    use crate::store::{FakeEpochSettingsStorer, MockVerificationKeyStorer};
     use crate::test_tools::TestLogger;
-    use crate::VerificationKeyStore;
 
     use super::*;
 
@@ -915,16 +913,6 @@ mod tests {
         }
     }
 
-    fn map_signers_for_vkey_store(
-        signers: &[SignerWithStake],
-    ) -> HashMap<PartyId, SignerWithStake> {
-        signers
-            .iter()
-            .cloned()
-            .map(|s| (s.party_id.to_owned(), s))
-            .collect()
-    }
-
     struct EpochServiceBuilder {
         cardano_transactions_signing_config: CardanoTransactionsSigningConfig,
         future_protocol_parameters: ProtocolParameters,
@@ -988,19 +976,23 @@ mod tests {
                     self.stored_signer_registration_epoch_settings.clone(),
                 ),
             ]);
-            let vkey_store = VerificationKeyStore::new(Box::new(
-                MemoryAdapter::new(Some(vec![
-                    (
-                        signer_retrieval_epoch,
-                        map_signers_for_vkey_store(&self.signers_with_stake),
-                    ),
-                    (
-                        next_signer_retrieval_epoch,
-                        map_signers_for_vkey_store(&self.next_signers_with_stake),
-                    ),
-                ]))
-                .unwrap(),
-            ));
+
+            let verification_key_store = {
+                let mut store = MockVerificationKeyStorer::new();
+                let signers_with_stake = self.signers_with_stake.clone();
+                store
+                    .expect_get_signers()
+                    .with(eq(signer_retrieval_epoch))
+                    .returning(move |_| Ok(Some(signers_with_stake.clone())));
+
+                let next_signers_with_stake = self.next_signers_with_stake.clone();
+                store
+                    .expect_get_signers()
+                    .with(eq(next_signer_retrieval_epoch))
+                    .returning(move |_| Ok(Some(next_signers_with_stake.clone())));
+                store
+            };
+
             let chain_observer = FakeObserver::default();
             chain_observer.set_current_era(self.cardano_era).await;
             let era_checker = EraChecker::new(self.mithril_era, Epoch::default());
@@ -1030,7 +1022,7 @@ mod tests {
                 },
                 EpochServiceDependencies::new(
                     Arc::new(epoch_settings_storer),
-                    Arc::new(vkey_store),
+                    Arc::new(verification_key_store),
                     Arc::new(chain_observer),
                     Arc::new(era_checker),
                     Arc::new(stake_distribution_service),
diff --git a/mithril-aggregator/src/signer_registerer.rs b/mithril-aggregator/src/signer_registerer.rs
index e314950b180..e7db1130237 100644
--- a/mithril-aggregator/src/signer_registerer.rs
+++ b/mithril-aggregator/src/signer_registerer.rs
@@ -11,7 +11,7 @@ use mithril_common::{
     StdError, StdResult,
 };
 
-use crate::VerificationKeyStorer;
+use crate::{services::EpochPruningTask, VerificationKeyStorer};
 
 use mithril_common::chain_observer::ChainObserverError;
 
@@ -164,6 +164,26 @@ impl SignerRegistrationRoundOpener for MithrilSignerRegisterer {
             stake_distribution,
         });
 
+        Ok(())
+    }
+
+    async fn close_registration_round(&self) -> StdResult<()> {
+        let mut current_round = self.current_round.write().await;
+        *current_round = None;
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl EpochPruningTask for MithrilSignerRegisterer {
+    fn pruned_data(&self) -> &'static str {
+        "Signer registration"
+    }
+
+    async fn prune(&self, epoch: Epoch) -> StdResult<()> {
+        let registration_epoch = epoch.offset_to_recording_epoch();
+
         if let Some(retention_limit) = self.verification_key_epoch_retention_limit {
             self.verification_key_store
                 .prune_verification_keys(registration_epoch - retention_limit)
                 .await
                 .with_context(|| {
                     format!(
                         "VerificationKeyStorer can not prune verification keys below epoch: '{}'",
                         registration_epoch - retention_limit
                     )
-                })
-                .map_err(|e| SignerRegistrationError::StoreError(anyhow!(e)))?;
+                })?;
         }
 
         Ok(())
     }
-
-    async fn close_registration_round(&self) -> StdResult<()> {
-        let mut current_round = self.current_round.write().await;
-        *current_round = None;
-
-        Ok(())
-    }
 }
 
 #[async_trait]
@@ -287,23 +299,26 @@ mod tests {
     use mithril_common::{
         chain_observer::FakeObserver,
-        entities::{Epoch, PartyId, Signer, SignerWithStake},
-        test_utils::{fake_data, MithrilFixtureBuilder},
+        entities::{Epoch, Signer},
+        test_utils::MithrilFixtureBuilder,
     };
-    use mithril_persistence::store::adapter::MemoryAdapter;
+    use mockall::predicate::eq;
 
     use crate::{
+        database::{repository::SignerRegistrationStore, test_helper::main_db_connection},
         MithrilSignerRegisterer, SignerRegisterer, SignerRegistrationRoundOpener,
-        VerificationKeyStore, VerificationKeyStorer,
+        VerificationKeyStorer,
     };
+    use crate::{services::EpochPruningTask, store::MockVerificationKeyStorer};
 
     use super::MockSignerRecorder;
 
     #[tokio::test]
     async fn can_register_signer_if_registration_round_is_opened_with_operational_certificate() {
-        let verification_key_store = Arc::new(VerificationKeyStore::new(Box::new(
-            MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap(),
+        let verification_key_store = Arc::new(SignerRegistrationStore::new(Arc::new(
+            main_db_connection().unwrap(),
         )));
+
         let mut signer_recorder = MockSignerRecorder::new();
         signer_recorder
             .expect_record_signer_registration()
@@ -346,9 +361,10 @@ mod tests {
     #[tokio::test]
     async fn can_register_signer_if_registration_round_is_opened_without_operational_certificate()
     {
-        let verification_key_store = Arc::new(VerificationKeyStore::new(Box::new(
-            MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap(),
+        let verification_key_store = Arc::new(SignerRegistrationStore::new(Arc::new(
+            main_db_connection().unwrap(),
         )));
+
         let mut signer_recorder = MockSignerRecorder::new();
         signer_recorder
             .expect_record_signer_registration()
@@ -394,9 +410,10 @@ mod tests {
 
     #[tokio::test]
     async fn cant_register_signer_if_registration_round_is_not_opened() {
-        let verification_key_store = Arc::new(VerificationKeyStore::new(Box::new(
-            MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(None).unwrap(),
+        let verification_key_store = Arc::new(SignerRegistrationStore::new(Arc::new(
+            main_db_connection().unwrap(),
         )));
+
         let signer_recorder = MockSignerRecorder::new();
 
         let signer_registerer = MithrilSignerRegisterer::new(
             Arc::new(FakeObserver::default()),
@@ -415,50 +432,44 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn should_prune_verification_keys_older_than_two_epochs_at_round_opening() {
-        let initial_keys = (1..=5)
-            .map(|epoch| {
-                let signers: HashMap<PartyId, SignerWithStake> = HashMap::from_iter(
-                    fake_data::signers_with_stakes(1)
-                        .into_iter()
-                        .map(|s| (s.party_id.to_owned(), s)),
-                );
-                (Epoch(epoch), signers)
-            })
-            .collect();
-        let verification_key_store = Arc::new(VerificationKeyStore::new(Box::new(
-            MemoryAdapter::<Epoch, HashMap<PartyId, SignerWithStake>>::new(Some(initial_keys))
-                .unwrap(),
-        )));
-        let signer_recorder = MockSignerRecorder::new();
+    async fn mock_prune_epoch_older_than_threshold() {
+        const PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD: u64 = 10;
+        let retention_limit = Some(PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD);
+
+        let mut verification_key_store = MockVerificationKeyStorer::new();
+        verification_key_store
+            .expect_prune_verification_keys()
+            .with(eq(Epoch(4).offset_to_recording_epoch()))
+            .times(1)
+            .returning(|_| Ok(()));
+
         let signer_registerer = MithrilSignerRegisterer::new(
             Arc::new(FakeObserver::default()),
-            verification_key_store.clone(),
-            Arc::new(signer_recorder),
-            Some(2),
+            Arc::new(verification_key_store),
+            Arc::new(MockSignerRecorder::new()),
+            retention_limit,
         );
 
-        let fixture = MithrilFixtureBuilder::default().with_signers(5).build();
-        signer_registerer
-            .open_registration_round(Epoch(5), fixture.stake_distribution())
-            .await
-            .expect("Opening a registration round should not fail");
+        let current_epoch = Epoch(4) + PROTOCOL_INITIALIZER_PRUNE_EPOCH_THRESHOLD;
+        signer_registerer.prune(current_epoch).await.unwrap();
+    }
 
-        for epoch in 1..=3 {
-            let verification_keys = verification_key_store
-                .get_verification_keys(Epoch(epoch))
-                .await
-                .unwrap();
-            assert_eq!(None, verification_keys);
-        }
+    #[tokio::test]
+    async fn mock_without_threshold_nothing_is_pruned() {
+        let retention_limit = None;
 
-        let verification_keys = verification_key_store
-            .get_verification_keys(Epoch(4))
-            .await
-            .unwrap();
-        assert!(
-            verification_keys.is_some(),
-            "Verification keys of the previous epoch should not have been pruned"
+        let mut verification_key_store = MockVerificationKeyStorer::new();
+        verification_key_store
+            .expect_prune_verification_keys()
+            .never();
+
+        let signer_registerer = MithrilSignerRegisterer::new(
+            Arc::new(FakeObserver::default()),
+            Arc::new(verification_key_store),
+            Arc::new(MockSignerRecorder::new()),
+            retention_limit,
         );
+
+        signer_registerer.prune(Epoch(100)).await.unwrap();
     }
 }
diff --git a/mithril-aggregator/src/store/mod.rs b/mithril-aggregator/src/store/mod.rs
index 7650c223665..cf9f848361a 100644
--- a/mithril-aggregator/src/store/mod.rs
+++ b/mithril-aggregator/src/store/mod.rs
@@ -3,14 +3,10 @@ mod pending_certificate_store;
 mod verification_key_store;
 
 pub use epoch_settings_storer::EpochSettingsStorer;
-pub use pending_certificate_store::CertificatePendingStore;
-pub use verification_key_store::{VerificationKeyStore, VerificationKeyStorer};
+pub use pending_certificate_store::*;
+pub use verification_key_store::VerificationKeyStorer;
 
 #[cfg(test)]
 pub use epoch_settings_storer::FakeEpochSettingsStorer;
 #[cfg(test)]
-pub use verification_key_store::test_suite as verification_key_store_test_suite;
-#[cfg(test)]
-pub(crate) use verification_key_store::test_verification_key_storer;
-#[cfg(test)]
 pub use verification_key_store::MockVerificationKeyStorer;
diff --git a/mithril-aggregator/src/store/pending_certificate_store.rs b/mithril-aggregator/src/store/pending_certificate_store.rs
index 21dcdcad948..992510a378e 100644
--- a/mithril-aggregator/src/store/pending_certificate_store.rs
+++ b/mithril-aggregator/src/store/pending_certificate_store.rs
@@ -1,141 +1,18 @@
-use anyhow::{anyhow, Context};
+use async_trait::async_trait;
 use mithril_common::StdResult;
-use tokio::sync::RwLock;
 
 use mithril_common::entities::CertificatePending;
-use mithril_persistence::store::adapter::StoreAdapter;
-
-type Adapter = Box<dyn StoreAdapter<Key = String, Record = CertificatePending>>;
-
-const KEY: &str = "certificate_pending";
 
 /// Store for [CertificatePending].
-pub struct CertificatePendingStore {
-    adapter: RwLock<Adapter>,
-}
-
-impl CertificatePendingStore {
-    /// Create a new instance.
-    pub fn new(adapter: Adapter) -> Self {
-        Self {
-            adapter: RwLock::new(adapter),
-        }
-    }
-
+#[cfg_attr(test, mockall::automock)]
+#[async_trait]
+pub trait CertificatePendingStorer: Sync + Send {
     /// Fetch the current [CertificatePending] if any.
-    pub async fn get(&self) -> StdResult<Option<CertificatePending>> {
-        self.adapter
-            .read()
-            .await
-            .get_record(&KEY.to_string())
-            .await
-            .with_context(|| "Certificate pending store: could not GET store.".to_string())
-    }
+    async fn get(&self) -> StdResult<Option<CertificatePending>>;
 
     /// Save the given [CertificatePending].
-    pub async fn save(&self, certificate: CertificatePending) -> StdResult<()> {
-        self
-            .adapter
-            .write()
-            .await
-            .store_record(&KEY.to_string(), &certificate)
-            .await
-            .with_context(|| format!("Certificate pending store: error while saving pending certificate for epoch '{}'.", certificate.epoch))
-    }
+    async fn save(&self, certificate: CertificatePending) -> StdResult<()>;
 
     /// Remove and return the current [CertificatePending] if any.
-    pub async fn remove(&self) -> StdResult<Option<CertificatePending>> {
-        self.adapter
-            .write()
-            .await
-            .remove(&KEY.to_string())
-            .await
-            .map_err(|e| anyhow!(e))
-            .with_context(|| {
-                format!(
-                    "Could not delete certificate pending (key = '{}') from store.",
-                    &KEY
-                )
-            })
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-
-    use mithril_common::entities::{Epoch, SignedEntityType};
-    use mithril_common::test_utils::fake_data;
-    use mithril_persistence::store::adapter::DumbStoreAdapter;
-
-    async fn get_certificate_pending_store(is_populated: bool) -> CertificatePendingStore {
-        let mut adapter: DumbStoreAdapter<String, CertificatePending> = DumbStoreAdapter::new();
-
-        if is_populated {
-            let certificate_pending = CertificatePending::new(
-                Epoch(0),
-                SignedEntityType::dummy(),
-                fake_data::protocol_parameters(),
-                fake_data::protocol_parameters(),
-                fake_data::signers(4),
-                fake_data::signers(5),
-            );
-            adapter
-                .store_record(&KEY.to_string(), &certificate_pending)
-                .await
-                .unwrap();
-        }
-        CertificatePendingStore::new(Box::new(adapter))
-    }
-
-    #[tokio::test]
-    async fn get_certificate_pending_with_existing_certificate() {
-        let store = get_certificate_pending_store(true).await;
-        let result = store.get().await.unwrap();
-
-        assert!(result.is_some());
-    }
-
-    #[tokio::test]
-    async fn get_certificate_pending_with_no_existing_certificate() {
-        let store = get_certificate_pending_store(false).await;
-        let result = store.get().await.unwrap();
-
-        assert!(result.is_none());
-    }
-
-    #[tokio::test]
-    async fn save_certificate_pending_once() {
-        let store = get_certificate_pending_store(false).await;
-        let signed_entity_type = SignedEntityType::dummy();
-        let certificate_pending = CertificatePending::new(
-            Epoch(2),
-            signed_entity_type,
-            fake_data::protocol_parameters(),
-            fake_data::protocol_parameters(),
-            fake_data::signers(1),
-            fake_data::signers(2),
-        );
-
-        assert!(store.save(certificate_pending).await.is_ok());
-        assert!(store.get().await.unwrap().is_some());
-    }
-
-    #[tokio::test]
-    async fn update_certificate_pending() {
-        let store = get_certificate_pending_store(true).await;
-        let certificate_pending = store.get().await.unwrap().unwrap();
-
-        assert!(store.save(certificate_pending).await.is_ok());
-    }
-
-    #[tokio::test]
-    async fn remove_certificate_pending() {
-        let store = get_certificate_pending_store(true).await;
-        let epoch = Epoch(0);
-        let certificate_pending = store.remove().await.unwrap().unwrap();
-
-        assert_eq!(epoch, certificate_pending.epoch);
-        assert!(store.get().await.unwrap().is_none());
-    }
+    async fn remove(&self) -> StdResult<Option<CertificatePending>>;
 }
diff --git a/mithril-aggregator/src/store/verification_key_store.rs b/mithril-aggregator/src/store/verification_key_store.rs
index dd8968f9668..fa3b57b689d 100644
--- a/mithril-aggregator/src/store/verification_key_store.rs
+++ b/mithril-aggregator/src/store/verification_key_store.rs
@@ -1,13 +1,8 @@
-use anyhow::Context;
 use async_trait::async_trait;
 use mithril_common::StdResult;
 use std::collections::HashMap;
-use tokio::sync::RwLock;
 
 use mithril_common::entities::{Epoch, PartyId, Signer, SignerWithStake};
-use mithril_persistence::store::adapter::StoreAdapter;
-
-type Adapter = Box<dyn StoreAdapter<Key = Epoch, Record = HashMap<PartyId, SignerWithStake>>>;
 
 /// Store and get signers verification keys for given epoch.
 ///
@@ -36,343 +31,3 @@ pub trait VerificationKeyStorer: Sync + Send {
     /// Prune all verification keys that are at or below the given epoch.
     async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> StdResult<()>;
 }
-
-/// Store for the `VerificationKey`.
-pub struct VerificationKeyStore {
-    adapter: RwLock<Adapter>,
-}
-
-impl VerificationKeyStore {
-    /// Create a new instance.
-    pub fn new(adapter: Adapter) -> Self {
-        Self {
-            adapter: RwLock::new(adapter),
-        }
-    }
-}
-
-#[async_trait]
-impl VerificationKeyStorer for VerificationKeyStore {
-    async fn save_verification_key(
-        &self,
-        epoch: Epoch,
-        signer: SignerWithStake,
-    ) -> StdResult<Option<SignerWithStake>> {
-        let mut signers = self
-            .adapter
-            .read()
-            .await
-            .get_record(&epoch)
-            .await?
-            .unwrap_or_default();
-        let prev_signer = signers.insert(signer.party_id.to_owned(), signer.clone());
-        self.adapter
-            .write()
-            .await
-            .store_record(&epoch, &signers)
-            .await?;
-
-        Ok(prev_signer)
-    }
-
-    async fn get_verification_keys(
-        &self,
-        epoch: Epoch,
-    ) -> StdResult<Option<HashMap<PartyId, Signer>>> {
-        let record = self
-            .adapter
-            .read()
-            .await
-            .get_record(&epoch)
-            .await
-            .with_context(|| format!("Could not get verification keys for epoch {epoch}."))?;
-
-        Ok(record.map(|h| h.into_iter().map(|(k, v)| (k, v.into())).collect()))
-    }
-
-    async fn get_signers(&self, epoch: Epoch) -> StdResult<Option<Vec<SignerWithStake>>> {
-        let record = self
-            .adapter
-            .read()
-            .await
-            .get_record(&epoch)
-            .await
-            .with_context(|| format!("Could not get signers for epoch {epoch}."))?;
-
-        Ok(record.map(|h| h.into_values().collect()))
-    }
-
-    async fn prune_verification_keys(&self, max_epoch_to_prune: Epoch) -> StdResult<()> {
-        let mut adapter = self.adapter.write().await;
-
-        for (epoch, _record) in adapter
-            .get_last_n_records(usize::MAX)
-            .await
-            .with_context(|| {
-                "Pruning verification keys: could not read last records from database".to_string()
-            })?
-            .into_iter()
-            .filter(|(e, _)| e <= &max_epoch_to_prune)
-        {
-            adapter.remove(&epoch).await
-                .with_context(|| format!("Pruning verification keys: could not remove record for epoch '{epoch}' from the database."))?;
-        }
-
-        Ok(())
-    }
-}
-
-/// Macro that generate tests that a [VerificationKeyStorer] must pass
-#[cfg(test)]
-macro_rules! test_verification_key_storer {
-    ($suit_name:ident => $store_builder:expr) => {
-        #[cfg(test)]
-        mod $suit_name {
-            use crate::store::verification_key_store_test_suite as test_suite;
-
-            #[tokio::test]
-            async fn save_key_in_empty_store() {
-                test_suite::save_key_in_empty_store(&$store_builder).await;
-            }
-
-            #[tokio::test]
-            async fn update_signer_in_store() {
-                test_suite::update_signer_in_store(&$store_builder).await;
-            }
-
-            #[tokio::test]
-            async fn get_verification_keys_for_empty_epoch() {
-                test_suite::get_verification_keys_for_empty_epoch(&$store_builder).await;
-            }
-
-            #[tokio::test]
-            async fn get_signers_for_empty_epoch() {
-                test_suite::get_signers_for_empty_epoch(&$store_builder).await;
-            }
-
-            #[tokio::test]
-            async fn get_verification_keys_for_existing_epoch() {
-                test_suite::get_verification_keys_for_existing_epoch(&$store_builder).await;
-            }
-
-            #[tokio::test]
-            async fn get_signers_for_existing_epoch() {
-                test_suite::get_signers_for_existing_epoch(&$store_builder).await;
-            }
-
-            #[tokio::test]
-            async fn can_prune_keys_from_given_epoch_retention_limit() {
-                test_suite::can_prune_keys_from_given_epoch_retention_limit(&$store_builder).await;
-            }
-        }
-    };
-}
-
-#[cfg(test)]
-pub(crate) use test_verification_key_storer;
-
-#[macro_use]
-#[cfg(test)]
-pub mod test_suite {
-    use mithril_common::entities::{Epoch, PartyId, Signer, SignerWithStake};
-    use mithril_common::test_utils::fake_keys;
-    use std::collections::{BTreeMap, BTreeSet, HashMap};
-    use std::sync::Arc;
-
-    use crate::VerificationKeyStorer;
-
-    /// A builder of [VerificationKeyStorer], the arguments are:
-    /// * initial_data
-    type StoreBuilder =
-        dyn Fn(Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>) -> Arc<dyn VerificationKeyStorer>;
-
-    fn build_signers(
-        nb_epoch: u64,
-        signers_per_epoch: usize,
-    ) -> Vec<(Epoch, HashMap<PartyId, SignerWithStake>)> {
-        let mut values = vec![];
-
-        for epoch in 1..=nb_epoch {
-            let mut signers: HashMap<PartyId, SignerWithStake> =
-                HashMap::with_capacity(signers_per_epoch);
-
-            for party_idx in 1..=signers_per_epoch {
-                let party_id = format!("party_id:e{epoch}:{party_idx}");
-                signers.insert(
-                    party_id.clone(),
-                    SignerWithStake {
-                        party_id: party_id.clone(),
-                        verification_key: fake_keys::signer_verification_key()[0]
-                            .try_into()
-                            .unwrap(),
-                        verification_key_signature: None,
-                        operational_certificate: None,
-                        kes_period: None,
-                        stake: 10,
-                    },
-                );
-            }
-            values.push((Epoch(epoch), signers));
-        }
-
-        values
-    }
-
-    pub async fn save_key_in_empty_store(store_builder: &StoreBuilder) {
-        let signers = build_signers(0, 0);
-        let store = store_builder(signers);
-        let res = store
-            .save_verification_key(
-                Epoch(0),
-                SignerWithStake {
-                    party_id: "0".to_string(),
-                    verification_key: fake_keys::signer_verification_key()[0].try_into().unwrap(),
-                    verification_key_signature: None,
-                    operational_certificate: None,
-                    kes_period: None,
-                    stake: 10,
-                },
-            )
-            .await
-            .unwrap();
-
-        assert!(res.is_none());
-    }
-
-    pub async fn update_signer_in_store(store_builder: &StoreBuilder) {
-        let signers = build_signers(1, 1);
-        let store = store_builder(signers);
-        let res = store
-            .save_verification_key(
-                Epoch(1),
-                SignerWithStake {
-                    party_id: "party_id:e1:1".to_string(),
-                    verification_key: fake_keys::signer_verification_key()[2].try_into().unwrap(),
-                    verification_key_signature: None,
-                    operational_certificate: None,
-                    kes_period: None,
-                    stake: 10,
-                },
-            )
-            .await
-            .unwrap();
-
-        assert_eq!(
-            Some(SignerWithStake {
-                party_id: "party_id:e1:1".to_string(),
-                verification_key: fake_keys::signer_verification_key()[2].try_into().unwrap(),
-                verification_key_signature: None,
-                operational_certificate: None,
-                kes_period: None,
-                stake: 10,
-            }),
-            res,
-        );
-    }
-
-    pub async fn get_verification_keys_for_empty_epoch(store_builder: &StoreBuilder) {
-        let signers = build_signers(2, 1);
-        let store = store_builder(signers);
-        let res = store.get_verification_keys(Epoch(0)).await.unwrap();
-
-        assert!(res.is_none());
-    }
-
-    pub async fn get_signers_for_empty_epoch(store_builder: &StoreBuilder) {
-        let signers = build_signers(2, 1);
-        let store = store_builder(signers);
-        let res = store.get_signers(Epoch(0)).await.unwrap();
-
-        assert!(res.is_none());
-    }
-
-    pub async fn get_verification_keys_for_existing_epoch(store_builder: &StoreBuilder) {
-        let signers = build_signers(2, 2);
-        let store = store_builder(signers.clone());
-
-        let expected_signers: Option<BTreeMap<PartyId, Signer>> = signers
-            .into_iter()
-            .filter(|(e, _)| e == 1)
-            .map(|(_, signers)| {
-                BTreeMap::from_iter(signers.into_iter().map(|(p, s)| (p, s.into())))
-            })
-            .next();
-        let res = store
-            .get_verification_keys(Epoch(1))
-            .await
-            .unwrap()
-            .map(|x| BTreeMap::from_iter(x.into_iter()));
-
-        assert_eq!(expected_signers, res);
-    }
-
-    pub async fn get_signers_for_existing_epoch(store_builder: &StoreBuilder) {
-        let signers = build_signers(2, 2);
-        let store = store_builder(signers.clone());
-
-        let expected_signers: Option<BTreeSet<SignerWithStake>> = signers
-            .into_iter()
-            .filter(|(e, _)| e == 1)
-            .map(|(_, signers)| BTreeSet::from_iter(signers.into_values()))
-            .next();
-        let res = store
-            .get_signers(Epoch(1))
-            .await
-            .unwrap()
-            .map(|x| BTreeSet::from_iter(x.into_iter()));
-
-        assert_eq!(expected_signers, res);
-    }
-
-    pub async fn can_prune_keys_from_given_epoch_retention_limit(store_builder: &StoreBuilder) {
-        let signers = build_signers(6, 2);
-        let store = store_builder(signers);
-
-        for epoch in 1..6 {
-            assert!(
-                store
-                    .get_verification_keys(Epoch(epoch))
-                    .await
-                    .unwrap()
-                    .is_some(),
-                "Keys should exist before pruning"
-            );
-            store
-                .prune_verification_keys(Epoch(epoch))
-                .await
-                .expect("Pruning should not fail");
-
-            let pruned_epoch_keys = store.get_verification_keys(Epoch(epoch)).await.unwrap();
-            assert_eq!(None, pruned_epoch_keys);
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use mithril_common::entities::{Epoch, PartyId, SignerWithStake};
-    use mithril_persistence::store::adapter::MemoryAdapter;
-    use std::{collections::HashMap, sync::Arc};
-
-    use crate::{VerificationKeyStore, VerificationKeyStorer};
-
-    pub fn init_store(
-        initial_data: Vec<(Epoch, HashMap<PartyId, SignerWithStake>)>,
-    ) -> Arc<VerificationKeyStore> {
-        let values = if initial_data.is_empty() {
-            None
-        } else {
-            Some(initial_data)
-        };
-
-        let adapter: MemoryAdapter<Epoch, HashMap<PartyId, SignerWithStake>> =
-            MemoryAdapter::new(values).unwrap();
-
-        Arc::new(VerificationKeyStore::new(Box::new(adapter)))
-    }
-
-    test_verification_key_storer!(
-        test_verification_key_store =>
-        crate::store::verification_key_store::tests::init_store
-    );
-}