diff --git a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs index d18b8469..efad4d9c 100644 --- a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs +++ b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs @@ -28,20 +28,6 @@ pub enum PlaintextAggregatorState { }, } -// This is required in order to satisfy PersistableData - it is only used to cover an error -// conition where there is no state in the persistable -// TODO: alter Snapshot in order to return a result snapshot -impl Default for PlaintextAggregatorState { - fn default() -> Self { - PlaintextAggregatorState::Collecting { - threshold_m: 0, - shares: OrderedSet::default(), - seed: Seed::default(), - ciphertext_output: vec![], - } - } -} - impl PlaintextAggregatorState { pub fn init(threshold_m: usize, seed: Seed, ciphertext_output: Vec) -> Self { PlaintextAggregatorState::Collecting { diff --git a/packages/ciphernode/evm/src/event_reader.rs b/packages/ciphernode/evm/src/event_reader.rs index 397ad902..684ce392 100644 --- a/packages/ciphernode/evm/src/event_reader.rs +++ b/packages/ciphernode/evm/src/event_reader.rs @@ -8,8 +8,7 @@ use alloy::providers::Provider; use alloy::rpc::types::Filter; use alloy::transports::{BoxTransport, Transport}; use anyhow::{anyhow, Result}; -use async_trait::async_trait; -use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use data::{AutoPersist, Persistable, Repository, Snapshot}; use enclave_core::{ get_tag, BusError, EnclaveErrorType, EnclaveEvent, EventBus, EventId, Subscribe, }; @@ -50,7 +49,7 @@ where contract_address: Address, start_block: Option, bus: Addr, - repository: Repository, + state: Persistable, } #[derive(Default, serde::Serialize, serde::Deserialize, Clone)] @@ -80,10 +79,8 @@ where start_block: Option, /// Event bus for error propagation bus: Addr, - /// The in memory state of the event reader - state: EvmEventReaderState, - /// Repository to save the state of the event reader - repository: Repository, + /// The auto persistable state of the event reader + state: Persistable, } impl EvmEventReader @@ -101,22 +98,10 @@ where shutdown_tx: Some(shutdown_tx), start_block: params.start_block, bus: params.bus, - state: EvmEventReaderState::default(), - repository: params.repository, + state: params.state, } } - #[instrument(name="evm_event_reader", skip_all, fields(id = get_tag()))] - pub async fn load(params: EvmEventReaderParams) -> Result { - Ok(if let Some(snapshot) = params.repository.read().await? { - info!("Loading from snapshot"); - Self::from_snapshot(params, snapshot).await? - } else { - info!("Loading from params"); - Self::new(params) - }) - } - pub async fn attach( provider: &WithChainId, extractor: ExtractorFn, @@ -125,15 +110,21 @@ where bus: &Addr, repository: &Repository, ) -> Result> { + let sync_state = repository + .clone() + .sync_or_default(EvmEventReaderState::default()) + .await?; + let params = EvmEventReaderParams { provider: provider.clone(), extractor, contract_address: contract_address.parse()?, start_block, bus: bus.clone(), - repository: repository.clone(), + state: sync_state, }; - let addr = EvmEventReader::load(params).await?.start(); + + let addr = EvmEventReader::new(params).start(); bus.do_send(Subscribe::new("Shutdown", addr.clone().into())); @@ -286,14 +277,21 @@ where fn handle(&mut self, wrapped: EnclaveEvmEvent, _: &mut Self::Context) -> Self::Result { let event_id = wrapped.get_id(); info!("Processing event: {}", event_id); - info!("cache length: {}", self.state.ids.len()); - if self.state.ids.contains(&event_id) { + + self.state + .with((), |state| info!("cache length: {}", state.ids.len())); + + if self + .state + .with(false, |state| state.ids.contains(&event_id)) + { error!( "Event id {} has already been seen and was not forwarded to the bus", &event_id ); return; } + let event_type = wrapped.event.event_type(); // Forward everything else to the event bus @@ -301,52 +299,14 @@ where // Save processed ids info!("Storing event(EVM) in cache {}({})", event_type, event_id); - self.state.ids.insert(event_id); - self.state.last_block = wrapped.block; - self.checkpoint(); - } -} -impl Snapshot for EvmEventReader -where - P: Provider + Clone + 'static, - T: Transport + Clone + Unpin, -{ - type Snapshot = EvmEventReaderState; - fn snapshot(&self) -> Result { - Ok(self.state.clone()) - } -} - -impl Checkpoint for EvmEventReader -where - P: Provider + Clone + 'static, - T: Transport + Clone + Unpin, -{ - fn repository(&self) -> &Repository { - &self.repository - } -} - -#[async_trait] -impl FromSnapshotWithParams for EvmEventReader -where - P: Provider + Clone + 'static, - T: Transport + Clone + Unpin, -{ - type Params = EvmEventReaderParams; - async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - Ok(Self { - contract_address: params.contract_address, - provider: Some(params.provider), - extractor: params.extractor, - shutdown_rx: Some(shutdown_rx), - shutdown_tx: Some(shutdown_tx), - start_block: params.start_block, - bus: params.bus, - state: snapshot, - repository: params.repository, - }) + match self.state.try_mutate(|mut state| { + state.ids.insert(event_id); + state.last_block = wrapped.block; + Ok(state) + }) { + Ok(_) => (), + Err(err) => self.bus.err(EnclaveErrorType::Evm, err), + } } } diff --git a/packages/ciphernode/sortition/src/sortition.rs b/packages/ciphernode/sortition/src/sortition.rs index e7c0d645..e7f509b1 100644 --- a/packages/ciphernode/sortition/src/sortition.rs +++ b/packages/ciphernode/sortition/src/sortition.rs @@ -2,9 +2,8 @@ use crate::DistanceSortition; use actix::prelude::*; use alloy::primitives::Address; use anyhow::{anyhow, Result}; -use async_trait::async_trait; +use data::Repository; use data::{AutoPersist, Persistable}; -use data::{Checkpoint, FromSnapshotWithParams, RepositoriesFactory, Repository, Snapshot}; use enclave_core::{ get_tag, BusError, CiphernodeAdded, CiphernodeRemoved, EnclaveErrorType, EnclaveEvent, EventBus, Seed, Subscribe,