Skip to content

Commit

Permalink
Use persistable in evm_reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley committed Nov 28, 2024
1 parent 3b3e07d commit f781c8a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 86 deletions.
14 changes: 0 additions & 14 deletions packages/ciphernode/aggregator/src/plaintext_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,6 @@ pub enum PlaintextAggregatorState {
},
}

// This is required in order to satisfy PersistableData - it is only used to cover an error
// conition where there is no state in the persistable
// TODO: alter Snapshot in order to return a result snapshot
impl Default for PlaintextAggregatorState {
fn default() -> Self {
PlaintextAggregatorState::Collecting {
threshold_m: 0,
shares: OrderedSet::default(),
seed: Seed::default(),
ciphertext_output: vec![],
}
}
}

impl PlaintextAggregatorState {
pub fn init(threshold_m: usize, seed: Seed, ciphertext_output: Vec<u8>) -> Self {
PlaintextAggregatorState::Collecting {
Expand Down
100 changes: 30 additions & 70 deletions packages/ciphernode/evm/src/event_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use alloy::providers::Provider;
use alloy::rpc::types::Filter;
use alloy::transports::{BoxTransport, Transport};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot};
use data::{AutoPersist, Persistable, Repository, Snapshot};
use enclave_core::{
get_tag, BusError, EnclaveErrorType, EnclaveEvent, EventBus, EventId, Subscribe,
};
Expand Down Expand Up @@ -50,7 +49,7 @@ where
contract_address: Address,
start_block: Option<u64>,
bus: Addr<EventBus>,
repository: Repository<EvmEventReaderState>,
state: Persistable<EvmEventReaderState>,
}

#[derive(Default, serde::Serialize, serde::Deserialize, Clone)]
Expand Down Expand Up @@ -80,10 +79,8 @@ where
start_block: Option<u64>,
/// Event bus for error propagation
bus: Addr<EventBus>,
/// The in memory state of the event reader
state: EvmEventReaderState,
/// Repository to save the state of the event reader
repository: Repository<EvmEventReaderState>,
/// The auto persistable state of the event reader
state: Persistable<EvmEventReaderState>,
}

impl<P, T> EvmEventReader<P, T>
Expand All @@ -101,22 +98,10 @@ where
shutdown_tx: Some(shutdown_tx),
start_block: params.start_block,
bus: params.bus,
state: EvmEventReaderState::default(),
repository: params.repository,
state: params.state,
}
}

#[instrument(name="evm_event_reader", skip_all, fields(id = get_tag()))]
pub async fn load(params: EvmEventReaderParams<P, T>) -> Result<Self> {
Ok(if let Some(snapshot) = params.repository.read().await? {
info!("Loading from snapshot");
Self::from_snapshot(params, snapshot).await?
} else {
info!("Loading from params");
Self::new(params)
})
}

pub async fn attach(
provider: &WithChainId<P, T>,
extractor: ExtractorFn<EnclaveEvent>,
Expand All @@ -125,15 +110,21 @@ where
bus: &Addr<EventBus>,
repository: &Repository<EvmEventReaderState>,
) -> Result<Addr<Self>> {
let sync_state = repository
.clone()
.sync_or_default(EvmEventReaderState::default())
.await?;

let params = EvmEventReaderParams {
provider: provider.clone(),
extractor,
contract_address: contract_address.parse()?,
start_block,
bus: bus.clone(),
repository: repository.clone(),
state: sync_state,
};
let addr = EvmEventReader::load(params).await?.start();

let addr = EvmEventReader::new(params).start();

bus.do_send(Subscribe::new("Shutdown", addr.clone().into()));

Expand Down Expand Up @@ -286,67 +277,36 @@ where
fn handle(&mut self, wrapped: EnclaveEvmEvent, _: &mut Self::Context) -> Self::Result {
let event_id = wrapped.get_id();
info!("Processing event: {}", event_id);
info!("cache length: {}", self.state.ids.len());
if self.state.ids.contains(&event_id) {

self.state
.with((), |state| info!("cache length: {}", state.ids.len()));

if self
.state
.with(false, |state| state.ids.contains(&event_id))
{
error!(
"Event id {} has already been seen and was not forwarded to the bus",
&event_id
);
return;
}

let event_type = wrapped.event.event_type();

// Forward everything else to the event bus
self.bus.do_send(wrapped.event);

// Save processed ids
info!("Storing event(EVM) in cache {}({})", event_type, event_id);
self.state.ids.insert(event_id);
self.state.last_block = wrapped.block;
self.checkpoint();
}
}

impl<P, T> Snapshot for EvmEventReader<P, T>
where
P: Provider<T> + Clone + 'static,
T: Transport + Clone + Unpin,
{
type Snapshot = EvmEventReaderState;
fn snapshot(&self) -> Result<Self::Snapshot> {
Ok(self.state.clone())
}
}

impl<P, T> Checkpoint for EvmEventReader<P, T>
where
P: Provider<T> + Clone + 'static,
T: Transport + Clone + Unpin,
{
fn repository(&self) -> &Repository<Self::Snapshot> {
&self.repository
}
}

#[async_trait]
impl<P, T> FromSnapshotWithParams for EvmEventReader<P, T>
where
P: Provider<T> + Clone + 'static,
T: Transport + Clone + Unpin,
{
type Params = EvmEventReaderParams<P, T>;
async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result<Self> {
let (shutdown_tx, shutdown_rx) = oneshot::channel();
Ok(Self {
contract_address: params.contract_address,
provider: Some(params.provider),
extractor: params.extractor,
shutdown_rx: Some(shutdown_rx),
shutdown_tx: Some(shutdown_tx),
start_block: params.start_block,
bus: params.bus,
state: snapshot,
repository: params.repository,
})
match self.state.try_mutate(|mut state| {
state.ids.insert(event_id);
state.last_block = wrapped.block;
Ok(state)
}) {
Ok(_) => (),
Err(err) => self.bus.err(EnclaveErrorType::Evm, err),
}
}
}
3 changes: 1 addition & 2 deletions packages/ciphernode/sortition/src/sortition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use crate::DistanceSortition;
use actix::prelude::*;
use alloy::primitives::Address;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use data::Repository;
use data::{AutoPersist, Persistable};
use data::{Checkpoint, FromSnapshotWithParams, RepositoriesFactory, Repository, Snapshot};
use enclave_core::{
get_tag, BusError, CiphernodeAdded, CiphernodeRemoved, EnclaveErrorType, EnclaveEvent,
EventBus, Seed, Subscribe,
Expand Down

0 comments on commit f781c8a

Please sign in to comment.