Skip to content

Commit

Permalink
Merge pull request #26 from SiaFoundation/subscription
Browse files Browse the repository at this point in the history
Overhaul subscription API
  • Loading branch information
n8maninger authored Mar 18, 2024
2 parents bf4acee + 7164d0c commit c73e571
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 225 deletions.
36 changes: 19 additions & 17 deletions chain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,25 +647,17 @@ func (db *DBStore) shouldFlush() bool {
return db.unflushed >= flushSizeThreshold || time.Since(db.lastFlush) >= flushDurationThreshold
}

func (db *DBStore) flush() {
if err := db.db.Flush(); err != nil {
panic(err)
}
db.unflushed = 0
db.lastFlush = time.Now()
}

// ApplyBlock implements Store.
func (db *DBStore) ApplyBlock(s consensus.State, cau consensus.ApplyUpdate, mustCommit bool) (committed bool) {
func (db *DBStore) ApplyBlock(s consensus.State, cau consensus.ApplyUpdate) {
db.applyState(s)
if s.Index.Height <= db.n.HardforkV2.RequireHeight {
db.applyElements(cau)
}
committed = mustCommit || db.shouldFlush()
if committed {
db.flush()
if db.shouldFlush() {
if err := db.Flush(); err != nil {
panic(err)
}
}
return
}

// RevertBlock implements Store.
Expand All @@ -675,12 +667,19 @@ func (db *DBStore) RevertBlock(s consensus.State, cru consensus.RevertUpdate) {
}
db.revertState(s)
if db.shouldFlush() {
db.flush()
if err := db.Flush(); err != nil {
panic(err)
}
}
}

// Close flushes any uncommitted data to the underlying DB.
func (db *DBStore) Close() error {
// Flush flushes any uncommitted data to the underlying DB.
func (db *DBStore) Flush() error {
if db.unflushed == 0 {
return nil
}
db.unflushed = 0
db.lastFlush = time.Now()
return db.db.Flush()
}

Expand Down Expand Up @@ -731,7 +730,10 @@ func NewDBStore(db DB, n *consensus.Network, genesisBlock types.Block) (_ *DBSto
cs, cau := consensus.ApplyBlock(genesisState, genesisBlock, bs, time.Time{})
dbs.putBlock(genesisBlock, &bs)
dbs.putState(cs)
dbs.ApplyBlock(cs, cau, true)
dbs.ApplyBlock(cs, cau)
if err := dbs.Flush(); err != nil {
return nil, consensus.State{}, err
}
} else if dbGenesis.ID != genesisBlock.ID() {
// try to detect network so we can provide a more helpful error message
_, mainnetGenesis := Mainnet()
Expand Down
164 changes: 73 additions & 91 deletions chain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"go.sia.tech/core/consensus"
"go.sia.tech/core/types"
"lukechampine.com/frand"
)

var (
Expand All @@ -34,14 +35,6 @@ type RevertUpdate struct {
State consensus.State // post-reversion, i.e. pre-application
}

// A Subscriber processes updates to the blockchain. Implementations must not
// modify or retain the provided update object.
type Subscriber interface {
// Implementations MUST not commit updates to persistent storage unless mayCommit is set.
ProcessChainApplyUpdate(cau *ApplyUpdate, mayCommit bool) error
ProcessChainRevertUpdate(cru *RevertUpdate) error
}

// A Store durably commits Manager-related data to storage. I/O errors must be
// handled internally, e.g. by panicking or calling os.Exit.
type Store interface {
Expand All @@ -55,10 +48,10 @@ type Store interface {
AddState(cs consensus.State)
AncestorTimestamp(id types.BlockID) (time.Time, bool)

// Except when mustCommit is set, ApplyBlock and RevertBlock are free to
// commit whenever they see fit.
ApplyBlock(s consensus.State, cau consensus.ApplyUpdate, mustCommit bool) (committed bool)
// ApplyBlock and RevertBlock are free to commit whenever they see fit.
ApplyBlock(s consensus.State, cau consensus.ApplyUpdate)
RevertBlock(s consensus.State, cru consensus.RevertUpdate)
Flush() error
}

// blockAndParent returns the block with the specified ID, along with its parent
Expand All @@ -82,8 +75,7 @@ func blockAndChild(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSu
type Manager struct {
store Store
tipState consensus.State
subscribers []Subscriber
lastCommit time.Time
onReorg map[[16]byte]func(types.ChainIndex)
invalidBlocks map[types.BlockID]error

txpool struct {
Expand Down Expand Up @@ -247,6 +239,13 @@ func (m *Manager) AddBlocks(blocks []types.Block) error {
}
return fmt.Errorf("reorg failed: %w", err)
}
// release lock while notifying listeners
tip := m.tipState.Index
m.mu.Unlock()
for _, fn := range m.onReorg {
fn(tip)
}
m.mu.Lock()
}
return nil
}
Expand All @@ -273,15 +272,7 @@ func (m *Manager) revertTip() error {
}
cru := consensus.RevertBlock(cs, b, *bs)
m.store.RevertBlock(cs, cru)

update := RevertUpdate{cru, b, cs}
for _, s := range m.subscribers {
if err := s.ProcessChainRevertUpdate(&update); err != nil {
return fmt.Errorf("subscriber %T: %w", s, err)
}
}

m.revertPoolUpdate(&update)
m.revertPoolUpdate(cru, cs)
m.tipState = cs
return nil
}
Expand Down Expand Up @@ -316,23 +307,8 @@ func (m *Manager) applyTip(index types.ChainIndex) error {
_, cau = consensus.ApplyBlock(m.tipState, b, *bs, ancestorTimestamp)
}

// force the store to commit if we're at the tip (or close to it), or at
// least every 2 seconds; this ensures that the amount of uncommitted data
// never grows too large
forceCommit := time.Since(b.Timestamp) < cs.BlockInterval()*2 || time.Since(m.lastCommit) > 2*time.Second
committed := m.store.ApplyBlock(cs, cau, forceCommit)
if committed {
m.lastCommit = time.Now()
}

update := &ApplyUpdate{cau, b, cs}
for _, s := range m.subscribers {
if err := s.ProcessChainApplyUpdate(update, committed); err != nil {
return fmt.Errorf("subscriber %T: %w", s, err)
}
}

m.applyPoolUpdate(update)
m.store.ApplyBlock(cs, cau)
m.applyPoolUpdate(cau, cs)
m.tipState = cs
return nil
}
Expand Down Expand Up @@ -403,6 +379,9 @@ func (m *Manager) reorgTo(index types.ChainIndex) error {
return fmt.Errorf("couldn't apply block %v: %w", index, err)
}
}
if err := m.store.Flush(); err != nil {
return err
}

// invalidate txpool caches
m.txpool.ms = nil
Expand All @@ -413,65 +392,68 @@ func (m *Manager) reorgTo(index types.ChainIndex) error {
m.txpool.lastReverted = b.Transactions
m.txpool.lastRevertedV2 = b.V2Transactions()
}

return nil
}

// AddSubscriber subscribes s to m, ensuring that it will receive updates when
// the best chain changes. If tip does not match the Manager's current tip, s is
// updated accordingly.
func (m *Manager) AddSubscriber(s Subscriber, tip types.ChainIndex) error {
// UpdatesSince returns at most max updates on the path between index and the
// Manager's current tip.
func (m *Manager) UpdatesSince(index types.ChainIndex, max int) (rus []RevertUpdate, aus []ApplyUpdate, err error) {
m.mu.Lock()
defer m.mu.Unlock()

// reorg s to the current tip, if necessary
revert, apply, err := m.reorgPath(tip, m.tipState.Index)
if err != nil {
return fmt.Errorf("couldn't determine reorg path from %v to %v: %w", tip, m.tipState.Index, err)
}
for _, index := range revert {
b, bs, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return fmt.Errorf("missing reverted block at index %v", index)
} else if bs == nil {
panic("missing supplement for reverted block")
}
cru := consensus.RevertBlock(cs, b, *bs)
if err := s.ProcessChainRevertUpdate(&RevertUpdate{cru, b, cs}); err != nil {
return fmt.Errorf("couldn't process revert update: %w", err)
}
onBestChain := func(index types.ChainIndex) bool {
bi, _ := m.store.BestIndex(index.Height)
return bi.ID == index.ID || index == types.ChainIndex{}
}
for _, index := range apply {
b, bs, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return fmt.Errorf("missing applied block at index %v", index)
} else if bs == nil {
panic("missing supplement for applied block")
}
ancestorTimestamp, ok := m.store.AncestorTimestamp(b.ParentID)
if !ok && index.Height != 0 {
return fmt.Errorf("missing ancestor timestamp for block %v", b.ParentID)
}
cs, cau := consensus.ApplyBlock(cs, b, *bs, ancestorTimestamp)
// TODO: commit every minute for large len(apply)?
shouldCommit := index == m.tipState.Index
if err := s.ProcessChainApplyUpdate(&ApplyUpdate{cau, b, cs}, shouldCommit); err != nil {
return fmt.Errorf("couldn't process apply update: %w", err)

for index != m.tipState.Index && len(rus)+len(aus) <= max {
// revert until we are on the best chain, then apply
if !onBestChain(index) {
b, bs, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return nil, nil, fmt.Errorf("missing block at index %v", index)
} else if bs == nil {
return nil, nil, fmt.Errorf("missing supplement for block %v", index)
}
cru := consensus.RevertBlock(cs, b, *bs)
rus = append(rus, RevertUpdate{cru, b, cs})
index = cs.Index
} else {
// special case: if index is uninitialized, we're starting from genesis
if index == (types.ChainIndex{}) {
index, _ = m.store.BestIndex(0)
} else {
index, _ = m.store.BestIndex(index.Height + 1)
}
b, bs, cs, ok := blockAndParent(m.store, index.ID)
if !ok {
return nil, nil, fmt.Errorf("missing block at index %v", index)
} else if bs == nil {
return nil, nil, fmt.Errorf("missing supplement for block %v", index)
}
ancestorTimestamp, ok := m.store.AncestorTimestamp(b.ParentID)
if !ok && index.Height != 0 {
return nil, nil, fmt.Errorf("missing ancestor timestamp for block %v", b.ParentID)
}
cs, cau := consensus.ApplyBlock(cs, b, *bs, ancestorTimestamp)
aus = append(aus, ApplyUpdate{cau, b, cs})
}
}
m.subscribers = append(m.subscribers, s)
return nil
return
}

// RemoveSubscriber unsubscribes s from m.
func (m *Manager) RemoveSubscriber(s Subscriber) {
// OnReorg adds fn to the set of functions that are called whenever the best
// chain changes. It returns a function that removes fn from the set.
//
// The supplied function must not block or call any Manager methods.
func (m *Manager) OnReorg(fn func(types.ChainIndex)) (cancel func()) {
m.mu.Lock()
defer m.mu.Unlock()
for i := range m.subscribers {
if m.subscribers[i] == s {
m.subscribers = append(m.subscribers[:i], m.subscribers[i+1:]...)
return
}
key := frand.Entropy128()
m.onReorg[key] = fn
return func() {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.onReorg, key)
}
}

Expand Down Expand Up @@ -654,7 +636,7 @@ func updateTxnProofs(txn *types.V2Transaction, updateElementProof func(*types.St
return
}

func (m *Manager) revertPoolUpdate(cru *RevertUpdate) {
func (m *Manager) revertPoolUpdate(cru consensus.RevertUpdate, cs consensus.State) {
// restore ephemeral elements, if necessary
var uncreated map[types.Hash256]bool
replaceEphemeral := func(e *types.StateElement) {
Expand Down Expand Up @@ -704,14 +686,14 @@ func (m *Manager) revertPoolUpdate(cru *RevertUpdate) {

rem := m.txpool.v2txns[:0]
for _, txn := range m.txpool.v2txns {
if updateTxnProofs(&txn, cru.UpdateElementProof, cru.State.Elements.NumLeaves) {
if updateTxnProofs(&txn, cru.UpdateElementProof, cs.Elements.NumLeaves) {
rem = append(rem, txn)
}
}
m.txpool.v2txns = rem
}

func (m *Manager) applyPoolUpdate(cau *ApplyUpdate) {
func (m *Manager) applyPoolUpdate(cau consensus.ApplyUpdate, cs consensus.State) {
// replace ephemeral elements, if necessary
var newElements map[types.Hash256]types.StateElement
replaceEphemeral := func(e *types.StateElement) {
Expand Down Expand Up @@ -759,7 +741,7 @@ func (m *Manager) applyPoolUpdate(cau *ApplyUpdate) {

rem := m.txpool.v2txns[:0]
for _, txn := range m.txpool.v2txns {
if updateTxnProofs(&txn, cau.UpdateElementProof, cau.State.Elements.NumLeaves) {
if updateTxnProofs(&txn, cau.UpdateElementProof, cs.Elements.NumLeaves) {
rem = append(rem, txn)
}
}
Expand Down Expand Up @@ -1113,7 +1095,7 @@ func NewManager(store Store, cs consensus.State) *Manager {
m := &Manager{
store: store,
tipState: cs,
lastCommit: time.Now(),
onReorg: make(map[[16]byte]func(types.ChainIndex)),
invalidBlocks: make(map[types.BlockID]error),
}
m.txpool.indices = make(map[types.TransactionID]int)
Expand Down
Loading

0 comments on commit c73e571

Please sign in to comment.