Skip to content

Commit

Permalink
Merge pull request #19 from SiaFoundation/update-chain-subscriber
Browse files Browse the repository at this point in the history
Update chain subscriber
  • Loading branch information
chris124567 authored Apr 3, 2024
2 parents 8215046 + 46911f3 commit 86ef7c9
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 115 deletions.
5 changes: 3 additions & 2 deletions api/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -31,7 +32,7 @@ type (
Syncer interface {
Addr() string
Peers() []*syncer.Peer
Connect(addr string) (*syncer.Peer, error)
Connect(ctx context.Context, addr string) (*syncer.Peer, error)
BroadcastHeader(bh gateway.BlockHeader)
BroadcastTransactionSet(txns []types.Transaction)
BroadcastV2TransactionSet(index types.ChainIndex, txns []types.V2Transaction)
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *server) syncerConnectHandler(jc jape.Context) {
if jc.Decode(&addr) != nil {
return
}
_, err := s.s.Connect(addr)
_, err := s.s.Connect(context.Background(), addr)
jc.Check("couldn't connect to peer", err)
}

Expand Down
21 changes: 5 additions & 16 deletions cmd/explored/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package main
import (
"context"
"errors"
"io/fs"
"net"
"os"
"path/filepath"
"strconv"
"time"
Expand Down Expand Up @@ -162,24 +160,15 @@ func newNode(addr, dir string, chainNetwork string, useUPNP bool, logger *zap.Lo
return nil, err
}

tip, err := store.Tip()
if errors.Is(err, sqlite.ErrNoTip) {
tip = types.ChainIndex{
ID: genesisBlock.ID(),
Height: 0,
}
} else if err != nil {
return nil, err
genesisIndex := types.ChainIndex{
ID: genesisBlock.ID(),
Height: 0,
}
cm.AddSubscriber(store, tip)

hashPath := filepath.Join(dir, "./hash")
if err := os.MkdirAll(hashPath, fs.ModePerm); err != nil {
e, err := explorer.NewExplorer(cm, store, genesisIndex, logger.Named("explorer"))
if err != nil {
return nil, err
}

e := explorer.NewExplorer(store)

l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
Expand Down
87 changes: 83 additions & 4 deletions explorer/explorer.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
package explorer

import (
"errors"
"fmt"
"sync"

"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.uber.org/zap"
)

var (
// ErrNoTip is returned when Tip() is unable to find any blocks in the
// database and thus there is no tip. It does not mean there was an
// error in the underlying database.
ErrNoTip = errors.New("no tip found")
)

// A ChainManager manages the consensus state
type ChainManager interface {
Tip() types.ChainIndex
BestIndex(height uint64) (types.ChainIndex, bool)

OnReorg(fn func(types.ChainIndex)) (cancel func())
UpdatesSince(index types.ChainIndex, max int) (rus []chain.RevertUpdate, aus []chain.ApplyUpdate, err error)
}

// A Store is a database that stores information about elements, contracts,
// and blocks.
type Store interface {
chain.Subscriber
ProcessChainUpdates(crus []chain.RevertUpdate, caus []chain.ApplyUpdate) error

Tip() (types.ChainIndex, error)
Block(id types.BlockID) (Block, error)
Expand All @@ -24,12 +45,70 @@ type Store interface {

// Explorer implements a Sia explorer.
type Explorer struct {
s Store
s Store
mu sync.Mutex

unsubscribe func()
}

func syncStore(store Store, cm ChainManager, index types.ChainIndex) error {
for index != cm.Tip() {
crus, caus, err := cm.UpdatesSince(index, 1000)
if err != nil {
return fmt.Errorf("failed to subscribe to chain manager: %w", err)
}

if err := store.ProcessChainUpdates(crus, caus); err != nil {
return fmt.Errorf("failed to process updates: %w", err)
}
if len(crus) > 0 {
index = crus[len(crus)-1].State.Index
}
if len(caus) > 0 {
index = caus[len(caus)-1].State.Index
}
}
return nil
}

// NewExplorer returns a Sia explorer.
func NewExplorer(s Store) *Explorer {
return &Explorer{s: s}
func NewExplorer(cm ChainManager, store Store, genesisIndex types.ChainIndex, log *zap.Logger) (*Explorer, error) {
e := &Explorer{s: store}

tip, err := store.Tip()
if errors.Is(err, ErrNoTip) {
tip = genesisIndex
} else if err != nil {
return nil, fmt.Errorf("failed to get tip: %w", err)
}
if err := syncStore(store, cm, tip); err != nil {
return nil, fmt.Errorf("failed to subscribe to chain manager: %w", err)
}

reorgChan := make(chan types.ChainIndex, 1)
go func() {
for range reorgChan {
e.mu.Lock()
lastTip, err := store.Tip()
if errors.Is(err, ErrNoTip) {
lastTip = genesisIndex
} else if err != nil {
log.Error("failed to get tip", zap.Error(err))
}
if err := syncStore(store, cm, lastTip); err != nil {
log.Error("failed to sync store", zap.Error(err))
}
e.mu.Unlock()
}
}()

e.unsubscribe = cm.OnReorg(func(index types.ChainIndex) {
select {
case reorgChan <- index:
default:
}
})
return e, nil
}

// MerkleProof gets the merkle proof with the given leaf index.
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ go 1.21.6
require (
github.com/mattn/go-sqlite3 v1.14.22
go.etcd.io/bbolt v1.3.9
go.sia.tech/core v0.2.1
go.sia.tech/coreutils v0.0.3
go.sia.tech/core v0.2.2-0.20240325122830-e781eaa57d37
go.sia.tech/coreutils v0.0.4-0.20240327130436-3fc21abba2db
go.sia.tech/jape v0.11.1
go.uber.org/zap v1.27.0
golang.org/x/term v0.18.0
Expand All @@ -19,7 +19,7 @@ require (
github.com/julienschmidt/httprouter v1.3.0 // indirect
go.sia.tech/mux v1.2.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/tools v0.7.0 // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
go.sia.tech/core v0.2.1 h1:CqmMd+T5rAhC+Py3NxfvGtvsj/GgwIqQHHVrdts/LqY=
go.sia.tech/core v0.2.1/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
go.sia.tech/coreutils v0.0.3 h1:ZxuzovRpQMvfy/pCOV4om1cPF6sE15GyJyK36kIrF1Y=
go.sia.tech/coreutils v0.0.3/go.mod h1:UBFc77wXiE//eyilO5HLOncIEj7F69j0Nv2OkFujtP0=
go.sia.tech/core v0.2.2-0.20240325122830-e781eaa57d37 h1:jsiab6uAUkaeDL7XEseAxJw7NVhxLNoU2WaB0AHbgG8=
go.sia.tech/core v0.2.2-0.20240325122830-e781eaa57d37/go.mod h1:Zk7HaybEPgkPC1p6e6tTQr8PIeZClTgNcLNGYDLQJeE=
go.sia.tech/coreutils v0.0.4-0.20240327130436-3fc21abba2db h1:nfhcgN3zfwd+GdDUCrNmV4Ajf8VZSQcoXjGGmfs7V9E=
go.sia.tech/coreutils v0.0.4-0.20240327130436-3fc21abba2db/go.mod h1:QvsXghS4wqhJosQq3AkMjA2mJ6pbDB7PgG+w5b09/z0=
go.sia.tech/jape v0.11.1 h1:M7IP+byXL7xOqzxcHUQuXW+q3sYMkYzmMlMw+q8ZZw0=
go.sia.tech/jape v0.11.1/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU=
Expand All @@ -26,8 +26,8 @@ go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
Expand Down
7 changes: 5 additions & 2 deletions internal/syncerutil/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,14 @@ func (eps *EphemeralPeerStore) UpdatePeerInfo(peer string, fn func(*syncer.PeerI
}

// PeerInfo implements PeerStore.
func (eps *EphemeralPeerStore) PeerInfo(peer string) (syncer.PeerInfo, bool) {
func (eps *EphemeralPeerStore) PeerInfo(peer string) (syncer.PeerInfo, error) {
eps.mu.Lock()
defer eps.mu.Unlock()
info, ok := eps.peers[peer]
return info, ok
if !ok {
return info, errors.New("no such peer")
}
return info, nil
}

// Ban implements PeerStore.
Expand Down
74 changes: 24 additions & 50 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (s *Store) updateMaturedBalances(dbTxn txn, update consensusUpdate, height
return nil
}

_, isRevert := update.(*chain.RevertUpdate)
_, isRevert := update.(chain.RevertUpdate)
if isRevert {
height++
}
Expand Down Expand Up @@ -445,7 +445,7 @@ func (s *Store) updateMaturedBalances(dbTxn txn, update consensusUpdate, height

func (s *Store) addSiacoinElements(dbTxn txn, bid types.BlockID, update consensusUpdate) (map[types.SiacoinOutputID]int64, error) {
sources := make(map[types.SiacoinOutputID]explorer.Source)
if applyUpdate, ok := update.(*chain.ApplyUpdate); ok {
if applyUpdate, ok := update.(chain.ApplyUpdate); ok {
block := applyUpdate.Block
for i := range block.MinerPayouts {
sources[bid.MinerOutputID(i)] = explorer.SourceMinerPayout
Expand Down Expand Up @@ -593,9 +593,28 @@ func (s *Store) deleteBlock(dbTxn txn, bid types.BlockID) error {
return err
}

func (s *Store) applyUpdates() error {
// ProcessChainUpdates implements explorer.Store.
func (s *Store) ProcessChainUpdates(crus []chain.RevertUpdate, caus []chain.ApplyUpdate) error {
return s.transaction(func(dbTxn txn) error {
for _, update := range s.pendingUpdates {
for _, cru := range crus {
if err := s.deleteBlock(dbTxn, cru.Block.ID()); err != nil {
return fmt.Errorf("revertUpdate: failed to delete block: %w", err)
} else if _, err := s.addSiacoinElements(dbTxn, cru.Block.ID(), cru); err != nil {
return fmt.Errorf("revertUpdate: failed to update siacoin output state: %w", err)
} else if _, err := s.addSiafundElements(dbTxn, cru.Block.ID(), cru); err != nil {
return fmt.Errorf("revertUpdate: failed to update siafund output state: %w", err)
} else if err := s.updateBalances(dbTxn, cru, cru.State.Index.Height); err != nil {
return fmt.Errorf("revertUpdate: failed to update balances: %w", err)
} else if err := s.updateMaturedBalances(dbTxn, cru, cru.State.Index.Height); err != nil {
return fmt.Errorf("revertUpdate: failed to update matured balances: %w", err)
} else if _, err := s.addFileContractElements(dbTxn, cru.Block.ID(), cru); err != nil {
return fmt.Errorf("revertUpdate: failed to update file contract state: %w", err)
} else if err := s.updateLeaves(dbTxn, cru); err != nil {
return fmt.Errorf("revertUpdate: failed to update leaves: %w", err)
}
}

for _, update := range caus {
scDBIds, err := s.addSiacoinElements(dbTxn, update.Block.ID(), update)
if err != nil {
return fmt.Errorf("applyUpdates: failed to add siacoin outputs: %w", err)
Expand Down Expand Up @@ -627,63 +646,18 @@ func (s *Store) applyUpdates() error {
return err
}
}
s.pendingUpdates = s.pendingUpdates[:0]
return nil
})
}

func (s *Store) revertUpdate(cru *chain.RevertUpdate) error {
return s.transaction(func(dbTxn txn) error {
if err := s.deleteBlock(dbTxn, cru.Block.ID()); err != nil {
return fmt.Errorf("revertUpdate: failed to delete block: %w", err)
} else if _, err := s.addSiacoinElements(dbTxn, cru.Block.ID(), cru); err != nil {
return fmt.Errorf("revertUpdate: failed to update siacoin output state: %w", err)
} else if _, err := s.addSiafundElements(dbTxn, cru.Block.ID(), cru); err != nil {
return fmt.Errorf("revertUpdate: failed to update siafund output state: %w", err)
} else if err := s.updateBalances(dbTxn, cru, cru.State.Index.Height); err != nil {
return fmt.Errorf("revertUpdate: failed to update balances: %w", err)
} else if err := s.updateMaturedBalances(dbTxn, cru, cru.State.Index.Height); err != nil {
return fmt.Errorf("revertUpdate: failed to update matured balances: %w", err)
} else if _, err := s.addFileContractElements(dbTxn, cru.Block.ID(), cru); err != nil {
return fmt.Errorf("revertUpdate: failed to update file contract state: %w", err)
}

return s.updateLeaves(dbTxn, cru)
})
}

// ProcessChainApplyUpdate implements chain.Subscriber.
func (s *Store) ProcessChainApplyUpdate(cau *chain.ApplyUpdate, mayCommit bool) error {
s.mu.Lock()
defer s.mu.Unlock()

s.pendingUpdates = append(s.pendingUpdates, cau)
if mayCommit {
return s.applyUpdates()
}
return nil
}

// ProcessChainRevertUpdate implements chain.Subscriber.
func (s *Store) ProcessChainRevertUpdate(cru *chain.RevertUpdate) error {
s.mu.Lock()
defer s.mu.Unlock()

if len(s.pendingUpdates) > 0 && s.pendingUpdates[len(s.pendingUpdates)-1].Block.ID() == cru.Block.ID() {
s.pendingUpdates = s.pendingUpdates[:len(s.pendingUpdates)-1]
return nil
}
return s.revertUpdate(cru)
}

// Tip implements explorer.Store.
func (s *Store) Tip() (result types.ChainIndex, err error) {
const query = `SELECT id, height FROM blocks ORDER BY height DESC LIMIT 1`
err = s.transaction(func(dbTx txn) error {
return dbTx.QueryRow(query).Scan(dbDecode(&result.ID), &result.Height)
})
if errors.Is(err, sql.ErrNoRows) {
return types.ChainIndex{}, ErrNoTip
return types.ChainIndex{}, explorer.ErrNoTip
}
return
}
Loading

0 comments on commit 86ef7c9

Please sign in to comment.