diff --git a/packages/ciphernode/Cargo.lock b/packages/ciphernode/Cargo.lock index 83bf4899..9f84d035 100644 --- a/packages/ciphernode/Cargo.lock +++ b/packages/ciphernode/Cargo.lock @@ -116,6 +116,7 @@ dependencies = [ "anyhow", "async-trait", "bincode", + "config", "data", "enclave-core", "fhe 0.1.0", @@ -1692,6 +1693,7 @@ dependencies = [ "alloy", "anyhow", "dirs", + "enclave-core", "figment", "serde", ] @@ -1891,6 +1893,7 @@ dependencies = [ "enclave-core", "serde", "sled", + "tokio", "tracing", ] @@ -2166,6 +2169,7 @@ dependencies = [ "dialoguer", "enclave-core", "enclave_node", + "evm", "hex", "once_cell", "router", @@ -2392,6 +2396,7 @@ dependencies = [ "anyhow", "async-trait", "bincode", + "config", "data", "enclave-core", "fhe 0.1.0-beta.7", @@ -3464,6 +3469,7 @@ dependencies = [ "anyhow", "async-trait", "cipher 0.1.0", + "config", "data", "enclave-core", "fhe 0.1.0", @@ -4121,9 +4127,13 @@ dependencies = [ "anyhow", "async-std", "async-trait", + "cipher 0.1.0", + "config", + "data", "enclave-core", "futures", "libp2p", + "serde", "tokio", "tracing", "tracing-subscriber", @@ -5222,6 +5232,7 @@ dependencies = [ "async-trait", "bincode", "cipher 0.1.0", + "config", "data", "enclave-core", "evm", @@ -5759,6 +5770,7 @@ dependencies = [ "alloy", "anyhow", "async-trait", + "config", "data", "enclave-core", "num", diff --git a/packages/ciphernode/Cargo.toml b/packages/ciphernode/Cargo.toml index fa1f8ffe..824c9de4 100644 --- a/packages/ciphernode/Cargo.toml +++ b/packages/ciphernode/Cargo.toml @@ -36,7 +36,10 @@ bincode = "1.3.3" bs58 = "0.5.1" base64 = "0.22.1" clap = { version = "4.5.17", features = ["derive"] } +config = { path = "./config" } +cipher = { path = "./cipher" } dirs = "5.0.1" +data = { path = "./data" } figment = { version = "0.10.19", features = ["yaml", "test"] } fhe_rs = { package = "fhe", git = "https://github.com/gnosisguild/fhe.rs", version = "0.1.0-beta.7" } fhe-traits = { git = "https://github.com/gnosisguild/fhe.rs", version = 
"0.1.0-beta.7" } diff --git a/packages/ciphernode/aggregator/Cargo.toml b/packages/ciphernode/aggregator/Cargo.toml index 495a26a1..54bcf4f9 100644 --- a/packages/ciphernode/aggregator/Cargo.toml +++ b/packages/ciphernode/aggregator/Cargo.toml @@ -8,6 +8,7 @@ actix = { workspace = true } anyhow = { workspace = true } serde = { workspace = true } bincode = { workspace = true } +config = { path = "../config" } async-trait = { workspace = true } enclave-core = { path = "../core" } fhe = { path = "../fhe" } diff --git a/packages/ciphernode/aggregator/src/lib.rs b/packages/ciphernode/aggregator/src/lib.rs index fef3eb91..6d905234 100644 --- a/packages/ciphernode/aggregator/src/lib.rs +++ b/packages/ciphernode/aggregator/src/lib.rs @@ -1,8 +1,11 @@ mod plaintext_aggregator; mod publickey_aggregator; +mod repositories; + pub use plaintext_aggregator::{ PlaintextAggregator, PlaintextAggregatorParams, PlaintextAggregatorState, }; pub use publickey_aggregator::{ PublicKeyAggregator, PublicKeyAggregatorParams, PublicKeyAggregatorState, }; +pub use repositories::*; diff --git a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs index fc5bdf60..efad4d9c 100644 --- a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs +++ b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs @@ -1,7 +1,6 @@ use actix::prelude::*; use anyhow::Result; -use async_trait::async_trait; -use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use data::Persistable; use enclave_core::{ DecryptionshareCreated, Die, E3id, EnclaveEvent, EventBus, OrderedSet, PlaintextAggregated, Seed, @@ -50,28 +49,28 @@ struct ComputeAggregate { pub struct PlaintextAggregator { fhe: Arc, bus: Addr, - store: Repository, sortition: Addr, e3_id: E3id, - state: PlaintextAggregatorState, + state: Persistable, src_chain_id: u64, } pub struct PlaintextAggregatorParams { pub fhe: Arc, pub bus: Addr, - pub store: Repository, pub 
sortition: Addr, pub e3_id: E3id, pub src_chain_id: u64, } impl PlaintextAggregator { - pub fn new(params: PlaintextAggregatorParams, state: PlaintextAggregatorState) -> Self { + pub fn new( + params: PlaintextAggregatorParams, + state: Persistable, + ) -> Self { PlaintextAggregator { fhe: params.fhe, bus: params.bus, - store: params.store, sortition: params.sortition, e3_id: params.e3_id, src_chain_id: params.src_chain_id, @@ -79,36 +78,41 @@ impl PlaintextAggregator { } } - pub fn add_share(&mut self, share: Vec) -> Result { - let PlaintextAggregatorState::Collecting { - threshold_m, - shares, - ciphertext_output, - .. - } = &mut self.state - else { - return Err(anyhow::anyhow!("Can only add share in Collecting state")); - }; - - shares.insert(share); - if shares.len() == *threshold_m { - return Ok(PlaintextAggregatorState::Computing { - shares: shares.clone(), - ciphertext_output: ciphertext_output.to_vec(), - }); - } - - Ok(self.state.clone()) + pub fn add_share(&mut self, share: Vec) -> Result<()> { + self.state.try_mutate(|mut state| { + let PlaintextAggregatorState::Collecting { + threshold_m, + shares, + ciphertext_output, + .. + } = &mut state + else { + return Err(anyhow::anyhow!("Can only add share in Collecting state")); + }; + + shares.insert(share); + + if shares.len() == *threshold_m { + return Ok(PlaintextAggregatorState::Computing { + shares: shares.clone(), + ciphertext_output: ciphertext_output.to_vec(), + }); + } + + Ok(state) + }) } - pub fn set_decryption(&mut self, decrypted: Vec) -> Result { - let PlaintextAggregatorState::Computing { shares, .. } = &mut self.state else { - return Ok(self.state.clone()); - }; + pub fn set_decryption(&mut self, decrypted: Vec) -> Result<()> { + self.state.try_mutate(|mut state| { + let PlaintextAggregatorState::Computing { shares, .. 
} = &mut state else { + return Ok(state); + }; - let shares = shares.to_owned(); + let shares = shares.to_owned(); - Ok(PlaintextAggregatorState::Complete { decrypted, shares }) + Ok(PlaintextAggregatorState::Complete { decrypted, shares }) + }) } } @@ -131,9 +135,9 @@ impl Handler for PlaintextAggregator { type Result = ResponseActFuture>; fn handle(&mut self, event: DecryptionshareCreated, _: &mut Self::Context) -> Self::Result { - let PlaintextAggregatorState::Collecting { + let Some(PlaintextAggregatorState::Collecting { threshold_m, seed, .. - } = self.state + }) = self.state.get() else { error!(state=?self.state, "Aggregator has been closed for collecting."); return Box::pin(fut::ready(Ok(()))); @@ -165,14 +169,13 @@ impl Handler for PlaintextAggregator { } // add the keyshare and - act.state = act.add_share(decryption_share)?; - act.checkpoint(); + act.add_share(decryption_share)?; // Check the state and if it has changed to the computing - if let PlaintextAggregatorState::Computing { + if let Some(PlaintextAggregatorState::Computing { shares, ciphertext_output, - } = &act.state + }) = &act.state.get() { ctx.notify(ComputeAggregate { shares: shares.clone(), @@ -195,8 +198,7 @@ impl Handler for PlaintextAggregator { })?; // Update the local state - self.state = self.set_decryption(decrypted_output.clone())?; - self.checkpoint(); + self.set_decryption(decrypted_output.clone())?; // Dispatch the PlaintextAggregated event let event = EnclaveEvent::from(PlaintextAggregated { @@ -217,26 +219,3 @@ impl Handler for PlaintextAggregator { ctx.stop() } } - -impl Snapshot for PlaintextAggregator { - type Snapshot = PlaintextAggregatorState; - - fn snapshot(&self) -> Self::Snapshot { - self.state.clone() - } -} - -#[async_trait] -impl FromSnapshotWithParams for PlaintextAggregator { - type Params = PlaintextAggregatorParams; - - async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { - Ok(PlaintextAggregator::new(params, snapshot)) - } -} - 
-impl Checkpoint for PlaintextAggregator { - fn repository(&self) -> &Repository { - &self.store - } -} diff --git a/packages/ciphernode/aggregator/src/publickey_aggregator.rs b/packages/ciphernode/aggregator/src/publickey_aggregator.rs index e5e3b640..f64d148f 100644 --- a/packages/ciphernode/aggregator/src/publickey_aggregator.rs +++ b/packages/ciphernode/aggregator/src/publickey_aggregator.rs @@ -1,7 +1,6 @@ use actix::prelude::*; use anyhow::Result; -use async_trait::async_trait; -use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use data::Persistable; use enclave_core::{ Die, E3id, EnclaveEvent, EventBus, KeyshareCreated, OrderedSet, PublicKeyAggregated, Seed, }; @@ -53,17 +52,15 @@ struct NotifyNetwork { pub struct PublicKeyAggregator { fhe: Arc, bus: Addr, - store: Repository, sortition: Addr, e3_id: E3id, - state: PublicKeyAggregatorState, + state: Persistable, src_chain_id: u64, } pub struct PublicKeyAggregatorParams { pub fhe: Arc, pub bus: Addr, - pub store: Repository, pub sortition: Addr, pub e3_id: E3id, pub src_chain_id: u64, @@ -76,11 +73,13 @@ pub struct PublicKeyAggregatorParams { /// It is expected to change this mechanism as we work through adversarial scenarios and write tests /// for them. impl PublicKeyAggregator { - pub fn new(params: PublicKeyAggregatorParams, state: PublicKeyAggregatorState) -> Self { + pub fn new( + params: PublicKeyAggregatorParams, + state: Persistable, + ) -> Self { PublicKeyAggregator { fhe: params.fhe, bus: params.bus, - store: params.store, sortition: params.sortition, e3_id: params.e3_id, src_chain_id: params.src_chain_id, @@ -88,35 +87,39 @@ impl PublicKeyAggregator { } } - pub fn add_keyshare(&mut self, keyshare: Vec) -> Result { - let PublicKeyAggregatorState::Collecting { - threshold_m, - keyshares, - .. 
- } = &mut self.state - else { - return Err(anyhow::anyhow!("Can only add keyshare in Collecting state")); - }; - keyshares.insert(keyshare); - if keyshares.len() == *threshold_m { - return Ok(PublicKeyAggregatorState::Computing { - keyshares: keyshares.clone(), - }); - } - - Ok(self.state.clone()) + pub fn add_keyshare(&mut self, keyshare: Vec) -> Result<()> { + self.state.try_mutate(|mut state| { + let PublicKeyAggregatorState::Collecting { + threshold_m, + keyshares, + .. + } = &mut state + else { + return Err(anyhow::anyhow!("Can only add keyshare in Collecting state")); + }; + keyshares.insert(keyshare); + if keyshares.len() == *threshold_m { + return Ok(PublicKeyAggregatorState::Computing { + keyshares: keyshares.clone(), + }); + } + + Ok(state) + }) } - pub fn set_pubkey(&mut self, pubkey: Vec) -> Result { - let PublicKeyAggregatorState::Computing { keyshares } = &mut self.state else { - return Ok(self.state.clone()); - }; + pub fn set_pubkey(&mut self, pubkey: Vec) -> Result<()> { + self.state.try_mutate(|mut state| { + let PublicKeyAggregatorState::Computing { keyshares } = &mut state else { + return Ok(state); + }; - let keyshares = keyshares.to_owned(); + let keyshares = keyshares.to_owned(); - Ok(PublicKeyAggregatorState::Complete { - public_key: pubkey, - keyshares, + Ok(PublicKeyAggregatorState::Complete { + public_key: pubkey, + keyshares, + }) }) } } @@ -140,9 +143,9 @@ impl Handler for PublicKeyAggregator { type Result = ResponseActFuture>; fn handle(&mut self, event: KeyshareCreated, _: &mut Self::Context) -> Self::Result { - let PublicKeyAggregatorState::Collecting { + let Some(PublicKeyAggregatorState::Collecting { threshold_m, seed, .. 
- } = self.state.clone() + }) = self.state.get() else { error!(state=?self.state, "Aggregator has been closed for collecting keyshares."); return Box::pin(fut::ready(Ok(()))); @@ -176,11 +179,12 @@ impl Handler for PublicKeyAggregator { } // add the keyshare and - act.state = act.add_keyshare(pubkey)?; - act.checkpoint(); + act.add_keyshare(pubkey)?; // Check the state and if it has changed to the computing - if let PublicKeyAggregatorState::Computing { keyshares } = &act.state { + if let Some(PublicKeyAggregatorState::Computing { keyshares }) = + &act.state.get() + { ctx.notify(ComputeAggregate { keyshares: keyshares.clone(), e3_id, @@ -202,8 +206,7 @@ impl Handler for PublicKeyAggregator { })?; // Update the local state - self.state = self.set_pubkey(pubkey.clone())?; - self.checkpoint(); + self.set_pubkey(pubkey.clone())?; ctx.notify(NotifyNetwork { pubkey, @@ -242,26 +245,3 @@ impl Handler for PublicKeyAggregator { ctx.stop() } } - -impl Snapshot for PublicKeyAggregator { - type Snapshot = PublicKeyAggregatorState; - - fn snapshot(&self) -> Self::Snapshot { - self.state.clone() - } -} - -#[async_trait] -impl FromSnapshotWithParams for PublicKeyAggregator { - type Params = PublicKeyAggregatorParams; - - async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { - Ok(PublicKeyAggregator::new(params, snapshot)) - } -} - -impl Checkpoint for PublicKeyAggregator { - fn repository(&self) -> &Repository { - &self.store - } -} diff --git a/packages/ciphernode/aggregator/src/repositories.rs b/packages/ciphernode/aggregator/src/repositories.rs new file mode 100644 index 00000000..a010b142 --- /dev/null +++ b/packages/ciphernode/aggregator/src/repositories.rs @@ -0,0 +1,25 @@ +use config::StoreKeys; +use data::{Repositories, Repository}; +use enclave_core::E3id; + +use crate::{PlaintextAggregatorState, PublicKeyAggregatorState}; + +pub trait PlaintextRepositoryFactory { + fn plaintext(&self, e3_id: &E3id) -> Repository; +} + +impl 
PlaintextRepositoryFactory for Repositories { + fn plaintext(&self, e3_id: &E3id) -> Repository { + Repository::new(self.store.scope(StoreKeys::plaintext(e3_id))) + } +} + +pub trait PublicKeyRepositoryFactory { + fn publickey(&self, e3_id: &E3id) -> Repository; +} + +impl PublicKeyRepositoryFactory for Repositories { + fn publickey(&self, e3_id: &E3id) -> Repository { + Repository::new(self.store.scope(StoreKeys::publickey(e3_id))) + } +} diff --git a/packages/ciphernode/config/Cargo.toml b/packages/ciphernode/config/Cargo.toml index a694ceb5..19589786 100644 --- a/packages/ciphernode/config/Cargo.toml +++ b/packages/ciphernode/config/Cargo.toml @@ -9,3 +9,4 @@ anyhow = { workspace = true } serde = { workspace = true } figment = { workspace = true } alloy = { workspace = true } +enclave-core = { path = "../core" } diff --git a/packages/ciphernode/config/src/lib.rs b/packages/ciphernode/config/src/lib.rs index ba182a1b..f6f3380c 100644 --- a/packages/ciphernode/config/src/lib.rs +++ b/packages/ciphernode/config/src/lib.rs @@ -1,2 +1,4 @@ mod app_config; +mod store_keys; pub use app_config::*; +pub use store_keys::*; diff --git a/packages/ciphernode/config/src/store_keys.rs b/packages/ciphernode/config/src/store_keys.rs new file mode 100644 index 00000000..d0f9bee9 --- /dev/null +++ b/packages/ciphernode/config/src/store_keys.rs @@ -0,0 +1,53 @@ +use enclave_core::E3id; + +pub struct StoreKeys; + +impl StoreKeys { + pub fn keyshare(e3_id: &E3id) -> String { + format!("//keyshare/{e3_id}") + } + + pub fn plaintext(e3_id: &E3id) -> String { + format!("//plaintext/{e3_id}") + } + + pub fn publickey(e3_id: &E3id) -> String { + format!("//publickey/{e3_id}") + } + + pub fn fhe(e3_id: &E3id) -> String { + format!("//fhe/{e3_id}") + } + + pub fn meta(e3_id: &E3id) -> String { + format!("//meta/{e3_id}") + } + + pub fn context(e3_id: &E3id) -> String { + format!("//context/{e3_id}") + } + + pub fn router() -> String { + String::from("//router") + } + + pub fn sortition() -> 
String { + String::from("//sortition") + } + + pub fn eth_private_key() -> String { + String::from("//eth_private_key") + } + + pub fn libp2p_key() -> String { + String::from("//libp2p_key") + } + + pub fn enclave_sol_reader(chain_id: u64) -> String { + format!("//evm_readers/enclave/{chain_id}") + } + + pub fn ciphernode_registry_reader(chain_id: u64) -> String { + format!("//evm_readers/ciphernode_registry/{chain_id}") + } +} diff --git a/packages/ciphernode/core/src/events.rs b/packages/ciphernode/core/src/events.rs index 276cd80c..a5fa4984 100644 --- a/packages/ciphernode/core/src/events.rs +++ b/packages/ciphernode/core/src/events.rs @@ -562,7 +562,7 @@ impl Display for TestEvent { } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Seed(pub [u8; 32]); impl From for u64 { fn from(value: Seed) -> Self { diff --git a/packages/ciphernode/core/src/ordered_set.rs b/packages/ciphernode/core/src/ordered_set.rs index 0d0543e2..221b4ce2 100644 --- a/packages/ciphernode/core/src/ordered_set.rs +++ b/packages/ciphernode/core/src/ordered_set.rs @@ -3,7 +3,7 @@ use std::collections::BTreeSet; use std::fmt; use std::hash::{Hash, Hasher}; -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Default, Serialize, Deserialize)] pub struct OrderedSet(BTreeSet); impl OrderedSet { diff --git a/packages/ciphernode/data/Cargo.toml b/packages/ciphernode/data/Cargo.toml index eb9990ef..ee67d65c 100644 --- a/packages/ciphernode/data/Cargo.toml +++ b/packages/ciphernode/data/Cargo.toml @@ -10,6 +10,7 @@ actix = { workspace = true } enclave-core = { path = "../core" } anyhow = { workspace = true } serde = { workspace = true } +tokio = { workspace = true } sled = { workspace = true } bincode = { workspace = true } tracing = { workspace = true } diff --git a/packages/ciphernode/data/README.md b/packages/ciphernode/data/README.md new file mode 100644 index 
00000000..d70dd33a --- /dev/null +++ b/packages/ciphernode/data/README.md @@ -0,0 +1,140 @@ +# On Persistence patterns + +_The way persistence is managed within this codebase has a few elements to it. So here is the story as to how this works and why it has been done like this_ + +Persistence within an Actor Model tends to be based around the idea that actors need to be able to have their state persistable and hydratable upon restart. This enables in an ideal scenario any actor to be able to just crash on error and restart as required. + +We started persistence by creating an Actor that wraps the database which is good practice within an Actor Model. This has advantages because we can interleave database writes to become a stream of events enabling high throughput. We can create delivery guarantees by storing events in a persistent queue at a later point if need be. + +```mermaid +graph LR + DB[(SledDB)] + Client --insert--> SledStore + SledStore --insert--> DB + SledStore -.retry.-> SledStore +``` + +## DataStore + +Next we needed a way to polymorphically pick between a real database and an in memory database for testing - to do this we utilize Actix's `Recipient` trait which means we can accept any actor that is happy to receive an `Insert` or a `Get` message. This means we can create a Key Value Store struct and pass in either a `SledStore` or an `InMemStore` Actor to the `DataStore` actor to accomplish this. + +```rust +let store = DataStore::from(SledStore::from(SledDb::new())); +``` + +or for testing: + +```rust +let store = DataStore::from(InMemStore::new()); +``` + +```mermaid +graph LR + DB[(SledDB)] + Client --> DataStore + DataStore -.-> SledStore + DataStore -.-> InMemStore + InMemStore -.-> BTreeMap + SledStore --> DB +``` + +The `DataStore` actor also has some convenience methods within it where it is possible to scope the keys so that you can consider the information you are storing as more of a tree structure as opposed to a flat list. 
+ +```rust +let store = DataStore::from(&addr); +let scoped = store.scope("//foo/bar/baz"); +scoped.write(some_data); +``` + +## Repository + +There was an attempt to use the `DataStore` throughout the app but it became apparent this was causing the knowledge of where and how the data was saved to be spread throughout the codebase. What we needed was for the components not to really care how their data was saved but for us to be able to easily have a sense of the different keys under which data was being saved in a centralized place. + +Also `DataStore` can take any type of serializable data to save at a key location but this means the data in the DataStore was effectively untyped. + +TO solve this it made sense to create a typed `Repository` interface to encapsulate saving of data from within an actor or routine and in theory the repository could use whatever underlying mechanism requires to save the data. This could even be a SQL DB or the filesystem if required. Whatever it's type T the Repository knows how to save it. + +The tradeoff is we get a slightly deeper stack but each layer adds a responsibility to the data saving stack: + +```mermaid +graph LR + R["Repository<T>"] + DB[(SledDB)] + Client --"write()"--> R + R --> D[DataStore] + D -.-> SledStore + D -.-> InMemStore + InMemStore -.-> BTreeMap + SledStore --> DB +``` + + +| Layer | Functionality | +| ------------------- | ------------------------------------------------------------------------------------------------------------------------------- | +| `Repository` | Strongly typed Data persistence for a single item. Configured to know how to save its data. | +| `DataStore` | Flexible KV store. Client can scope to specific namespace. Can be backed by polymorphic data actor to handle testing scenarios. | +| `{InMem,Sled}Store` | Actor to receive `Insert` and `Get` requests can only save raw bytes. 
| + +## Snapshotting + +We had a way to save bytes data with the `DataStore` and had a way to specify where that could be saved but actors need to be restartable and be able to be hydrated and we needed a standard way to accomplish this. To do this in typical Rust fashion we created a set of traits: + +- [`Snapshot`](https://github.com/gnosisguild/enclave/blob/main/packages/ciphernode/data/src/snapshot.rs) for defining how an object can create a snapshot of its state +- [`Checkpoint`](https://github.com/gnosisguild/enclave/blob/main/packages/ciphernode/data/src/snapshot.rs) for defining how to save that snapshot to a repository +- [`FromSnapshot`](https://github.com/gnosisguild/enclave/blob/main/packages/ciphernode/data/src/snapshot.rs) and [`FromSnapshotWithParams`](https://github.com/gnosisguild/enclave/blob/main/packages/ciphernode/data/src/snapshot.rs) for defining how an object could be reconstituted from a snapshot + +This worked well especially for objects whose persistable state needs to be derived from a subset of the saved state; however, there are a couple of problems: + +- `self.checkpoint()` needs to be called every time you want to save the state +- Using these traits is very verbose and repetitive - especially for situations where the state was just a field on the actor which it often is. +- These traits mean you need to mix some persistence API within your business logic API unless you create a separate struct just for persistence. + +## Enter Persistable + +Persistable is a struct that connects a repository and some in memory state and ensures that every time the in memory state is mutated that the state is saved to the repository. 
+This has several benefits: + +- Less verbose +- Centralized batching point for logical operations +- Can remove complex "snapshot" traits +- Simpler initialization +- No need to consider the underlying data saving mechanism - logic can be [persistence ignorant](https://www.linkedin.com/pulse/persistence-ignorance-domain-driven-design-ilkay-polat-atmae). + +```rust + +// Somehow we get a repository for a type +let repo:Repository<Vec<String>> = get_repo(); + +// We can use the load to create a persistable object from the contents of the persistence layer that the repository encapsulates +let persistable:Persistable<Vec<String>> = repo.load().await?; + +// If we add a name to the list the list is automatically synced to the database +persistable.try_mutate(|mut list| { + list.push("Fred"); + Ok(list) +})?; + +// We can set new state +persistable.set(vec![String::from("Hello")]); + +// We can try and get the data if it is set on the object +if persistable.try_get()?.len() > 0 { + println!("Repo has names!") +} + +// We can clear the object which will clear the repo +persistable.clear(); + +assert_eq!(persistable.get(), None); +``` + +To use it we can just have it as a field on a struct or actor: + +```rust +struct MyActor { + state: Persistable<Vec<String>> +} +``` + +We have also extracted the key calculation mechanism to a [`StoreKeys`](https://github.com/gnosisguild/enclave/blob/main/packages/ciphernode/config/src/store_keys.rs) struct. 
This is used in various places when creating repsitory factories for example [here](https://github.com/gnosisguild/enclave/blob/main/packages/ciphernode/aggregator/src/repositories.rs) diff --git a/packages/ciphernode/data/src/data_store.rs b/packages/ciphernode/data/src/data_store.rs index e81fc478..f2ae586e 100644 --- a/packages/ciphernode/data/src/data_store.rs +++ b/packages/ciphernode/data/src/data_store.rs @@ -36,7 +36,8 @@ impl Get { } } -/// Generate proxy for the DB +/// Generate proxy for the DB / KV store +/// DataStore is scopable #[derive(Clone, Debug)] pub struct DataStore { scope: Vec, @@ -54,6 +55,11 @@ impl DataStore { return Ok(None); }; + // If we get a null value return None as this doesn't deserialize correctly + if bytes == [0] { + return Ok(None); + } + Ok(Some(bincode::deserialize(&bytes)?)) } diff --git a/packages/ciphernode/data/src/lib.rs b/packages/ciphernode/data/src/lib.rs index b3f1ea93..48ed8475 100644 --- a/packages/ciphernode/data/src/lib.rs +++ b/packages/ciphernode/data/src/lib.rs @@ -1,6 +1,8 @@ mod data_store; mod in_mem; mod into_key; +mod persistable; +mod repositories; mod repository; mod sled_store; mod snapshot; @@ -8,6 +10,8 @@ mod snapshot; pub use data_store::*; pub use in_mem::*; pub use into_key::IntoKey; +pub use persistable::*; +pub use repositories::*; pub use repository::*; pub use sled_store::*; pub use snapshot::*; diff --git a/packages/ciphernode/data/src/persistable.rs b/packages/ciphernode/data/src/persistable.rs new file mode 100644 index 00000000..68cfe729 --- /dev/null +++ b/packages/ciphernode/data/src/persistable.rs @@ -0,0 +1,431 @@ +use crate::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use anyhow::*; +use async_trait::async_trait; +use serde::{de::DeserializeOwned, Serialize}; + +pub trait PersistableData: Serialize + DeserializeOwned + Clone + Send + Sync + 'static {} +impl PersistableData for T where T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static {} + +/// 
AutoPersist enables a repository to generate a persistable container +#[async_trait] +pub trait AutoPersist +where + T: PersistableData, +{ + /// Load the data from the repository into an auto persist container + async fn load(&self) -> Result>; + /// Create a new auto persist container and set some data on it to send back to the repository + fn send(&self, data: Option) -> Persistable; + /// Load the data from the repository into an auto persist container. If there is no persisted data then persist the given default data + async fn load_or_default(&self, default: T) -> Result>; + /// Load the data from the repository into an auto persist container. If there is no persisted data then persist the given default data + async fn load_or_else(&self, f: F) -> Result> + where + F: Send + FnOnce() -> Result; +} + +#[async_trait] +impl AutoPersist for Repository +where + T: PersistableData, +{ + /// Load the data from the repository into an auto persist container + async fn load(&self) -> Result> { + Ok(Persistable::load(self).await?) + } + + /// Create a new auto persist container and set some data on it to send back to the repository + fn send(&self, data: Option) -> Persistable { + Persistable::new(data, self).save() + } + + /// Load the data from the repository into an auto persist container. If there is no persisted data then persist the given default data + async fn load_or_default(&self, default: T) -> Result> { + Ok(Persistable::load_or_default(self, default).await?) + } + + /// Load the data from the repository into an auto persist container. If there is no persisted data then persist the result of the callback + async fn load_or_else(&self, f: F) -> Result> + where + F: Send + FnOnce() -> Result, + { + Ok(Persistable::load_or_else(self, f).await?) + } +} + +/// A container that automatically persists it's content every time it is mutated or changed. 
+#[derive(Debug)] +pub struct Persistable { + data: Option, + repo: Repository, +} + +impl Persistable +where + T: PersistableData, +{ + /// Create a new container with the given option data and repository + pub fn new(data: Option, repo: &Repository) -> Self { + Self { + data, + repo: repo.clone(), + } + } + + /// Load data from the repository to the container + pub async fn load(repo: &Repository) -> Result { + let data = repo.read().await?; + + Ok(Self::new(data, repo)) + } + + /// Load the data from the repo or save and sync the given default value + pub async fn load_or_default(repo: &Repository, default: T) -> Result { + let instance = Self::new(Some(repo.read().await?.unwrap_or(default)), repo); + + Ok(instance.save()) + } + + /// Load the data from the repo or save and sync the result of the given callback + pub async fn load_or_else(repo: &Repository, f: F) -> Result + where + F: FnOnce() -> Result, + { + let data = repo + .read() + .await? + .ok_or_else(|| anyhow!("Not found")) + .or_else(|_| f())?; + + let instance = Self::new(Some(data), repo); + Ok(instance.save()) + } + + /// Save the data in the container to the database + pub fn save(self) -> Self { + self.checkpoint(); + self + } + + /// Mutate the content if it is available or return an error if either the mutator function + /// fails or if the data has not been set. + pub fn try_mutate(&mut self, mutator: F) -> Result<()> + where + F: FnOnce(T) -> Result, + { + let content = self.data.clone().ok_or(anyhow!("Data has not been set"))?; + self.data = Some(mutator(content)?); + self.checkpoint(); + Ok(()) + } + + /// Set the data on both the persistable and the repository. + pub fn set(&mut self, data: T) { + self.data = Some(data); + self.checkpoint(); + } + + /// Clear the data from both the persistable and the repository. 
+ pub fn clear(&mut self) { + self.data = None; + self.clear_checkpoint(); + } + + /// Get the data currently stored on the container as an Option + pub fn get(&self) -> Option { + self.data.clone() + } + + /// Get the data from the container or return an error. + pub fn try_get(&self) -> Result { + self.data + .clone() + .ok_or(anyhow!("Data was not set on container.")) + } + + /// Returns true if there is data on the container and false if there is not. + pub fn has(&self) -> bool { + self.data.is_some() + } + + /// Get an immutable reference to the data on the container if the data is not set on the + /// container return an error + pub fn try_with(&self, f: F) -> Result + where + F: FnOnce(&T) -> Result, + { + match &self.data { + Some(data) => f(data), + None => Err(anyhow!("Data was not set on container.")), + } + } +} + +impl Snapshot for Persistable +where + T: PersistableData, +{ + type Snapshot = T; + fn snapshot(&self) -> Result { + Ok(self + .data + .clone() + .ok_or(anyhow!("No data stored on container"))?) 
+ } +} + +impl Checkpoint for Persistable +where + T: PersistableData, +{ + fn repository(&self) -> &Repository { + &self.repo + } +} + +#[async_trait] +impl FromSnapshotWithParams for Persistable +where + T: PersistableData, +{ + type Params = Repository; + async fn from_snapshot(params: Repository, snapshot: T) -> Result { + Ok(Persistable::new(Some(snapshot), ¶ms)) + } +} + +#[cfg(test)] +mod tests { + use crate::{AutoPersist, DataStore, GetLog, InMemStore, Repository}; + use actix::{Actor, Addr}; + use anyhow::{anyhow, Result}; + + fn get_repo() -> (Repository, Addr) { + let addr = InMemStore::new(true).start(); + let store = DataStore::from(&addr).scope("/"); + let repo: Repository = Repository::new(store); + (repo, addr) + } + + #[actix::test] + async fn persistable_loads_with_default() -> Result<()> { + let (repo, addr) = get_repo::>(); + let container = repo + .clone() + .load_or_default(vec!["berlin".to_string()]) + .await?; + + assert_eq!(addr.send(GetLog).await?.len(), 1); + assert_eq!(repo.read().await?, Some(vec!["berlin".to_string()])); + assert_eq!(container.get(), Some(vec!["berlin".to_string()])); + Ok(()) + } + + #[actix::test] + async fn persistable_loads_with_default_override() -> Result<()> { + let (repo, _) = get_repo::>(); + repo.write(&vec!["berlin".to_string()]); + let container = repo + .clone() + .load_or_default(vec!["amsterdam".to_string()]) + .await?; + + assert_eq!(repo.read().await?, Some(vec!["berlin".to_string()])); + assert_eq!(container.get(), Some(vec!["berlin".to_string()])); + Ok(()) + } + + #[actix::test] + async fn persistable_load() -> Result<()> { + let (repo, _) = get_repo::>(); + repo.write(&vec!["berlin".to_string()]); + let container = repo.clone().load().await?; + + assert_eq!(repo.read().await?, Some(vec!["berlin".to_string()])); + assert_eq!(container.get(), Some(vec!["berlin".to_string()])); + Ok(()) + } + + #[actix::test] + async fn persistable_send() -> Result<()> { + let (repo, _) = get_repo::>(); + 
repo.write(&vec!["amsterdam".to_string()]); + let container = repo.clone().send(Some(vec!["berlin".to_string()])); + + assert_eq!(repo.read().await?, Some(vec!["berlin".to_string()])); + assert_eq!(container.get(), Some(vec!["berlin".to_string()])); + Ok(()) + } + + #[actix::test] + async fn persistable_mutate() -> Result<()> { + let (repo, addr) = get_repo::>(); + + let mut container = repo.clone().send(Some(vec!["berlin".to_string()])); + + container.try_mutate(|mut list| { + list.push(String::from("amsterdam")); + Ok(list) + })?; + + assert_eq!( + repo.read().await?, + Some(vec!["berlin".to_string(), "amsterdam".to_string()]) + ); + + assert_eq!(addr.send(GetLog).await?.len(), 2); + + Ok(()) + } + + #[actix::test] + async fn test_clear_persistable() -> Result<()> { + let (repo, _) = get_repo::>(); + let repo_ref = &repo; + let mut container = repo_ref.send(Some(vec!["berlin".to_string()])); + + assert!(container.has()); + container.clear(); + assert!(!container.has()); + assert_eq!(repo_ref.read().await?, None); + Ok(()) + } + + #[actix::test] + async fn test_set_persistable() -> Result<()> { + let (repo, _) = get_repo::>(); + let mut container = repo.clone().send(None); + + container.set(vec!["amsterdam".to_string()]); + + assert!(container.has()); + assert_eq!(repo.read().await?, Some(vec!["amsterdam".to_string()])); + Ok(()) + } + + #[actix::test] + async fn test_try_get_with_data() -> Result<()> { + let (repo, _) = get_repo::>(); + let container = repo.clone().send(Some(vec!["berlin".to_string()])); + + let result = container.try_get()?; + assert_eq!(result, vec!["berlin".to_string()]); + Ok(()) + } + + #[actix::test] + async fn test_try_get_without_data() { + let (repo, _) = get_repo::>(); + let container = repo.clone().send(None); + + assert!(container.try_get().is_err()); + } + + #[actix::test] + async fn test_try_with_success() -> Result<()> { + let (repo, _) = get_repo::>(); + let container = repo.clone().send(Some(vec!["berlin".to_string()])); + + let 
length = container.try_with(|data| Ok(data.len()))?; + assert_eq!(length, 1); + Ok(()) + } + + #[actix::test] + async fn test_try_with_failure() { + let (repo, _) = get_repo::>(); + let container = repo.clone().send(None); + + let result = container.try_with(|data| Ok(data.len())); + assert!(result.is_err()); + } + + #[actix::test] + async fn test_try_mutate_failure() { + let (repo, _) = get_repo::>(); + let mut container = repo.clone().send(None); + + let result = container.try_mutate(|mut list| { + list.push(String::from("amsterdam")); + Ok(list) + }); + assert!(result.is_err()); + } + + #[actix::test] + async fn test_mutate_with_error() -> Result<()> { + let (repo, _) = get_repo::>(); + let mut container = repo.clone().send(Some(vec!["berlin".to_string()])); + + let result = + container.try_mutate(|_| -> Result> { Err(anyhow!("Mutation failed")) }); + + assert!(result.is_err()); + // Original data should remain unchanged + assert_eq!(container.try_get()?, vec!["berlin".to_string()]); + Ok(()) + } + + #[actix::test] + async fn test_load_or_else_success_with_empty_repo() -> Result<()> { + let (repo, _) = get_repo::>(); + + let container = repo + .clone() + .load_or_else(|| Ok(vec!["amsterdam".to_string()])) + .await?; + + assert_eq!(container.try_get()?, vec!["amsterdam".to_string()]); + assert_eq!(repo.read().await?, Some(vec!["amsterdam".to_string()])); + Ok(()) + } + + #[actix::test] + async fn test_load_or_else_skips_callback_when_data_exists() -> Result<()> { + let (repo, _) = get_repo::>(); + repo.write(&vec!["berlin".to_string()]); + + let container = repo + .clone() + .load_or_else(|| { + panic!("This callback should not be called!"); + #[allow(unreachable_code)] + Ok(vec!["amsterdam".to_string()]) + }) + .await?; + + assert_eq!(container.try_get()?, vec!["berlin".to_string()]); + Ok(()) + } + + #[actix::test] + async fn test_load_or_else_propagates_callback_error() -> Result<()> { + let (repo, _) = get_repo::>(); + + let result = repo + .clone() + 
.load_or_else(|| Err(anyhow!("Failed to create default data"))) + .await; + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Failed to create default data")); + assert_eq!(repo.read().await?, None); + Ok(()) + } + + #[actix::test] + async fn test_load_or_else_custom_error_message() -> Result<()> { + let (repo, _) = get_repo::>(); + let error_msg = "Custom initialization error"; + + let result = repo.load_or_else(|| Err(anyhow!(error_msg))).await; + + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains(error_msg)); + Ok(()) + } + +} diff --git a/packages/ciphernode/data/src/repositories.rs b/packages/ciphernode/data/src/repositories.rs new file mode 100644 index 00000000..be9aad25 --- /dev/null +++ b/packages/ciphernode/data/src/repositories.rs @@ -0,0 +1,48 @@ +use crate::{DataStore, Repository}; + +pub struct Repositories { + pub store: DataStore, +} + +impl From for Repositories { + fn from(value: DataStore) -> Self { + Repositories { store: value } + } +} +impl From<&DataStore> for Repositories { + fn from(value: &DataStore) -> Self { + Repositories { + store: value.clone(), + } + } +} + +impl Repositories { + pub fn new(store: DataStore) -> Self { + Repositories { store } + } +} + +impl From> for Repositories { + fn from(value: Repository) -> Self { + let store: DataStore = value.into(); + store.into() + } +} + +pub trait RepositoriesFactory { + fn repositories(&self) -> Repositories; +} + +impl RepositoriesFactory for DataStore { + fn repositories(&self) -> Repositories { + self.into() + } +} + +impl RepositoriesFactory for Repository { + fn repositories(&self) -> Repositories { + let store: DataStore = self.into(); + store.repositories() + } +} diff --git a/packages/ciphernode/data/src/repository.rs b/packages/ciphernode/data/src/repository.rs index b44f2b07..f3d94963 100644 --- a/packages/ciphernode/data/src/repository.rs +++ b/packages/ciphernode/data/src/repository.rs @@ -6,7 +6,8 @@ use 
crate::DataStore; #[derive(Debug)] pub struct Repository { - store: DataStore, + /// store is currently set to be a scopeable key value store + store: DataStore, // this could change and be abstracted if need be _p: PhantomData, } @@ -56,7 +57,15 @@ where self.store.read().await } + pub async fn has(&self) -> bool { + self.read().await.ok().flatten().is_some() + } + pub fn write(&self, value: &T) { self.store.write(value) } + + pub fn clear(&self) { + self.store.write::>(None) + } } diff --git a/packages/ciphernode/data/src/snapshot.rs b/packages/ciphernode/data/src/snapshot.rs index a0203f45..2bf91d9e 100644 --- a/packages/ciphernode/data/src/snapshot.rs +++ b/packages/ciphernode/data/src/snapshot.rs @@ -2,6 +2,7 @@ use crate::Repository; use anyhow::Result; use async_trait::async_trait; use serde::{de::DeserializeOwned, Serialize}; +use tracing::error; /// This trait enables the self type to report their state snapshot pub trait Snapshot @@ -14,7 +15,7 @@ where type Snapshot: Serialize + DeserializeOwned; /// Return the Snapshot object for the implementor - fn snapshot(&self) -> Self::Snapshot; + fn snapshot(&self) -> Result; } /// This trait enables the self type to checkpoint its state @@ -24,7 +25,19 @@ pub trait Checkpoint: Snapshot { /// Write the current snapshot to the `Repository` provided by `repository()` fn checkpoint(&self) { - self.repository().write(&self.snapshot()); + let snapshot = match self.snapshot() { + Ok(v) => v, + Err(err) => { + error!("Not saving data because '{}'", err); + return; + } + }; + + self.repository().write(&snapshot); + } + + fn clear_checkpoint(&self) { + self.repository().clear() } } diff --git a/packages/ciphernode/enclave/Cargo.toml b/packages/ciphernode/enclave/Cargo.toml index f0a93ab7..d0ff1e30 100644 --- a/packages/ciphernode/enclave/Cargo.toml +++ b/packages/ciphernode/enclave/Cargo.toml @@ -17,6 +17,7 @@ config = { path = "../config" } data = { path = "../data" } dialoguer = "0.11.0" enclave-core = { path = 
"../core" } +evm = { path = "../evm" } enclave_node = { path = "../enclave_node" } hex = { workspace = true } once_cell = "1.20.2" diff --git a/packages/ciphernode/enclave/src/commands/wallet/set.rs b/packages/ciphernode/enclave/src/commands/wallet/set.rs index ae6d5773..cfaf1979 100644 --- a/packages/ciphernode/enclave/src/commands/wallet/set.rs +++ b/packages/ciphernode/enclave/src/commands/wallet/set.rs @@ -4,6 +4,7 @@ use cipher::Cipher; use config::AppConfig; use enclave_core::{EventBus, GetErrors}; use enclave_node::get_repositories; +use evm::EthPrivateKeyRepositoryFactory; pub async fn execute(config: &AppConfig, input: String) -> Result<()> { let cipher = Cipher::from_config(config).await?; diff --git a/packages/ciphernode/enclave_node/src/aggregator.rs b/packages/ciphernode/enclave_node/src/aggregator.rs index a63bf636..d2771afc 100644 --- a/packages/ciphernode/enclave_node/src/aggregator.rs +++ b/packages/ciphernode/enclave_node/src/aggregator.rs @@ -2,20 +2,19 @@ use actix::{Actor, Addr}; use anyhow::Result; use cipher::Cipher; use config::AppConfig; +use data::RepositoriesFactory; use enclave_core::EventBus; use evm::{ helpers::{get_signer_from_repository, ProviderConfig, RPC}, - CiphernodeRegistrySol, EnclaveSol, RegistryFilterSol, + CiphernodeRegistryReaderRepositoryFactory, CiphernodeRegistrySol, EnclaveSol, + EnclaveSolReaderRepositoryFactory, EthPrivateKeyRepositoryFactory, RegistryFilterSol, }; use logger::SimpleLogger; -use net::NetworkRelay; +use net::{NetRepositoryFactory, NetworkRelay}; use rand::SeedableRng; use rand_chacha::{rand_core::OsRng, ChaCha20Rng}; -use router::{ - E3RequestRouter, FheFeature, PlaintextAggregatorFeature, PublicKeyAggregatorFeature, - RepositoriesFactory, -}; -use sortition::Sortition; +use router::{E3RequestRouter, FheFeature, PlaintextAggregatorFeature, PublicKeyAggregatorFeature}; +use sortition::{Sortition, SortitionRepositoryFactory}; use std::sync::{Arc, Mutex}; use test_helpers::{PlaintextWriter, 
PublicKeyWriter}; use tokio::task::JoinHandle; @@ -81,7 +80,13 @@ pub async fn setup_aggregator( .build() .await?; - let (_, join_handle, peer_id) = NetworkRelay::setup_with_peer(bus.clone(), config.peers())?; + let (_, join_handle, peer_id) = NetworkRelay::setup_with_peer( + bus.clone(), + config.peers(), + repositories.libp2p_key(), + &cipher, + ) + .await?; if let Some(path) = pubkey_write_path { PublicKeyWriter::attach(path, bus.clone()); diff --git a/packages/ciphernode/enclave_node/src/ciphernode.rs b/packages/ciphernode/enclave_node/src/ciphernode.rs index 30386ab2..8a500a44 100644 --- a/packages/ciphernode/enclave_node/src/ciphernode.rs +++ b/packages/ciphernode/enclave_node/src/ciphernode.rs @@ -3,19 +3,20 @@ use alloy::primitives::Address; use anyhow::Result; use cipher::Cipher; use config::AppConfig; +use data::RepositoriesFactory; use enclave_core::{get_tag, EventBus}; use evm::{ helpers::{ProviderConfig, RPC}, - CiphernodeRegistrySol, EnclaveSolReader, + CiphernodeRegistryReaderRepositoryFactory, CiphernodeRegistrySol, EnclaveSolReader, + EnclaveSolReaderRepositoryFactory, }; use logger::SimpleLogger; -use net::NetworkRelay; +use net::{NetRepositoryFactory, NetworkRelay}; use rand::SeedableRng; use rand_chacha::rand_core::OsRng; -use router::{ - CiphernodeSelector, E3RequestRouter, FheFeature, KeyshareFeature, RepositoriesFactory, -}; +use router::{CiphernodeSelector, E3RequestRouter, FheFeature, KeyshareFeature}; use sortition::Sortition; +use sortition::SortitionRepositoryFactory; use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; use tracing::instrument; @@ -73,7 +74,13 @@ pub async fn setup_ciphernode( .build() .await?; - let (_, join_handle, peer_id) = NetworkRelay::setup_with_peer(bus.clone(), config.peers())?; + let (_, join_handle, peer_id) = NetworkRelay::setup_with_peer( + bus.clone(), + config.peers(), + repositories.libp2p_key(), + &cipher, + ) + .await?; let nm = format!("CIPHER({})", &address.to_string()[0..5]); 
SimpleLogger::attach(&nm, bus.clone()); diff --git a/packages/ciphernode/enclave_node/src/datastore.rs b/packages/ciphernode/enclave_node/src/datastore.rs index 17a2be04..43ff182f 100644 --- a/packages/ciphernode/enclave_node/src/datastore.rs +++ b/packages/ciphernode/enclave_node/src/datastore.rs @@ -4,8 +4,8 @@ use actix::{Actor, Addr}; use anyhow::Result; use config::AppConfig; use data::{DataStore, InMemStore, SledStore}; +use data::{Repositories, RepositoriesFactory}; use enclave_core::EventBus; -use router::{Repositories, RepositoriesFactory}; pub fn get_sled_store(bus: &Addr, db_file: &PathBuf) -> Result { Ok((&SledStore::new(bus, db_file)?).into()) diff --git a/packages/ciphernode/evm/Cargo.toml b/packages/ciphernode/evm/Cargo.toml index 0a50c22a..081743ae 100644 --- a/packages/ciphernode/evm/Cargo.toml +++ b/packages/ciphernode/evm/Cargo.toml @@ -15,8 +15,8 @@ config = { path = "../config" } data = { path = "../data" } enclave-core = { path = "../core" } futures-util = { workspace = true } -sortition = { path = "../sortition" } serde = { workspace = true } +sortition = { path = "../sortition" } tokio = { workspace = true } tracing = { workspace = true } url = { workspace = true } diff --git a/packages/ciphernode/evm/src/event_reader.rs b/packages/ciphernode/evm/src/event_reader.rs index 4981c4f6..df218fb7 100644 --- a/packages/ciphernode/evm/src/event_reader.rs +++ b/packages/ciphernode/evm/src/event_reader.rs @@ -8,8 +8,7 @@ use alloy::providers::Provider; use alloy::rpc::types::Filter; use alloy::transports::{BoxTransport, Transport}; use anyhow::{anyhow, Result}; -use async_trait::async_trait; -use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use data::{AutoPersist, Persistable, Repository, Snapshot}; use enclave_core::{ get_tag, BusError, EnclaveErrorType, EnclaveEvent, EventBus, EventId, Subscribe, }; @@ -50,7 +49,7 @@ where contract_address: Address, start_block: Option, bus: Addr, - repository: Repository, + state: Persistable, 
} #[derive(Default, serde::Serialize, serde::Deserialize, Clone)] @@ -80,10 +79,8 @@ where start_block: Option, /// Event bus for error propagation bus: Addr, - /// The in memory state of the event reader - state: EvmEventReaderState, - /// Repository to save the state of the event reader - repository: Repository, + /// The auto persistable state of the event reader + state: Persistable, } impl EvmEventReader @@ -101,22 +98,10 @@ where shutdown_tx: Some(shutdown_tx), start_block: params.start_block, bus: params.bus, - state: EvmEventReaderState::default(), - repository: params.repository, + state: params.state, } } - #[instrument(name="evm_event_reader", skip_all, fields(id = get_tag()))] - pub async fn load(params: EvmEventReaderParams) -> Result { - Ok(if let Some(snapshot) = params.repository.read().await? { - info!("Loading from snapshot"); - Self::from_snapshot(params, snapshot).await? - } else { - info!("Loading from params"); - Self::new(params) - }) - } - pub async fn attach( provider: &WithChainId, extractor: ExtractorFn, @@ -125,15 +110,21 @@ where bus: &Addr, repository: &Repository, ) -> Result> { + let sync_state = repository + .clone() + .load_or_default(EvmEventReaderState::default()) + .await?; + let params = EvmEventReaderParams { provider: provider.clone(), extractor, contract_address: contract_address.parse()?, start_block, bus: bus.clone(), - repository: repository.clone(), + state: sync_state, }; - let addr = EvmEventReader::load(params).await?.start(); + + let addr = EvmEventReader::new(params).start(); bus.do_send(Subscribe::new("Shutdown", addr.clone().into())); @@ -284,69 +275,33 @@ where #[instrument(name="evm_event_reader", skip_all, fields(id = get_tag()))] fn handle(&mut self, wrapped: EnclaveEvmEvent, _: &mut Self::Context) -> Self::Result { - let event_id = wrapped.get_id(); - info!("Processing event: {}", event_id); - info!("cache length: {}", self.state.ids.len()); - if self.state.ids.contains(&event_id) { - error!( - "Event id {} 
has already been seen and was not forwarded to the bus", - &event_id - ); - return; - } - let event_type = wrapped.event.event_type(); + match self.state.try_mutate(|mut state| { + let event_id = wrapped.get_id(); + info!("Processing event: {}", event_id); + info!("cache length: {}", state.ids.len()); + if state.ids.contains(&event_id) { + error!( + "Event id {} has already been seen and was not forwarded to the bus", + &event_id + ); + return Ok(state); + } - // Forward everything else to the event bus - self.bus.do_send(wrapped.event); + let event_type = wrapped.event.event_type(); - // Save processed ids - info!("Storing event(EVM) in cache {}({})", event_type, event_id); - self.state.ids.insert(event_id); - self.state.last_block = wrapped.block; - self.checkpoint(); - } -} + // Forward everything else to the event bus + self.bus.do_send(wrapped.event); -impl Snapshot for EvmEventReader -where - P: Provider + Clone + 'static, - T: Transport + Clone + Unpin, -{ - type Snapshot = EvmEventReaderState; - fn snapshot(&self) -> Self::Snapshot { - self.state.clone() - } -} + // Save processed ids + info!("Storing event(EVM) in cache {}({})", event_type, event_id); -impl Checkpoint for EvmEventReader -where - P: Provider + Clone + 'static, - T: Transport + Clone + Unpin, -{ - fn repository(&self) -> &Repository { - &self.repository - } -} + state.ids.insert(event_id); + state.last_block = wrapped.block; -#[async_trait] -impl FromSnapshotWithParams for EvmEventReader -where - P: Provider + Clone + 'static, - T: Transport + Clone + Unpin, -{ - type Params = EvmEventReaderParams; - async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - Ok(Self { - contract_address: params.contract_address, - provider: Some(params.provider), - extractor: params.extractor, - shutdown_rx: Some(shutdown_rx), - shutdown_tx: Some(shutdown_tx), - start_block: params.start_block, - bus: params.bus, - state: 
snapshot, - repository: params.repository, - }) + Ok(state) + }) { + Ok(_) => (), + Err(err) => self.bus.err(EnclaveErrorType::Evm, err), + } } } diff --git a/packages/ciphernode/evm/src/lib.rs b/packages/ciphernode/evm/src/lib.rs index e7fb10c9..b78ef57b 100644 --- a/packages/ciphernode/evm/src/lib.rs +++ b/packages/ciphernode/evm/src/lib.rs @@ -5,6 +5,7 @@ mod enclave_sol_writer; mod event_reader; pub mod helpers; mod registry_filter_sol; +mod repositories; pub use ciphernode_registry_sol::{CiphernodeRegistrySol, CiphernodeRegistrySolReader}; pub use enclave_sol::EnclaveSol; @@ -12,3 +13,4 @@ pub use enclave_sol_reader::EnclaveSolReader; pub use enclave_sol_writer::EnclaveSolWriter; pub use event_reader::{EnclaveEvmEvent, EvmEventReader, EvmEventReaderState, ExtractorFn}; pub use registry_filter_sol::{RegistryFilterSol, RegistryFilterSolWriter}; +pub use repositories::*; diff --git a/packages/ciphernode/evm/src/repositories.rs b/packages/ciphernode/evm/src/repositories.rs new file mode 100644 index 00000000..f20b5d1e --- /dev/null +++ b/packages/ciphernode/evm/src/repositories.rs @@ -0,0 +1,37 @@ +use config::StoreKeys; +use data::{Repositories, Repository}; + +use crate::EvmEventReaderState; + +pub trait EthPrivateKeyRepositoryFactory { + fn eth_private_key(&self) -> Repository>; +} + +impl EthPrivateKeyRepositoryFactory for Repositories { + fn eth_private_key(&self) -> Repository> { + Repository::new(self.store.scope(StoreKeys::eth_private_key())) + } +} + +pub trait EnclaveSolReaderRepositoryFactory { + fn enclave_sol_reader(&self, chain_id: u64) -> Repository; +} + +impl EnclaveSolReaderRepositoryFactory for Repositories { + fn enclave_sol_reader(&self, chain_id: u64) -> Repository { + Repository::new(self.store.scope(StoreKeys::enclave_sol_reader(chain_id))) + } +} + +pub trait CiphernodeRegistryReaderRepositoryFactory { + fn ciphernode_registry_reader(&self, chain_id: u64) -> Repository; +} + +impl CiphernodeRegistryReaderRepositoryFactory for Repositories 
{ + fn ciphernode_registry_reader(&self, chain_id: u64) -> Repository { + Repository::new( + self.store + .scope(StoreKeys::ciphernode_registry_reader(chain_id)), + ) + } +} diff --git a/packages/ciphernode/fhe/Cargo.toml b/packages/ciphernode/fhe/Cargo.toml index 3272964a..7781f7bf 100644 --- a/packages/ciphernode/fhe/Cargo.toml +++ b/packages/ciphernode/fhe/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" anyhow = { workspace = true } async-trait = { workspace = true } bincode = { workspace = true } +config = { path = "../config" } data = { path = "../data" } enclave-core = { path = "../core" } fhe-traits = { workspace = true } diff --git a/packages/ciphernode/fhe/src/fhe.rs b/packages/ciphernode/fhe/src/fhe.rs index f5131aec..84333de4 100644 --- a/packages/ciphernode/fhe/src/fhe.rs +++ b/packages/ciphernode/fhe/src/fhe.rs @@ -127,11 +127,11 @@ impl Fhe { impl Snapshot for Fhe { type Snapshot = FheSnapshot; - fn snapshot(&self) -> Self::Snapshot { - FheSnapshot { + fn snapshot(&self) -> Result { + Ok(FheSnapshot { crp: self.crp.to_bytes(), params: self.params.to_bytes(), - } + }) } } diff --git a/packages/ciphernode/fhe/src/lib.rs b/packages/ciphernode/fhe/src/lib.rs index 75b9bd75..14983c0a 100644 --- a/packages/ciphernode/fhe/src/lib.rs +++ b/packages/ciphernode/fhe/src/lib.rs @@ -1,5 +1,7 @@ mod fhe; +mod repositories; mod utils; pub use fhe::*; +pub use repositories::*; pub use utils::*; diff --git a/packages/ciphernode/fhe/src/repositories.rs b/packages/ciphernode/fhe/src/repositories.rs new file mode 100644 index 00000000..7e035b73 --- /dev/null +++ b/packages/ciphernode/fhe/src/repositories.rs @@ -0,0 +1,15 @@ +use config::StoreKeys; +use data::{Repositories, Repository}; +use enclave_core::E3id; + +use crate::FheSnapshot; + +pub trait FheRepositoryFactory { + fn fhe(&self, e3_id: &E3id) -> Repository; +} + +impl FheRepositoryFactory for Repositories { + fn fhe(&self, e3_id: &E3id) -> Repository { + Repository::new(self.store.scope(StoreKeys::fhe(e3_id))) + } 
+} diff --git a/packages/ciphernode/keyshare/Cargo.toml b/packages/ciphernode/keyshare/Cargo.toml index bc05ba0b..37a64f6c 100644 --- a/packages/ciphernode/keyshare/Cargo.toml +++ b/packages/ciphernode/keyshare/Cargo.toml @@ -9,6 +9,7 @@ anyhow = { workspace = true } async-trait = { workspace = true } data = { path = "../data" } cipher = { path = "../cipher" } +config = { path = "../config" } enclave-core = { path = "../core" } fhe = { path = "../fhe" } serde = { workspace = true } diff --git a/packages/ciphernode/keyshare/src/keyshare.rs b/packages/ciphernode/keyshare/src/keyshare.rs index a2e93c0e..dab8a0c8 100644 --- a/packages/ciphernode/keyshare/src/keyshare.rs +++ b/packages/ciphernode/keyshare/src/keyshare.rs @@ -1,22 +1,19 @@ use actix::prelude::*; use anyhow::{anyhow, Result}; -use async_trait::async_trait; use cipher::Cipher; -use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use data::Persistable; use enclave_core::{ BusError, CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, Die, E3RequestComplete, EnclaveErrorType, EnclaveEvent, EventBus, FromError, KeyshareCreated, }; use fhe::{DecryptCiphertext, Fhe}; -use serde::{Deserialize, Serialize}; use std::sync::Arc; use tracing::warn; pub struct Keyshare { fhe: Arc, - store: Repository, bus: Addr, - secret: Option>, + secret: Persistable>, address: String, cipher: Arc, } @@ -27,24 +24,18 @@ impl Actor for Keyshare { pub struct KeyshareParams { pub bus: Addr, - pub store: Repository, + pub secret: Persistable>, pub fhe: Arc, pub address: String, pub cipher: Arc, } -#[derive(Serialize, Deserialize)] -pub struct KeyshareState { - secret: Option>, -} - impl Keyshare { pub fn new(params: KeyshareParams) -> Self { Self { bus: params.bus, fhe: params.fhe, - store: params.store, - secret: None, + secret: params.secret, address: params.address, cipher: params.cipher, } @@ -53,7 +44,7 @@ impl Keyshare { fn set_secret(&mut self, mut data: Vec) -> Result<()> { let encrypted = 
self.cipher.encrypt_data(&mut data)?; - self.secret = Some(encrypted); + self.secret.set(encrypted); Ok(()) } @@ -61,8 +52,8 @@ impl Keyshare { fn get_secret(&self) -> Result> { let encrypted = self .secret - .as_ref() - .ok_or(anyhow!("No secret share available on Keyshare"))?; + .get() + .ok_or(anyhow!("State was not stored on keyshare"))?; let decrypted = self.cipher.decrypt_data(&encrypted)?; @@ -70,38 +61,7 @@ impl Keyshare { } fn clear_secret(&mut self) { - self.secret = None; - } -} - -impl Snapshot for Keyshare { - type Snapshot = KeyshareState; - - fn snapshot(&self) -> Self::Snapshot { - KeyshareState { - secret: self.secret.clone(), - } - } -} - -impl Checkpoint for Keyshare { - fn repository(&self) -> &Repository { - &self.store - } -} - -#[async_trait] -impl FromSnapshotWithParams for Keyshare { - type Params = KeyshareParams; - async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { - Ok(Self { - bus: params.bus, - fhe: params.fhe, - store: params.store, - secret: snapshot.secret, - address: params.address, - cipher: params.cipher, - }) + self.secret.clear(); } } @@ -148,9 +108,6 @@ impl Handler for Keyshare { e3_id, node: self.address.clone(), })); - - // Write the snapshot to the store - self.checkpoint() } } @@ -198,7 +155,6 @@ impl Handler for Keyshare { type Result = (); fn handle(&mut self, _: E3RequestComplete, ctx: &mut Self::Context) -> Self::Result { self.clear_secret(); - self.checkpoint(); ctx.notify(Die); } } diff --git a/packages/ciphernode/keyshare/src/lib.rs b/packages/ciphernode/keyshare/src/lib.rs index 46e4b5c9..8c81d825 100644 --- a/packages/ciphernode/keyshare/src/lib.rs +++ b/packages/ciphernode/keyshare/src/lib.rs @@ -1,2 +1,4 @@ mod keyshare; +mod repositories; pub use keyshare::*; +pub use repositories::*; diff --git a/packages/ciphernode/keyshare/src/repositories.rs b/packages/ciphernode/keyshare/src/repositories.rs new file mode 100644 index 00000000..772c5335 --- /dev/null +++ 
b/packages/ciphernode/keyshare/src/repositories.rs @@ -0,0 +1,13 @@ +use config::StoreKeys; +use data::{Repositories, Repository}; +use enclave_core::E3id; + +pub trait KeyshareRepositoryFactory { + fn keyshare(&self, e3_id: &E3id) -> Repository>; +} + +impl KeyshareRepositoryFactory for Repositories { + fn keyshare(&self, e3_id: &E3id) -> Repository> { + Repository::new(self.store.scope(StoreKeys::keyshare(e3_id))) + } +} diff --git a/packages/ciphernode/net/Cargo.toml b/packages/ciphernode/net/Cargo.toml index a8a7c7c8..8eda8c22 100644 --- a/packages/ciphernode/net/Cargo.toml +++ b/packages/ciphernode/net/Cargo.toml @@ -11,6 +11,9 @@ repository = "https://github.com/gnosisguild/enclave/packages/ciphernode" async-std = { workspace = true, features = ["attributes"] } async-trait = { workspace = true } futures = { workspace = true } +config = { workspace = true } +cipher = { workspace = true } +data = { workspace = true } libp2p = { workspace = true, features = [ "async-std", "gossipsub", @@ -22,6 +25,7 @@ libp2p = { workspace = true, features = [ "quic", "tokio", ] } +serde = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/packages/ciphernode/net/src/encrypted_keypair.rs b/packages/ciphernode/net/src/encrypted_keypair.rs new file mode 100644 index 00000000..7891da83 --- /dev/null +++ b/packages/ciphernode/net/src/encrypted_keypair.rs @@ -0,0 +1,30 @@ +use anyhow::*; +use cipher::Cipher; +use libp2p::identity::Keypair; +use serde::{Deserialize, Serialize}; + +/// Hold an encrypted libp2p keypair in a serializable way for storage +#[derive(Clone, Serialize, Deserialize)] +pub struct EncryptedKeypair { + secret: Vec, +} + +impl EncryptedKeypair { + /// Generate an encrypted Keypair and store it in memory encrypted to the given Cipher + pub fn generate(cipher: &Cipher) -> Result { + let mut secret_raw = Keypair::generate_ed25519() + 
.try_into_ed25519()? + .to_bytes() + .to_vec(); + + let secret = cipher.encrypt_data(&mut secret_raw)?; + Ok(Self { secret }) + } + + /// Decrypt the Keypair with the given cipher and return it + pub fn decrypt(&self, cipher: &Cipher) -> Result { + Ok(Keypair::ed25519_from_bytes( + cipher.decrypt_data(&self.secret)?, + )?) + } +} diff --git a/packages/ciphernode/net/src/lib.rs b/packages/ciphernode/net/src/lib.rs index e1f2cde5..0e11a7d1 100644 --- a/packages/ciphernode/net/src/lib.rs +++ b/packages/ciphernode/net/src/lib.rs @@ -1,8 +1,12 @@ #![crate_name = "net"] #![crate_type = "lib"] +mod encrypted_keypair; mod network_peer; mod network_relay; +mod repositories; +pub use encrypted_keypair::*; pub use network_peer::*; pub use network_relay::*; +pub use repositories::*; diff --git a/packages/ciphernode/net/src/network_peer.rs b/packages/ciphernode/net/src/network_peer.rs index 8e1d80d6..12fb10e9 100644 --- a/packages/ciphernode/net/src/network_peer.rs +++ b/packages/ciphernode/net/src/network_peer.rs @@ -14,7 +14,7 @@ use std::hash::{Hash, Hasher}; use std::{hash::DefaultHasher, io::Error, time::Duration}; use tokio::{ select, - sync::mpsc::{self, channel, Receiver, Sender}, + sync::mpsc::{channel, Receiver, Sender}, }; use tracing::{debug, error, info, trace, warn}; diff --git a/packages/ciphernode/net/src/network_relay.rs b/packages/ciphernode/net/src/network_relay.rs index fe2b8add..54a7720c 100644 --- a/packages/ciphernode/net/src/network_relay.rs +++ b/packages/ciphernode/net/src/network_relay.rs @@ -1,11 +1,15 @@ +use std::sync::Arc; use std::{collections::HashSet, error::Error}; +use crate::EncryptedKeypair; use crate::NetworkPeer; /// Actor for connecting to an libp2p client via it's mpsc channel interface /// This Actor should be responsible for use actix::prelude::*; use anyhow::anyhow; use anyhow::Result; +use cipher::Cipher; +use data::{AutoPersist, Repository}; use enclave_core::{EnclaveEvent, EventBus, EventId, Subscribe}; use 
tokio::sync::mpsc::{Receiver, Sender}; use tracing::{error, trace}; @@ -19,7 +23,7 @@ pub struct NetworkRelay { } impl Actor for NetworkRelay { - type Context = Context; + type Context = actix::Context; } #[derive(Message, Clone, Debug, PartialEq, Eq)] @@ -63,11 +67,17 @@ impl NetworkRelay { } /// Spawn a Libp2p peer and hook it up to this actor - pub fn setup_with_peer( + pub async fn setup_with_peer( bus: Addr, peers: Vec, + repository: Repository, + cipher: &Arc, ) -> Result<(Addr, tokio::task::JoinHandle>, String)> { - let keypair = libp2p::identity::Keypair::generate_ed25519(); + let keypair = repository + .load_or_else(|| EncryptedKeypair::generate(cipher)) + .await? + .try_get()? + .decrypt(cipher)?; let mut peer = NetworkPeer::new(&keypair, peers, None, "tmp-enclave-gossip-topic")?; let rx = peer.rx().ok_or(anyhow!("Peer rx already taken"))?; let p2p_addr = NetworkRelay::setup(bus, peer.tx(), rx); diff --git a/packages/ciphernode/net/src/repositories.rs b/packages/ciphernode/net/src/repositories.rs new file mode 100644 index 00000000..c3f8bdcf --- /dev/null +++ b/packages/ciphernode/net/src/repositories.rs @@ -0,0 +1,14 @@ +use config::StoreKeys; +use data::{Repositories, Repository}; + +use crate::EncryptedKeypair; + +pub trait NetRepositoryFactory { + fn libp2p_key(&self) -> Repository; +} + +impl NetRepositoryFactory for Repositories { + fn libp2p_key(&self) -> Repository { + Repository::new(self.store.scope(StoreKeys::libp2p_key())) + } +} diff --git a/packages/ciphernode/router/Cargo.toml b/packages/ciphernode/router/Cargo.toml index cb626737..59d714b9 100644 --- a/packages/ciphernode/router/Cargo.toml +++ b/packages/ciphernode/router/Cargo.toml @@ -9,6 +9,7 @@ enclave-core = { path = "../core" } sortition = { path = "../sortition" } fhe = { path = "../fhe" } data = { path = "../data" } +config = { path = "../config" } keyshare = { path = "../keyshare" } aggregator = { path = "../aggregator" } evm = { path = "../evm" } diff --git 
a/packages/ciphernode/router/src/committee_meta.rs b/packages/ciphernode/router/src/committee_meta.rs index eb9a7d2c..c414209e 100644 --- a/packages/ciphernode/router/src/committee_meta.rs +++ b/packages/ciphernode/router/src/committee_meta.rs @@ -1,6 +1,7 @@ -use crate::{E3Feature, E3RequestContext, E3RequestContextSnapshot, RepositoriesFactory}; +use crate::{E3Feature, E3RequestContext, E3RequestContextSnapshot, MetaRepositoryFactory}; use anyhow::*; use async_trait::async_trait; +use data::RepositoriesFactory; use enclave_core::{E3Requested, EnclaveEvent, Seed}; #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] diff --git a/packages/ciphernode/router/src/context.rs b/packages/ciphernode/router/src/context.rs index 506e106c..dddef1a1 100644 --- a/packages/ciphernode/router/src/context.rs +++ b/packages/ciphernode/router/src/context.rs @@ -1,15 +1,15 @@ -use std::sync::Arc; - -use crate::{CommitteeMeta, E3Feature, EventBuffer, Repositories, RepositoriesFactory}; +use crate::{CommitteeMeta, E3Feature, EventBuffer}; use actix::{Addr, Recipient}; use aggregator::{PlaintextAggregator, PublicKeyAggregator}; use anyhow::Result; use async_trait::async_trait; use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use data::{Repositories, RepositoriesFactory}; use enclave_core::{E3id, EnclaveEvent}; use fhe::Fhe; use keyshare::Keyshare; use serde::{Deserialize, Serialize}; +use std::sync::Arc; /// Context that is set to each event hook. Hooks can use this context to gather dependencies if /// they need to instantiate struct instances or actors. 
@@ -151,15 +151,15 @@ impl RepositoriesFactory for E3RequestContext { impl Snapshot for E3RequestContext { type Snapshot = E3RequestContextSnapshot; - fn snapshot(&self) -> Self::Snapshot { - Self::Snapshot { + fn snapshot(&self) -> Result { + Ok(Self::Snapshot { e3_id: self.e3_id.clone(), meta: self.meta.is_some(), fhe: self.fhe.is_some(), publickey: self.publickey.is_some(), plaintext: self.plaintext.is_some(), keyshare: self.keyshare.is_some(), - } + }) } } diff --git a/packages/ciphernode/router/src/e3_request_router.rs b/packages/ciphernode/router/src/e3_request_router.rs index 0890d4e0..0ce11620 100644 --- a/packages/ciphernode/router/src/e3_request_router.rs +++ b/packages/ciphernode/router/src/e3_request_router.rs @@ -1,8 +1,9 @@ use crate::CommitteeMetaFeature; +use crate::ContextRepositoryFactory; use crate::E3RequestContext; use crate::E3RequestContextParams; use crate::E3RequestContextSnapshot; -use crate::RepositoriesFactory; +use crate::RouterRepositoryFactory; use actix::AsyncContext; use actix::{Actor, Addr, Context, Handler}; use anyhow::*; @@ -10,6 +11,7 @@ use async_trait::async_trait; use data::Checkpoint; use data::DataStore; use data::FromSnapshotWithParams; +use data::RepositoriesFactory; use data::Repository; use data::Snapshot; use enclave_core::E3RequestComplete; @@ -178,14 +180,14 @@ pub struct E3RequestRouterSnapshot { impl Snapshot for E3RequestRouter { type Snapshot = E3RequestRouterSnapshot; - fn snapshot(&self) -> Self::Snapshot { + fn snapshot(&self) -> Result { let contexts = self.contexts.keys().cloned().collect(); let completed = self.completed.clone(); - Self::Snapshot { + Ok(Self::Snapshot { completed, contexts, - } + }) } } diff --git a/packages/ciphernode/router/src/hooks.rs b/packages/ciphernode/router/src/hooks.rs index c2969214..6295b97b 100644 --- a/packages/ciphernode/router/src/hooks.rs +++ b/packages/ciphernode/router/src/hooks.rs @@ -1,16 +1,18 @@ -use crate::{E3Feature, E3RequestContext, E3RequestContextSnapshot, 
RepositoriesFactory}; +use crate::{E3Feature, E3RequestContext, E3RequestContextSnapshot}; use actix::{Actor, Addr}; use aggregator::{ - PlaintextAggregator, PlaintextAggregatorParams, PlaintextAggregatorState, PublicKeyAggregator, - PublicKeyAggregatorParams, PublicKeyAggregatorState, + PlaintextAggregator, PlaintextAggregatorParams, PlaintextAggregatorState, + PlaintextRepositoryFactory, PublicKeyAggregator, PublicKeyAggregatorParams, + PublicKeyAggregatorState, PublicKeyRepositoryFactory, }; use anyhow::{anyhow, Result}; use async_trait::async_trait; use cipher::Cipher; +use data::{AutoPersist, RepositoriesFactory}; use data::{FromSnapshotWithParams, Snapshot}; use enclave_core::{BusError, E3Requested, EnclaveErrorType, EnclaveEvent, EventBus}; -use fhe::{Fhe, SharedRng}; -use keyshare::{Keyshare, KeyshareParams}; +use fhe::{Fhe, FheRepositoryFactory, SharedRng}; +use keyshare::{Keyshare, KeyshareParams, KeyshareRepositoryFactory}; use sortition::Sortition; use std::sync::Arc; @@ -57,7 +59,14 @@ impl E3Feature for FheFeature { let fhe = Arc::new(fhe_inner); // FHE doesn't implement Checkpoint so we are going to store it manually - ctx.repositories().fhe(&e3_id).write(&fhe.snapshot()); + let Ok(snapshot) = fhe.snapshot() else { + self.bus.err( + EnclaveErrorType::KeyGeneration, + anyhow!("Failed to get snapshot"), + ); + return; + }; + ctx.repositories().fhe(&e3_id).write(&snapshot); let _ = ctx.set_fhe(fhe); } @@ -120,11 +129,13 @@ impl E3Feature for KeyshareFeature { }; let e3_id = data.clone().e3_id; + let repo = ctx.repositories().keyshare(&e3_id); + let container = repo.send(None); ctx.set_keyshare( Keyshare::new(KeyshareParams { bus: self.bus.clone(), - store: ctx.repositories().keyshare(&e3_id), + secret: container, fhe: fhe.clone(), address: self.address.clone(), cipher: self.cipher.clone(), @@ -143,10 +154,10 @@ impl E3Feature for KeyshareFeature { return Ok(()); }; - let store = ctx.repositories().keyshare(&snapshot.e3_id); + let sync_secret = 
ctx.repositories().keyshare(&snapshot.e3_id).load().await?; - // No Snapshot returned from the store -> bail - let Some(snap) = store.read().await? else { + // No Snapshot returned from the sync_secret -> bail + if !sync_secret.has() { return Ok(()); }; @@ -160,17 +171,13 @@ impl E3Feature for KeyshareFeature { }; // Construct from snapshot - let value = Keyshare::from_snapshot( - KeyshareParams { - fhe, - bus: self.bus.clone(), - store, - address: self.address.clone(), - cipher: self.cipher.clone(), - }, - snap, - ) - .await? + let value = Keyshare::new(KeyshareParams { + fhe, + bus: self.bus.clone(), + secret: sync_secret, + address: self.address.clone(), + cipher: self.cipher.clone(), + }) .start(); // send to context @@ -221,22 +228,23 @@ impl E3Feature for PlaintextAggregatorFeature { }; let e3_id = data.e3_id.clone(); - - let _ = ctx.set_plaintext( + let repo = ctx.repositories().plaintext(&e3_id); + let sync_state = repo.send(Some(PlaintextAggregatorState::init( + meta.threshold_m, + meta.seed, + data.ciphertext_output.clone(), + ))); + + ctx.set_plaintext( PlaintextAggregator::new( PlaintextAggregatorParams { fhe: fhe.clone(), bus: self.bus.clone(), - store: ctx.repositories().plaintext(&e3_id), sortition: self.sortition.clone(), - e3_id, + e3_id: e3_id.clone(), src_chain_id: meta.src_chain_id, }, - PlaintextAggregatorState::init( - meta.threshold_m, - meta.seed, - data.ciphertext_output.clone(), - ), + sync_state, ) .start(), ); @@ -252,10 +260,11 @@ impl E3Feature for PlaintextAggregatorFeature { return Ok(()); } - let store = ctx.repositories().plaintext(&snapshot.e3_id); + let repo = ctx.repositories().plaintext(&snapshot.e3_id); + let sync_state = repo.load().await?; // No Snapshot returned from the store -> bail - let Some(snap) = store.read().await? 
else { + if !sync_state.has() { return Ok(()); }; @@ -276,18 +285,16 @@ impl E3Feature for PlaintextAggregatorFeature { return Ok(()); }; - let value = PlaintextAggregator::from_snapshot( + let value = PlaintextAggregator::new( PlaintextAggregatorParams { fhe: fhe.clone(), bus: self.bus.clone(), - store, sortition: self.sortition.clone(), e3_id: ctx.e3_id.clone(), src_chain_id: meta.src_chain_id, }, - snap, + sync_state, ) - .await? .start(); // send to context @@ -338,18 +345,21 @@ impl E3Feature for PublicKeyAggregatorFeature { }; let e3_id = data.e3_id.clone(); - - let _ = ctx.set_publickey( + let repo = ctx.repositories().publickey(&e3_id); + let sync_state = repo.send(Some(PublicKeyAggregatorState::init( + meta.threshold_m, + meta.seed, + ))); + ctx.set_publickey( PublicKeyAggregator::new( PublicKeyAggregatorParams { fhe: fhe.clone(), bus: self.bus.clone(), - store: ctx.repositories().publickey(&e3_id), sortition: self.sortition.clone(), e3_id, src_chain_id: meta.src_chain_id, }, - PublicKeyAggregatorState::init(meta.threshold_m, meta.seed), + sync_state, ) .start(), ); @@ -365,10 +375,11 @@ impl E3Feature for PublicKeyAggregatorFeature { return Ok(()); }; - let repository = ctx.repositories().publickey(&ctx.e3_id); + let repo = ctx.repositories().publickey(&ctx.e3_id); + let sync_state = repo.load().await?; // No Snapshot returned from the store -> bail - let Some(snap) = repository.read().await? else { + if !sync_state.has() { return Ok(()); }; @@ -391,18 +402,16 @@ impl E3Feature for PublicKeyAggregatorFeature { return Ok(()); }; - let value = PublicKeyAggregator::from_snapshot( + let value = PublicKeyAggregator::new( PublicKeyAggregatorParams { fhe: fhe.clone(), bus: self.bus.clone(), - store: repository, sortition: self.sortition.clone(), e3_id: ctx.e3_id.clone(), src_chain_id: meta.src_chain_id, }, - snap, + sync_state, ) - .await? 
.start(); // send to context diff --git a/packages/ciphernode/router/src/repositories.rs b/packages/ciphernode/router/src/repositories.rs index e8557c18..de6c11b2 100644 --- a/packages/ciphernode/router/src/repositories.rs +++ b/packages/ciphernode/router/src/repositories.rs @@ -1,106 +1,35 @@ -use crate::{CommitteeMeta, E3RequestContextSnapshot, E3RequestRouterSnapshot}; -use aggregator::{PlaintextAggregatorState, PublicKeyAggregatorState}; -use data::{DataStore, Repository}; +use config::StoreKeys; +use data::{Repositories, Repository}; use enclave_core::E3id; -use evm::EvmEventReaderState; -use fhe::FheSnapshot; -use keyshare::KeyshareState; -use sortition::SortitionModule; -pub struct Repositories { - store: DataStore, -} +use crate::{CommitteeMeta, E3RequestContextSnapshot, E3RequestRouterSnapshot}; -impl From for Repositories { - fn from(value: DataStore) -> Self { - Repositories { store: value } - } -} -impl From<&DataStore> for Repositories { - fn from(value: &DataStore) -> Self { - Repositories { - store: value.clone(), - } - } +pub trait MetaRepositoryFactory { + fn meta(&self, e3_id: &E3id) -> Repository; } -impl Repositories { - pub fn new(store: DataStore) -> Self { - Repositories { store } +impl MetaRepositoryFactory for Repositories { + fn meta(&self, e3_id: &E3id) -> Repository { + Repository::new(self.store.scope(StoreKeys::meta(e3_id))) } } -impl From> for Repositories { - fn from(value: Repository) -> Self { - let store: DataStore = value.into(); - store.into() - } +pub trait ContextRepositoryFactory { + fn context(&self, e3_id: &E3id) -> Repository; } -impl Repositories { - pub fn keyshare(&self, e3_id: &E3id) -> Repository { - Repository::new(self.store.scope(format!("//keyshare/{e3_id}"))) - } - - pub fn plaintext(&self, e3_id: &E3id) -> Repository { - Repository::new(self.store.scope(format!("//plaintext/{e3_id}"))) - } - - pub fn publickey(&self, e3_id: &E3id) -> Repository { - 
Repository::new(self.store.scope(format!("//publickey/{e3_id}"))) - } - - pub fn fhe(&self, e3_id: &E3id) -> Repository { - Repository::new(self.store.scope(format!("//fhe/{e3_id}"))) - } - - pub fn meta(&self, e3_id: &E3id) -> Repository { - Repository::new(self.store.scope(format!("//meta/{e3_id}"))) - } - - pub fn context(&self, e3_id: &E3id) -> Repository { - Repository::new(self.store.scope(format!("//context/{e3_id}"))) - } - - pub fn router(&self) -> Repository { - Repository::new(self.store.scope(format!("//router"))) - } - - pub fn sortition(&self) -> Repository { - Repository::new(self.store.scope(format!("//sortition"))) - } - - pub fn eth_private_key(&self) -> Repository> { - Repository::new(self.store.scope(format!("//eth_private_key"))) - } - - pub fn enclave_sol_reader(&self, chain_id: u64) -> Repository { - Repository::new( - self.store - .scope(format!("//evm_readers/enclave/{chain_id}")), - ) - } - pub fn ciphernode_registry_reader(&self, chain_id: u64) -> Repository { - Repository::new( - self.store - .scope(format!("//evm_readers/ciphernode_registry/{chain_id}")), - ) +impl ContextRepositoryFactory for Repositories { + fn context(&self, e3_id: &E3id) -> Repository { + Repository::new(self.store.scope(StoreKeys::context(e3_id))) } } -pub trait RepositoriesFactory { - fn repositories(&self) -> Repositories; -} - -impl RepositoriesFactory for DataStore { - fn repositories(&self) -> Repositories { - self.into() - } +pub trait RouterRepositoryFactory { + fn router(&self) -> Repository; } -impl RepositoriesFactory for Repository { - fn repositories(&self) -> Repositories { - let store: DataStore = self.into(); - store.repositories() +impl RouterRepositoryFactory for Repositories { + fn router(&self) -> Repository { + Repository::new(self.store.scope(StoreKeys::router())) } } diff --git a/packages/ciphernode/sortition/Cargo.toml b/packages/ciphernode/sortition/Cargo.toml index 383e1955..d10354dc 100644 --- a/packages/ciphernode/sortition/Cargo.toml +++ 
b/packages/ciphernode/sortition/Cargo.toml @@ -13,6 +13,7 @@ actix = { workspace = true } alloy = { workspace = true, features = ["full"] } anyhow = { workspace = true } async-trait = { workspace = true } +config = { path = "../config" } data = { path = "../data" } enclave-core = { path = "../core" } num = { workspace = true } diff --git a/packages/ciphernode/sortition/src/lib.rs b/packages/ciphernode/sortition/src/lib.rs index 792f64fc..323eb64f 100644 --- a/packages/ciphernode/sortition/src/lib.rs +++ b/packages/ciphernode/sortition/src/lib.rs @@ -4,8 +4,10 @@ mod distance; mod index; +mod repositories; mod sortition; pub use distance::*; pub use index::*; +pub use repositories::*; pub use sortition::*; diff --git a/packages/ciphernode/sortition/src/repositories.rs b/packages/ciphernode/sortition/src/repositories.rs new file mode 100644 index 00000000..2b84c184 --- /dev/null +++ b/packages/ciphernode/sortition/src/repositories.rs @@ -0,0 +1,14 @@ +use config::StoreKeys; +use data::{Repositories, Repository}; + +use crate::SortitionModule; + +pub trait SortitionRepositoryFactory { + fn sortition(&self) -> Repository; +} + +impl SortitionRepositoryFactory for Repositories { + fn sortition(&self) -> Repository { + Repository::new(self.store.scope(StoreKeys::sortition())) + } +} diff --git a/packages/ciphernode/sortition/src/sortition.rs b/packages/ciphernode/sortition/src/sortition.rs index a41c9fa9..918522db 100644 --- a/packages/ciphernode/sortition/src/sortition.rs +++ b/packages/ciphernode/sortition/src/sortition.rs @@ -2,8 +2,8 @@ use crate::DistanceSortition; use actix::prelude::*; use alloy::primitives::Address; use anyhow::{anyhow, Result}; -use async_trait::async_trait; -use data::{Checkpoint, FromSnapshotWithParams, Repository, Snapshot}; +use data::Repository; +use data::{AutoPersist, Persistable}; use enclave_core::{ get_tag, BusError, CiphernodeAdded, CiphernodeRemoved, EnclaveErrorType, EnclaveEvent, EventBus, Seed, Subscribe, @@ -87,23 +87,21 @@ impl 
SortitionList for SortitionModule { pub struct GetNodes; pub struct Sortition { - list: SortitionModule, + list: Persistable, bus: Addr, - store: Repository, } #[derive(Debug)] pub struct SortitionParams { - pub bus: Addr, - pub store: Repository, + bus: Addr, + list: Persistable, } impl Sortition { pub fn new(params: SortitionParams) -> Self { Self { - list: SortitionModule::new(), + list: params.list, bus: params.bus, - store: params.store, } } @@ -112,29 +110,18 @@ impl Sortition { bus: &Addr, store: Repository, ) -> Result> { - let addr = Sortition::load(SortitionParams { + let list = store.load_or_default(SortitionModule::default()).await?; + let addr = Sortition::new(SortitionParams { bus: bus.clone(), - store, + list, }) - .await? .start(); bus.do_send(Subscribe::new("CiphernodeAdded", addr.clone().into())); Ok(addr) } - #[instrument(name="sortition", skip_all, fields(id = get_tag()))] - pub async fn load(params: SortitionParams) -> Result { - Ok(if let Some(snapshot) = params.store.read().await? { - info!("Loading from snapshot"); - Self::from_snapshot(params, snapshot).await? 
- } else { - info!("Loading from params"); - Self::new(params) - }) - } - pub fn get_nodes(&self) -> Vec { - self.list.nodes.clone().into_iter().collect() + self.list.get().unwrap().nodes.clone().into_iter().collect() } } @@ -142,38 +129,6 @@ impl Actor for Sortition { type Context = actix::Context; } -impl Snapshot for Sortition { - type Snapshot = SortitionModule; - fn snapshot(&self) -> Self::Snapshot { - self.list.clone() - } -} - -#[async_trait] -impl FromSnapshotWithParams for Sortition { - type Params = SortitionParams; - - #[instrument(name="sortition", skip_all, fields(id = get_tag()))] - async fn from_snapshot(params: Self::Params, snapshot: Self::Snapshot) -> Result { - info!("Loaded snapshot with {} nodes", snapshot.nodes().len()); - info!( - "Nodes:\n\n{:?}\n", - snapshot.nodes().into_iter().collect::>() - ); - Ok(Sortition { - bus: params.bus, - store: params.store, - list: snapshot, - }) - } -} - -impl Checkpoint for Sortition { - fn repository(&self) -> &Repository { - &self.store - } -} - impl Handler for Sortition { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { @@ -191,8 +146,13 @@ impl Handler for Sortition { #[instrument(name="sortition", skip_all, fields(id = get_tag()))] fn handle(&mut self, msg: CiphernodeAdded, _ctx: &mut Self::Context) -> Self::Result { info!("Adding node: {}", msg.address); - self.list.add(msg.address); - self.checkpoint(); + match self.list.try_mutate(|mut list| { + list.add(msg.address); + Ok(list) + }) { + Err(err) => self.bus.err(EnclaveErrorType::Sortition, err), + _ => (), + }; } } @@ -202,8 +162,13 @@ impl Handler for Sortition { #[instrument(name="sortition", skip_all, fields(id = get_tag()))] fn handle(&mut self, msg: CiphernodeRemoved, _ctx: &mut Self::Context) -> Self::Result { info!("Removing node: {}", msg.address); - self.list.remove(msg.address); - self.checkpoint(); + match self.list.try_mutate(|mut list| { + list.remove(msg.address); + Ok(list) + }) { 
+ Err(err) => self.bus.err(EnclaveErrorType::Sortition, err), + _ => (), + }; } } @@ -212,13 +177,12 @@ impl Handler for Sortition { #[instrument(name="sortition", skip_all, fields(id = get_tag()))] fn handle(&mut self, msg: GetHasNode, _ctx: &mut Self::Context) -> Self::Result { - match self.list.contains(msg.seed, msg.size, msg.address) { - Ok(val) => val, - Err(err) => { + self.list + .try_with(|list| list.contains(msg.seed, msg.size, msg.address)) + .unwrap_or_else(|err| { self.bus.err(EnclaveErrorType::Sortition, err); false - } - } + }) } } diff --git a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs index 4839877c..1c2b6918 100644 --- a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs +++ b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs @@ -1,4 +1,5 @@ use cipher::Cipher; +use data::RepositoriesFactory; use data::{DataStore, InMemStore}; use enclave_core::{ CiphernodeAdded, CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, @@ -11,12 +12,12 @@ use logger::SimpleLogger; use net::NetworkRelay; use router::{ CiphernodeSelector, E3RequestRouter, FheFeature, KeyshareFeature, PlaintextAggregatorFeature, - PublicKeyAggregatorFeature, RepositoriesFactory, + PublicKeyAggregatorFeature, }; -use sortition::Sortition; +use sortition::{Sortition, SortitionRepositoryFactory}; use actix::prelude::*; -use alloy::{primitives::Address, signers::k256::sha2::digest::Reset}; +use alloy::primitives::Address; use anyhow::*; use fhe_rs::{ bfv::{BfvParameters, Ciphertext, Encoding, Plaintext, PublicKey, SecretKey}, @@ -26,7 +27,7 @@ use fhe_traits::{FheEncoder, FheEncrypter, Serialize}; use rand::Rng; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; -use std::{env, path::Path, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use tokio::sync::Mutex; use tokio::{sync::mpsc::channel, time::sleep};