Skip to content

Commit

Permalink
Refactor to push persistence to EvmEventReader
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley committed Nov 5, 2024
1 parent d6df850 commit dc95319
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 280 deletions.
1 change: 1 addition & 0 deletions packages/ciphernode/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions packages/ciphernode/enclave_node/src/datastore.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
use std::path::PathBuf;

use actix::{Actor, Addr};
use anyhow::Result;
use config::AppConfig;
use data::{DataStore, InMemStore, SledStore};
use enclave_core::EventBus;
use router::{Repositories, RepositoriesFactory};

pub fn get_sled_store(bus: &Addr<EventBus>, db_file: &PathBuf) -> Result<DataStore> {
Ok((&SledStore::new(bus, db_file)?.start()).into())
}

pub fn get_in_mem_store() -> DataStore {
(&InMemStore::new(true).start()).into()
}

pub fn setup_datastore(config: &AppConfig, bus: &Addr<EventBus>) -> Result<DataStore> {
let store: DataStore = if !config.use_in_mem_store() {
(&SledStore::new(&bus, &config.db_file())?.start()).into()
get_sled_store(&bus, &config.db_file())?
} else {
(&InMemStore::new(true).start()).into()
get_in_mem_store()
};
Ok(store)
}
Expand Down
4 changes: 4 additions & 0 deletions packages/ciphernode/evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ tokio = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
zeroize = { workspace = true }

[dev-dependencies]
enclave_node = { path = "../enclave_node" }

111 changes: 11 additions & 100 deletions packages/ciphernode/evm/src/ciphernode_registry_sol.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
use std::collections::HashSet;

use crate::{
event_reader::{EnclaveEvmEvent, EventReader},
event_reader::EvmEventReaderState,
helpers::{ReadonlyProvider, WithChainId},
EvmEventReader,
};
use actix::{Actor, Addr, Handler};
use actix::{Actor, Addr};
use alloy::{
primitives::{LogData, B256},
sol,
sol_types::SolEvent,
};
use anyhow::Result;
use async_trait::async_trait;
use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot};
use enclave_core::{EnclaveEvent, EventBus, EventId, Subscribe};
use data::Repository;
use enclave_core::{EnclaveEvent, EventBus};
use tracing::{error, info, trace};

sol!(
Expand Down Expand Up @@ -100,62 +97,22 @@ pub fn extractor(data: &LogData, topic: Option<&B256>, _: u64) -> Option<Enclave
}

/// Connects to CiphernodeRegistry.sol converting EVM events to EnclaveEvents
pub struct CiphernodeRegistrySolReader {
bus: Addr<EventBus>,
state: CiphernodeRegistryReaderState,
repository: Repository<CiphernodeRegistryReaderState>,
}

pub struct CiphernodeRegistryReaderParams {
bus: Addr<EventBus>,
repository: Repository<CiphernodeRegistryReaderState>,
}

#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct CiphernodeRegistryReaderState {
pub ids: HashSet<EventId>,
pub last_block: Option<u64>,
}
pub struct CiphernodeRegistrySolReader;

impl CiphernodeRegistrySolReader {
pub fn new(params: CiphernodeRegistryReaderParams) -> Self {
Self {
bus: params.bus,
state: CiphernodeRegistryReaderState::default(),
repository: params.repository,
}
}

pub async fn load(params: CiphernodeRegistryReaderParams) -> Result<Self> {
Ok(if let Some(snapshot) = params.repository.read().await? {
Self::from_snapshot(params, snapshot).await?
} else {
Self::new(params)
})
}

pub async fn attach(
bus: &Addr<EventBus>,
provider: &WithChainId<ReadonlyProvider>,
contract_address: &str,
repository: &Repository<CiphernodeRegistryReaderState>,
) -> Result<Addr<Self>> {
let params = CiphernodeRegistryReaderParams {
bus: bus.clone(),
repository: repository.clone(),
};

let actor = Self::load(params).await?;
let last_block = actor.state.last_block;
let addr = actor.start();

EvmEventReader::attach(
&addr.clone().into(),
repository: &Repository<EvmEventReaderState>,
) -> Result<Addr<EvmEventReader<ReadonlyProvider>>> {
let addr = EvmEventReader::attach(
provider,
extractor,
contract_address,
last_block,
None,
&bus.clone().into(),
repository,
)
.await?;

Expand All @@ -169,60 +126,14 @@ impl Actor for CiphernodeRegistrySolReader {
type Context = actix::Context<Self>;
}

impl Handler<EnclaveEvmEvent> for CiphernodeRegistrySolReader {
type Result = ();
fn handle(&mut self, wrapped: EnclaveEvmEvent, _: &mut Self::Context) -> Self::Result {
let event_id = wrapped.event.get_id();
if self.state.ids.contains(&event_id) {
trace!(
"Event id {} has already been seen and was not forwarded to the bus",
&event_id
);
return;
}

// Forward everything else to the event bus
self.bus.do_send(wrapped.event);

// Save processed ids
self.state.ids.insert(event_id);
self.state.last_block = wrapped.block;
self.checkpoint();
}
}

impl Snapshot for CiphernodeRegistrySolReader {
type Snapshot = CiphernodeRegistryReaderState;
fn snapshot(&self) -> Self::Snapshot {
self.state.clone()
}
}

impl Checkpoint for CiphernodeRegistrySolReader {
fn repository(&self) -> &Repository<Self::Snapshot> {
&self.repository
}
}

#[async_trait]
impl FromSnapshotWithParams for CiphernodeRegistrySolReader {
type Params = CiphernodeRegistryReaderParams;
async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result<Self> {
Ok(Self {
bus: params.bus,
state: snapshot,
repository: params.repository,
})
}
}
/// Eventual wrapper for both a reader and a writer
pub struct CiphernodeRegistrySol;
impl CiphernodeRegistrySol {
pub async fn attach(
bus: &Addr<EventBus>,
provider: &WithChainId<ReadonlyProvider>,
contract_address: &str,
repository: &Repository<CiphernodeRegistryReaderState>,
repository: &Repository<EvmEventReaderState>,
) -> Result<()> {
CiphernodeRegistrySolReader::attach(bus, provider, contract_address, repository).await?;
// TODO: Writer if needed
Expand Down
7 changes: 2 additions & 5 deletions packages/ciphernode/evm/src/enclave_sol.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use crate::{
enclave_sol_reader::EnclaveSolReader,
enclave_sol_writer::EnclaveSolWriter,
helpers::{ReadonlyProvider, SignerProvider, WithChainId},
EnclaveSolReaderState,
enclave_sol_reader::EnclaveSolReader, enclave_sol_writer::EnclaveSolWriter, event_reader::EvmEventReaderState, helpers::{ReadonlyProvider, SignerProvider, WithChainId}
};
use actix::Addr;
use anyhow::Result;
Expand All @@ -16,7 +13,7 @@ impl EnclaveSol {
read_provider: &WithChainId<ReadonlyProvider>,
write_provider: &WithChainId<SignerProvider>,
contract_address: &str,
repository: &Repository<EnclaveSolReaderState>,
repository: &Repository<EvmEventReaderState>,
) -> Result<()> {
EnclaveSolReader::attach(bus, read_provider, contract_address, repository).await?;
EnclaveSolWriter::attach(bus, write_provider, contract_address).await?;
Expand Down
123 changes: 10 additions & 113 deletions packages/ciphernode/evm/src/enclave_sol_reader.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
use std::collections::HashSet;

use crate::event_reader::{EnclaveEvmEvent, EventReader};
use crate::event_reader::EvmEventReaderState;
use crate::helpers::{ReadonlyProvider, WithChainId};
use crate::EvmEventReader;
use actix::{Actor, Addr, Handler};
use actix::Addr;
use alloy::primitives::{LogData, B256};
use alloy::transports::BoxTransport;
use alloy::{sol, sol_types::SolEvent};
use anyhow::Result;
use async_trait::async_trait;
use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot};
use enclave_core::{EnclaveEvent, EventBus, EventId};
use data::Repository;
use enclave_core::{EnclaveEvent, EventBus};
use tracing::{error, info, trace};

sol!(
Expand Down Expand Up @@ -83,72 +80,23 @@ pub fn extractor(data: &LogData, topic: Option<&B256>, chain_id: u64) -> Option<
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct EnclaveSolReaderState {
pub ids: HashSet<EventId>,
pub last_block: Option<u64>,
}

impl Default for EnclaveSolReaderState {
fn default() -> Self {
Self {
ids: HashSet::new(),
last_block: None,
}
}
}

/// Connects to Enclave.sol converting EVM events to EnclaveEvents
pub struct EnclaveSolReader {
bus: Addr<EventBus>,
state: EnclaveSolReaderState,
repository: Repository<EnclaveSolReaderState>,
}

pub struct EnclaveSolReaderParams {
bus: Addr<EventBus>,
repository: Repository<EnclaveSolReaderState>,
}
pub struct EnclaveSolReader;

impl EnclaveSolReader {
pub fn new(params: EnclaveSolReaderParams) -> Self {
Self {
bus: params.bus,
state: EnclaveSolReaderState::default(),
repository: params.repository,
}
}

pub async fn load(params: EnclaveSolReaderParams) -> Result<Self> {
Ok(if let Some(snapshot) = params.repository.read().await? {
Self::from_snapshot(params, snapshot).await?
} else {
Self::new(params)
})
}

pub async fn attach(
bus: &Addr<EventBus>,
provider: &WithChainId<ReadonlyProvider, BoxTransport>,
contract_address: &str,
repository: &Repository<EnclaveSolReaderState>,
) -> Result<Addr<Self>> {
let params = EnclaveSolReaderParams {
bus: bus.clone(),
repository: repository.clone(),
};

let actor = Self::load(params).await?;
let last_block = actor.state.last_block;
let addr = actor.start();

EvmEventReader::attach(
&addr.clone().recipient(),
repository: &Repository<EvmEventReaderState>,
) -> Result<Addr<EvmEventReader<ReadonlyProvider>>> {
let addr = EvmEventReader::attach(
provider,
extractor,
contract_address,
last_block,
None,
&bus.clone(),
repository,
)
.await?;

Expand All @@ -157,54 +105,3 @@ impl EnclaveSolReader {
Ok(addr)
}
}

impl Actor for EnclaveSolReader {
type Context = actix::Context<Self>;
}

impl Handler<EnclaveEvmEvent> for EnclaveSolReader {
type Result = ();
fn handle(&mut self, wrapped: EnclaveEvmEvent, _: &mut Self::Context) -> Self::Result {
let event_id = wrapped.event.get_id();
if self.state.ids.contains(&event_id) {
trace!(
"Event id {} has already been seen and was not forwarded to the bus",
&event_id
);
return;
}

// Forward everything else to the event bus
self.bus.do_send(wrapped.event);

// Save processed ids
self.state.ids.insert(event_id);
self.state.last_block = wrapped.block;
self.checkpoint();
}
}

impl Snapshot for EnclaveSolReader {
type Snapshot = EnclaveSolReaderState;
fn snapshot(&self) -> Self::Snapshot {
self.state.clone()
}
}

impl Checkpoint for EnclaveSolReader {
fn repository(&self) -> &Repository<Self::Snapshot> {
&self.repository
}
}

#[async_trait]
impl FromSnapshotWithParams for EnclaveSolReader {
type Params = EnclaveSolReaderParams;
async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result<Self> {
Ok(Self {
bus: params.bus,
state: snapshot,
repository: params.repository,
})
}
}
Loading

0 comments on commit dc95319

Please sign in to comment.