Skip to content

Commit

Permalink
persistence: add transactional model
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-orlovsky committed Jul 5, 2024
1 parent 5b9d218 commit f101040
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 30 deletions.
21 changes: 20 additions & 1 deletion src/persistence/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use rgb::{
};

use crate::containers::{BundledWitness, Consignment, ToWitnessId};
use crate::persistence::StoreTransaction;
use crate::SecretSeal;

#[derive(Clone, Eq, PartialEq, Debug, Display, Error, From)]
Expand Down Expand Up @@ -330,6 +331,24 @@ impl<P: IndexProvider> Index<P> {
}
}

impl<P: IndexProvider> StoreTransaction for Index<P> {
type TransactionErr = IndexError<P>;

fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> {
self.provider
.begin_transaction()
.map_err(IndexError::WriteProvider)
}

fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> {
self.provider
.commit_transaction()
.map_err(IndexError::WriteProvider)
}

fn rollback_transaction(&mut self) { self.provider.rollback_transaction() }
}

pub trait IndexProvider: Debug + IndexReadProvider + IndexWriteProvider {}

pub trait IndexReadProvider {
Expand Down Expand Up @@ -364,7 +383,7 @@ pub trait IndexReadProvider {
) -> Result<(XWitnessId, ContractId), IndexReadError<Self::Error>>;
}

pub trait IndexWriteProvider {
pub trait IndexWriteProvider: StoreTransaction<TransactionErr = Self::Error> {
type Error: Clone + Eq + Error;

fn register_contract(&mut self, contract_id: ContractId) -> Result<bool, Self::Error>;
Expand Down
54 changes: 53 additions & 1 deletion src/persistence/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use super::{
ContractIfaceError, IndexInconsistency, IndexProvider, IndexReadError, IndexReadProvider,
IndexWriteError, IndexWriteProvider, SchemaIfaces, StashInconsistency, StashProvider,
StashProviderError, StashReadProvider, StashWriteProvider, StateProvider, StateReadProvider,
StateUpdateError, StateWriteProvider,
StateUpdateError, StateWriteProvider, StoreTransaction,
};
use crate::containers::{
AnchorSet, ContentId, ContentRef, ContentSigs, SealWitness, SigBlob, Supplement, TrustLevel,
Expand All @@ -60,6 +60,8 @@ use crate::LIB_NAME_RGB_STD;
#[derive(StrictType, StrictEncode, StrictDecode)]
#[strict_type(lib = LIB_NAME_RGB_STD)]
pub struct MemStash {
dirty: bool,

schemata: TinyOrdMap<SchemaId, SchemaIfaces>,
ifaces: TinyOrdMap<IfaceId, Iface>,
geneses: TinyOrdMap<ContractId, Genesis>,
Expand All @@ -82,6 +84,22 @@ impl MemStash {
pub fn new() -> Self { MemStash::default() }
}

impl StoreTransaction for MemStash {
type TransactionErr = confinement::Error;

fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> {
self.dirty = true;
Ok(())
}

fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> {
// We do not do anything here since we do not actually save anything
Ok(())
}

fn rollback_transaction(&mut self) { unreachable!() }
}

impl StashProvider for MemStash {}

impl StashReadProvider for MemStash {
Expand Down Expand Up @@ -404,6 +422,7 @@ impl From<confinement::Error> for StateUpdateError<confinement::Error> {
#[derive(StrictType, StrictEncode, StrictDecode)]
#[strict_type(lib = LIB_NAME_RGB_STD)]
pub struct MemState {
dirty: bool,
history: TinyOrdMap<ContractId, ContractHistory>,
}

Expand All @@ -414,6 +433,22 @@ impl MemState {
pub fn new() -> Self { MemState::default() }
}

impl StoreTransaction for MemState {
type TransactionErr = confinement::Error;

fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> {
self.dirty = true;
Ok(())
}

fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> {
// We do not do anything here since we do not actually save anything
Ok(())
}

fn rollback_transaction(&mut self) { unreachable!() }
}

impl StateProvider for MemState {}

impl StateReadProvider for MemState {
Expand Down Expand Up @@ -486,6 +521,7 @@ pub struct ContractIndex {
#[derive(StrictType, StrictEncode, StrictDecode)]
#[strict_type(lib = LIB_NAME_RGB_STD)]
pub struct MemIndex {
dirty: bool,
op_bundle_index: MediumOrdMap<OpId, BundleId>,
bundle_contract_index: MediumOrdMap<BundleId, ContractId>,
bundle_witness_index: MediumOrdMap<BundleId, XWitnessId>,
Expand All @@ -500,6 +536,22 @@ impl MemIndex {
pub fn new() -> Self { MemIndex::default() }
}

impl StoreTransaction for MemIndex {
type TransactionErr = confinement::Error;

fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> {
self.dirty = true;
Ok(())
}

fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> {
// We do not do anything here since we do not actually save anything
Ok(())
}

fn rollback_transaction(&mut self) { unreachable!() }
}

impl IndexProvider for MemIndex {}

impl IndexReadProvider for MemIndex {
Expand Down
10 changes: 10 additions & 0 deletions src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,13 @@ pub use stock::{
ComposeError, ConsignError, ContractIfaceError, FasciaError, InputError as StockInputError,
Stock, StockError, StockErrorAll, StockErrorMem,
};

pub trait StoreTransaction {
type TransactionErr: std::error::Error;

fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr>;

fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr>;

fn rollback_transaction(&mut self);
}
22 changes: 20 additions & 2 deletions src/persistence/stash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::containers::{
use crate::interface::{
ContractBuilder, Iface, IfaceClass, IfaceId, IfaceImpl, IfaceRef, TransitionBuilder,
};
use crate::persistence::ContractIfaceError;
use crate::persistence::{ContractIfaceError, StoreTransaction};
use crate::{MergeReveal, MergeRevealError, SecretSeal, LIB_NAME_RGB_STD};

#[derive(Clone, Eq, PartialEq, Debug, Display, Error, From)]
Expand Down Expand Up @@ -595,6 +595,24 @@ impl<P: StashProvider> Stash<P> {
}
}

impl<P: StashProvider> StoreTransaction for Stash<P> {
type TransactionErr = StashError<P>;

fn begin_transaction(&mut self) -> Result<(), Self::TransactionErr> {
self.provider
.begin_transaction()
.map_err(StashError::WriteProvider)
}

fn commit_transaction(&mut self) -> Result<(), Self::TransactionErr> {
self.provider
.commit_transaction()
.map_err(StashError::WriteProvider)
}

fn rollback_transaction(&mut self) { self.provider.rollback_transaction() }
}

pub trait StashProvider: Debug + StashReadProvider + StashWriteProvider {}

pub trait StashReadProvider {
Expand Down Expand Up @@ -650,7 +668,7 @@ pub trait StashReadProvider {
fn secret_seals(&self) -> Result<impl Iterator<Item = XChain<GraphSeal>>, Self::Error>;
}

pub trait StashWriteProvider {
pub trait StashWriteProvider: StoreTransaction<TransactionErr = Self::Error> {
type Error: Clone + Eq + Error;

fn replace_schema(&mut self, schema: Schema) -> Result<bool, Self::Error>;
Expand Down
3 changes: 2 additions & 1 deletion src/persistence/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use invoice::Amount;
use rgb::{AssetTag, BlindingFactor, ContractHistory, ContractId, DataState};

use crate::interface::AttachedState;
use crate::persistence::StoreTransaction;
use crate::resolvers::ResolveHeight;

#[derive(Clone, PartialEq, Eq, Debug, Display, Error)]
Expand Down Expand Up @@ -73,7 +74,7 @@ pub trait StateReadProvider {
) -> Result<Option<&ContractHistory>, Self::Error>;
}

pub trait StateWriteProvider {
pub trait StateWriteProvider: StoreTransaction<TransactionErr = Self::Error> {
type Error: Clone + Eq + Error;

fn create_or_update_state<R: ResolveHeight>(
Expand Down
86 changes: 61 additions & 25 deletions src/persistence/stock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use super::{
Index, IndexError, IndexInconsistency, IndexProvider, IndexReadProvider, IndexWriteProvider,
MemIndex, MemStash, MemState, PersistedState, SchemaIfaces, Stash, StashDataError, StashError,
StashInconsistency, StashProvider, StashReadProvider, StashWriteProvider, StateProvider,
StateReadProvider, StateUpdateError, StateWriteProvider,
StateReadProvider, StateUpdateError, StateWriteProvider, StoreTransaction,
};
use crate::containers::{
AnchorSet, AnchoredBundles, Batch, BuilderSeal, BundledWitness, Consignment, ContainerVer,
Expand Down Expand Up @@ -1057,9 +1057,42 @@ impl<S: StashProvider, H: StateProvider, P: IndexProvider> Stock<S, H, P> {
Ok(Batch { main, blanks })
}

fn store_transaction<E: Error>(
&mut self,
f: impl FnOnce(&mut Stash<S>, &mut H, &mut Index<P>) -> Result<(), StockError<S, H, P, E>>,
) -> Result<(), StockError<S, H, P, E>> {
self.state
.begin_transaction()
.map_err(StockError::StateWrite)?;
self.stash
.begin_transaction()
.inspect_err(|_| self.stash.rollback_transaction())?;
self.index.begin_transaction().inspect_err(|_| {
self.state.rollback_transaction();
self.stash.rollback_transaction();
})?;
f(&mut self.stash, &mut self.state, &mut self.index)?;
self.index
.commit_transaction()
.map_err(StockError::from)
.and_then(|_| {
self.state
.commit_transaction()
.map_err(StockError::StateWrite)
})
.and_then(|_| self.stash.commit_transaction().map_err(StockError::from))
.inspect_err(|_| {
self.state.rollback_transaction();
self.stash.rollback_transaction();
self.index.rollback_transaction();
})
}

pub fn import_kit(&mut self, kit: ValidKit) -> Result<validation::Status, StockError<S, H, P>> {
let (kit, status) = kit.split();
self.stash.begin_transaction()?;
self.stash.consume_kit(kit)?;
self.stash.commit_transaction()?;
Ok(status)
}

Expand Down Expand Up @@ -1088,12 +1121,14 @@ impl<S: StashProvider, H: StateProvider, P: IndexProvider> Stock<S, H, P> {
let (mut consignment, status) = consignment.split();

consignment = self.stash.resolve_secrets(consignment)?;
self.state
.create_or_update_state::<R>(contract_id, |history| {
self.store_transaction(move |stash, state, index| {
state.create_or_update_state::<R>(contract_id, |history| {
consignment.update_history(history, resolver)
})?;
self.index.index_consignment(&consignment)?;
self.stash.consume_consignment(consignment)?;
index.index_consignment(&consignment)?;
stash.consume_consignment(consignment)?;
Ok(())
})?;

Ok(status)
}
Expand All @@ -1110,35 +1145,36 @@ impl<S: StashProvider, H: StateProvider, P: IndexProvider> Stock<S, H, P> {
&mut self,
fascia: Fascia,
) -> Result<(), StockError<S, H, P, FasciaError>> {
let witness_id = fascia.witness_id();
self.stash
.consume_witness(SealWitness::new(fascia.witness.clone(), fascia.anchor.clone()))?;

for (contract_id, bundle) in fascia.into_bundles() {
let ids1 = bundle
.known_transitions
.keys()
.copied()
.collect::<BTreeSet<_>>();
let ids2 = bundle.input_map.values().copied().collect::<BTreeSet<_>>();
if !ids1.is_subset(&ids2) {
return Err(FasciaError::InvalidBundle(contract_id, bundle.bundle_id()).into());
}
self.store_transaction(move |stash, state, index| {
let witness_id = fascia.witness_id();
stash
.consume_witness(SealWitness::new(fascia.witness.clone(), fascia.anchor.clone()))?;

for (contract_id, bundle) in fascia.into_bundles() {
let ids1 = bundle
.known_transitions
.keys()
.copied()
.collect::<BTreeSet<_>>();
let ids2 = bundle.input_map.values().copied().collect::<BTreeSet<_>>();
if !ids1.is_subset(&ids2) {
return Err(FasciaError::InvalidBundle(contract_id, bundle.bundle_id()).into());
}

self.index.index_bundle(contract_id, &bundle, witness_id)?;
index.index_bundle(contract_id, &bundle, witness_id)?;

self.state
.update_state::<DumbResolver>(contract_id, |history| {
state.update_state::<DumbResolver>(contract_id, |history| {
for transition in bundle.known_transitions.values() {
let witness_anchor = WitnessAnchor::from_mempool(witness_id);
history.add_transition(transition, witness_anchor);
}
Ok(())
})?;

self.stash.consume_bundle(bundle)?;
}
Ok(())
stash.consume_bundle(bundle)?;
}
Ok(())
})
}

fn transition(&self, opid: OpId) -> Result<&Transition, StockError<S, H, P, ConsignError>> {
Expand Down

0 comments on commit f101040

Please sign in to comment.