From f10104003e83196b5a8ba5a88e1ce5b05e43f219 Mon Sep 17 00:00:00 2001 From: Dr Maxim Orlovsky Date: Fri, 5 Jul 2024 17:50:13 +0200 Subject: [PATCH] persistence: add transactional model --- src/persistence/index.rs | 21 +++++++++- src/persistence/memory.rs | 54 +++++++++++++++++++++++- src/persistence/mod.rs | 10 +++++ src/persistence/stash.rs | 22 +++++++++- src/persistence/state.rs | 3 +- src/persistence/stock.rs | 86 +++++++++++++++++++++++++++------------ 6 files changed, 166 insertions(+), 30 deletions(-) diff --git a/src/persistence/index.rs b/src/persistence/index.rs index 187cb10b..fc49f0e1 100644 --- a/src/persistence/index.rs +++ b/src/persistence/index.rs @@ -30,6 +30,7 @@ use rgb::{ }; use crate::containers::{BundledWitness, Consignment, ToWitnessId}; +use crate::persistence::StoreTransaction; use crate::SecretSeal; #[derive(Clone, Eq, PartialEq, Debug, Display, Error, From)] @@ -330,6 +331,24 @@ impl Index

{ } } +impl StoreTransaction for Index

{ + type TransactionErr = IndexError

; + + fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> { + self.provider + .begin_transaction() + .map_err(IndexError::WriteProvider) + } + + fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> { + self.provider + .commit_transaction() + .map_err(IndexError::WriteProvider) + } + + fn rollback_transaction(&mut self) { self.provider.rollback_transaction() } +} + pub trait IndexProvider: Debug + IndexReadProvider + IndexWriteProvider {} pub trait IndexReadProvider { @@ -364,7 +383,7 @@ pub trait IndexReadProvider { ) -> Result<(XWitnessId, ContractId), IndexReadError>; } -pub trait IndexWriteProvider { +pub trait IndexWriteProvider: StoreTransaction { type Error: Clone + Eq + Error; fn register_contract(&mut self, contract_id: ContractId) -> Result; diff --git a/src/persistence/memory.rs b/src/persistence/memory.rs index 2a2e9005..0ef64bb6 100644 --- a/src/persistence/memory.rs +++ b/src/persistence/memory.rs @@ -41,7 +41,7 @@ use super::{ ContractIfaceError, IndexInconsistency, IndexProvider, IndexReadError, IndexReadProvider, IndexWriteError, IndexWriteProvider, SchemaIfaces, StashInconsistency, StashProvider, StashProviderError, StashReadProvider, StashWriteProvider, StateProvider, StateReadProvider, - StateUpdateError, StateWriteProvider, + StateUpdateError, StateWriteProvider, StoreTransaction, }; use crate::containers::{ AnchorSet, ContentId, ContentRef, ContentSigs, SealWitness, SigBlob, Supplement, TrustLevel, @@ -60,6 +60,8 @@ use crate::LIB_NAME_RGB_STD; #[derive(StrictType, StrictEncode, StrictDecode)] #[strict_type(lib = LIB_NAME_RGB_STD)] pub struct MemStash { + dirty: bool, + schemata: TinyOrdMap, ifaces: TinyOrdMap, geneses: TinyOrdMap, @@ -82,6 +84,22 @@ impl MemStash { pub fn new() -> Self { MemStash::default() } } +impl StoreTransaction for MemStash { + type TransactionErr = confinement::Error; + + fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> { + self.dirty = true; + Ok(()) + } + + fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> { + // We do not do anything here since we do not actually save anything + Ok(()) + } + + fn rollback_transaction(&mut self) { unreachable!() } +} + impl StashProvider for MemStash {} impl StashReadProvider for MemStash { @@ -404,6 +422,7 @@ impl From for StateUpdateError { #[derive(StrictType, StrictEncode, StrictDecode)] #[strict_type(lib = LIB_NAME_RGB_STD)] pub struct MemState { + dirty: bool, history: TinyOrdMap, } @@ -414,6 +433,22 @@ impl MemState { pub fn new() -> Self { MemState::default() } } +impl StoreTransaction for MemState { + type TransactionErr = confinement::Error; + + fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> { + self.dirty = true; + Ok(()) + } + + fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> { + // We do not do anything here since we do not actually save anything + Ok(()) + } + + fn rollback_transaction(&mut self) { unreachable!() } +} + impl StateProvider for MemState {} impl StateReadProvider for MemState { @@ -486,6 +521,7 @@ pub struct ContractIndex { #[derive(StrictType, StrictEncode, StrictDecode)] #[strict_type(lib = LIB_NAME_RGB_STD)] pub struct MemIndex { + dirty: bool, op_bundle_index: MediumOrdMap, bundle_contract_index: MediumOrdMap, bundle_witness_index: MediumOrdMap, @@ -500,6 +536,22 @@ impl MemIndex { pub fn new() -> Self { MemIndex::default() } } +impl StoreTransaction for MemIndex { + type TransactionErr = confinement::Error; + + fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> { + self.dirty = true; + Ok(()) + } + + fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> { + // We do not do anything here since we do not actually save anything + Ok(()) + } + + fn rollback_transaction(&mut self) { unreachable!() } +} + impl IndexProvider for MemIndex {} impl IndexReadProvider for MemIndex { diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index c56241f7..3c1421fd 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -55,3 +55,13 @@ pub use stock::{ ComposeError, ConsignError, ContractIfaceError, FasciaError, InputError as StockInputError, Stock, StockError, StockErrorAll, StockErrorMem, }; + +pub trait StoreTransaction { + type TransactionErr: std::error::Error; + + fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr>; + + fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr>; + + fn rollback_transaction(&mut self); +} diff --git a/src/persistence/stash.rs b/src/persistence/stash.rs index 45becb2a..23a97cbf 100644 --- a/src/persistence/stash.rs +++ b/src/persistence/stash.rs @@ -45,7 +45,7 @@ use crate::containers::{ use crate::interface::{ ContractBuilder, Iface, IfaceClass, IfaceId, IfaceImpl, IfaceRef, TransitionBuilder, }; -use crate::persistence::ContractIfaceError; +use crate::persistence::{ContractIfaceError, StoreTransaction}; use crate::{MergeReveal, MergeRevealError, SecretSeal, LIB_NAME_RGB_STD}; #[derive(Clone, Eq, PartialEq, Debug, Display, Error, From)] @@ -595,6 +595,24 @@ impl Stash

{ } } +impl StoreTransaction for Stash

{ + type TransactionErr = StashError

; + + fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> { + self.provider + .begin_transaction() + .map_err(StashError::WriteProvider) + } + + fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> { + self.provider + .commit_transaction() + .map_err(StashError::WriteProvider) + } + + fn rollback_transaction(&mut self) { self.provider.rollback_transaction() } +} + pub trait StashProvider: Debug + StashReadProvider + StashWriteProvider {} pub trait StashReadProvider { @@ -650,7 +668,7 @@ pub trait StashReadProvider { fn secret_seals(&self) -> Result>, Self::Error>; } -pub trait StashWriteProvider { +pub trait StashWriteProvider: StoreTransaction { type Error: Clone + Eq + Error; fn replace_schema(&mut self, schema: Schema) -> Result; diff --git a/src/persistence/state.rs b/src/persistence/state.rs index f791ab02..95a94849 100644 --- a/src/persistence/state.rs +++ b/src/persistence/state.rs @@ -26,6 +26,7 @@ use invoice::Amount; use rgb::{AssetTag, BlindingFactor, ContractHistory, ContractId, DataState}; use crate::interface::AttachedState; +use crate::persistence::StoreTransaction; use crate::resolvers::ResolveHeight; #[derive(Clone, PartialEq, Eq, Debug, Display, Error)] @@ -73,7 +74,7 @@ pub trait StateReadProvider { ) -> Result, Self::Error>; } -pub trait StateWriteProvider { +pub trait StateWriteProvider: StoreTransaction { type Error: Clone + Eq + Error; fn create_or_update_state( diff --git a/src/persistence/stock.rs b/src/persistence/stock.rs index ff479cfc..71a5d8e8 100644 --- a/src/persistence/stock.rs +++ b/src/persistence/stock.rs @@ -44,7 +44,7 @@ use super::{ Index, IndexError, IndexInconsistency, IndexProvider, IndexReadProvider, IndexWriteProvider, MemIndex, MemStash, MemState, PersistedState, SchemaIfaces, Stash, StashDataError, StashError, StashInconsistency, StashProvider, StashReadProvider, StashWriteProvider, StateProvider, - StateReadProvider, StateUpdateError, StateWriteProvider, + StateReadProvider, StateUpdateError, StateWriteProvider, StoreTransaction, }; use crate::containers::{ AnchorSet, AnchoredBundles, Batch, BuilderSeal, BundledWitness, Consignment, ContainerVer, @@ -1057,9 +1057,42 @@ impl Stock { Ok(Batch { main, blanks }) } + fn store_transaction( + &mut self, + f: impl FnOnce(&mut Stash, &mut H, &mut Index

) -> Result<(), StockError>, + ) -> Result<(), StockError> { + self.state + .begin_transaction() + .map_err(StockError::StateWrite)?; + self.stash + .begin_transaction() + .inspect_err(|_| self.stash.rollback_transaction())?; + self.index.begin_transaction().inspect_err(|_| { + self.state.rollback_transaction(); + self.stash.rollback_transaction(); + })?; + f(&mut self.stash, &mut self.state, &mut self.index)?; + self.index + .commit_transaction() + .map_err(StockError::from) + .and_then(|_| { + self.state + .commit_transaction() + .map_err(StockError::StateWrite) + }) + .and_then(|_| self.stash.commit_transaction().map_err(StockError::from)) + .inspect_err(|_| { + self.state.rollback_transaction(); + self.stash.rollback_transaction(); + self.index.rollback_transaction(); + }) + } + pub fn import_kit(&mut self, kit: ValidKit) -> Result> { let (kit, status) = kit.split(); + self.stash.begin_transaction()?; self.stash.consume_kit(kit)?; + self.stash.commit_transaction()?; Ok(status) } @@ -1088,12 +1121,14 @@ impl Stock { let (mut consignment, status) = consignment.split(); consignment = self.stash.resolve_secrets(consignment)?; - self.state - .create_or_update_state::(contract_id, |history| { + self.store_transaction(move |stash, state, index| { + state.create_or_update_state::(contract_id, |history| { consignment.update_history(history, resolver) })?; - self.index.index_consignment(&consignment)?; - self.stash.consume_consignment(consignment)?; + index.index_consignment(&consignment)?; + stash.consume_consignment(consignment)?; + Ok(()) + })?; Ok(status) } @@ -1110,25 +1145,25 @@ impl Stock { &mut self, fascia: Fascia, ) -> Result<(), StockError> { - let witness_id = fascia.witness_id(); - self.stash - .consume_witness(SealWitness::new(fascia.witness.clone(), fascia.anchor.clone()))?; - - for (contract_id, bundle) in fascia.into_bundles() { - let ids1 = bundle - .known_transitions - .keys() - .copied() - .collect::>(); - let ids2 = bundle.input_map.values().copied().collect::>(); - if !ids1.is_subset(&ids2) { - return Err(FasciaError::InvalidBundle(contract_id, bundle.bundle_id()).into()); - } + self.store_transaction(move |stash, state, index| { + let witness_id = fascia.witness_id(); + stash + .consume_witness(SealWitness::new(fascia.witness.clone(), fascia.anchor.clone()))?; + + for (contract_id, bundle) in fascia.into_bundles() { + let ids1 = bundle + .known_transitions + .keys() + .copied() + .collect::>(); + let ids2 = bundle.input_map.values().copied().collect::>(); + if !ids1.is_subset(&ids2) { + return Err(FasciaError::InvalidBundle(contract_id, bundle.bundle_id()).into()); + } - self.index.index_bundle(contract_id, &bundle, witness_id)?; + index.index_bundle(contract_id, &bundle, witness_id)?; - self.state - .update_state::(contract_id, |history| { + state.update_state::(contract_id, |history| { for transition in bundle.known_transitions.values() { let witness_anchor = WitnessAnchor::from_mempool(witness_id); history.add_transition(transition, witness_anchor); @@ -1136,9 +1171,10 @@ impl Stock { Ok(()) })?; - self.stash.consume_bundle(bundle)?; - } - Ok(()) + stash.consume_bundle(bundle)?; + } + Ok(()) + }) } fn transition(&self, opid: OpId) -> Result<&Transition, StockError> {