Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for block pruning #116

Merged
merged 4 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/add_support_for_block_pruning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
default: minor
---

# Add support for block pruning

The chain manager can now automatically delete blocks after a configurable number of confirmations. Note that this does not apply retroactively.
125 changes: 83 additions & 42 deletions chain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,32 @@ import (
)

type supplementedBlock struct {
Block types.Block
Header *types.BlockHeader
lukechampine marked this conversation as resolved.
Show resolved Hide resolved
Block *types.Block
Supplement *consensus.V1BlockSupplement
}

func (sb supplementedBlock) EncodeTo(e *types.Encoder) {
e.WriteUint8(2)
(types.V2Block)(sb.Block).EncodeTo(e)
e.WriteBool(sb.Supplement != nil)
if sb.Supplement != nil {
sb.Supplement.EncodeTo(e)
}
e.WriteUint8(3)
types.EncodePtr(e, sb.Header)
types.EncodePtr(e, (*types.V2Block)(sb.Block))
types.EncodePtr(e, sb.Supplement)
}

func (sb *supplementedBlock) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
(*types.V2Block)(&sb.Block).DecodeFrom(d)
if d.ReadBool() {
sb.Supplement = new(consensus.V1BlockSupplement)
sb.Supplement.DecodeFrom(d)
}
}

// helper type for decoding just the header information from a block
type supplementedHeader struct {
ParentID types.BlockID
Timestamp time.Time
}

func (sh *supplementedHeader) DecodeFrom(d *types.Decoder) {
if v := d.ReadUint8(); v != 2 {
switch v := d.ReadUint8(); v {
case 2:
sb.Header = nil
sb.Block = new(types.Block)
(*types.V2Block)(sb.Block).DecodeFrom(d)
types.DecodePtr(d, &sb.Supplement)
case 3:
types.DecodePtr(d, &sb.Header)
types.DecodePtrCast[types.V2Block](d, &sb.Block)
types.DecodePtr(d, &sb.Supplement)
default:
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
sh.ParentID.DecodeFrom(d)
_ = d.ReadUint64() // nonce
sh.Timestamp = d.ReadTime()
}

type versionedState struct {
Expand Down Expand Up @@ -304,21 +293,62 @@ func (db *DBStore) putState(cs consensus.State) {
db.bucket(bStates).put(cs.Index.ID[:], versionedState{cs})
}

func (db *DBStore) getBlock(id types.BlockID) (b types.Block, bs *consensus.V1BlockSupplement, _ bool) {
func (db *DBStore) getBlock(id types.BlockID) (bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement, _ bool) {
var sb supplementedBlock
ok := db.bucket(bBlocks).get(id[:], &sb)
return sb.Block, sb.Supplement, ok
if sb.Header == nil {
sb.Header = new(types.BlockHeader)
*sb.Header = sb.Block.Header()
}
return *sb.Header, sb.Block, sb.Supplement, ok
}

func (db *DBStore) putBlock(b types.Block, bs *consensus.V1BlockSupplement) {
id := b.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{b, bs})
func (db *DBStore) putBlock(bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement) {
id := bh.ID()
db.bucket(bBlocks).put(id[:], supplementedBlock{&bh, b, bs})
}

func (db *DBStore) getBlockHeader(id types.BlockID) (parentID types.BlockID, timestamp time.Time, _ bool) {
var sh supplementedHeader
ok := db.bucket(bBlocks).get(id[:], &sh)
return sh.ParentID, sh.Timestamp, ok
func (db *DBStore) getAncestorInfo(id types.BlockID) (parentID types.BlockID, timestamp time.Time, ok bool) {
ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
v := d.ReadUint8()
if v != 2 && v != 3 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
}
// kinda cursed; don't worry about it
if v == 3 {
if !d.ReadBool() {
d.ReadBool()
}
}
parentID.DecodeFrom(d)
_ = d.ReadUint64() // nonce
timestamp = d.ReadTime()
}))
return
}

func (db *DBStore) getBlockHeader(id types.BlockID) (bh types.BlockHeader, ok bool) {
ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) {
v := d.ReadUint8()
if v != 2 && v != 3 {
d.SetErr(fmt.Errorf("incompatible version (%d)", v))
return
}
if v == 3 {
bhp := &bh
types.DecodePtr(d, &bhp)
if bhp != nil {
return
} else if !d.ReadBool() {
d.SetErr(errors.New("neither header nor block present"))
return
}
}
var b types.Block
(*types.V2Block)(&b).DecodeFrom(d)
bh = b.Header()
}))
return
}

func (db *DBStore) treeKey(row, col uint64) []byte {
Expand Down Expand Up @@ -628,9 +658,9 @@ func (db *DBStore) AncestorTimestamp(id types.BlockID) (t time.Time, ok bool) {
}
break
}
ancestorID, _, _ = db.getBlockHeader(ancestorID)
ancestorID, _, _ = db.getAncestorInfo(ancestorID)
}
_, t, ok = db.getBlockHeader(ancestorID)
_, t, ok = db.getAncestorInfo(ancestorID)
return
}

Expand All @@ -646,12 +676,23 @@ func (db *DBStore) AddState(cs consensus.State) {

// Block implements Store.
func (db *DBStore) Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) {
return db.getBlock(id)
_, b, bs, ok := db.getBlock(id)
if !ok || b == nil {
return types.Block{}, nil, false
}
return *b, bs, ok
}

// AddBlock implements Store.
func (db *DBStore) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) {
db.putBlock(b, bs)
db.putBlock(b.Header(), &b, bs)
}

// PruneBlock implements Store.
func (db *DBStore) PruneBlock(id types.BlockID) {
if bh, _, _, ok := db.getBlock(id); ok {
db.putBlock(bh, nil, nil)
}
}

func (db *DBStore) shouldFlush() bool {
Expand Down Expand Up @@ -743,7 +784,7 @@ func NewDBStore(db DB, n *consensus.Network, genesisBlock types.Block) (_ *DBSto
dbs.putState(genesisState)
bs := consensus.V1BlockSupplement{Transactions: make([]consensus.V1TransactionSupplement, len(genesisBlock.Transactions))}
cs, cau := consensus.ApplyBlock(genesisState, genesisBlock, bs, time.Time{})
dbs.putBlock(genesisBlock, &bs)
dbs.putBlock(genesisBlock.Header(), &genesisBlock, &bs)
dbs.putState(cs)
dbs.ApplyBlock(cs, cau)
if err := dbs.Flush(); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion chain/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Store interface {

Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool)
AddBlock(b types.Block, bs *consensus.V1BlockSupplement)
PruneBlock(id types.BlockID)
State(id types.BlockID) (consensus.State, bool)
AddState(cs consensus.State)
AncestorTimestamp(id types.BlockID) (time.Time, bool)
Expand Down Expand Up @@ -74,12 +75,15 @@ func blockAndChild(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSu
// A Manager tracks multiple blockchains and identifies the best valid
// chain.
type Manager struct {
log *zap.Logger
store Store
tipState consensus.State
onReorg map[[16]byte]func(types.ChainIndex)
invalidBlocks map[types.BlockID]error

// configuration options
log *zap.Logger
pruneTarget uint64

txpool struct {
txns []types.Transaction
v2txns []types.V2Transaction
Expand Down Expand Up @@ -314,6 +318,12 @@ func (m *Manager) applyTip(index types.ChainIndex) error {
m.store.ApplyBlock(cs, cau)
m.applyPoolUpdate(cau, cs)
m.tipState = cs

if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget {
if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok {
m.store.PruneBlock(index.ID)
}
}
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions chain/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ func WithLog(l *zap.Logger) ManagerOption {
m.log = l
}
}

// WithPruneTarget sets the target number of blocks to store.
func WithPruneTarget(n uint64) ManagerOption {
return func(m *Manager) {
m.pruneTarget = n
}
}
8 changes: 5 additions & 3 deletions miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ import (

// FindBlockNonce attempts to find a nonce for b that meets the PoW target.
func FindBlockNonce(cs consensus.State, b *types.Block, timeout time.Duration) bool {
b.Nonce = 0
bh := b.Header()
bh.Nonce = 0
factor := cs.NonceFactor()
startBlock := time.Now()
for b.ID().CmpWork(cs.ChildTarget) < 0 {
b.Nonce += factor
for bh.ID().CmpWork(cs.ChildTarget) < 0 {
bh.Nonce += factor
if time.Since(startBlock) > timeout {
return false
}
}
b.Nonce = bh.Nonce
return true
}

Expand Down
2 changes: 1 addition & 1 deletion rhp/v4/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func startTestNode(tb testing.TB, n *consensus.Network, genesis types.Block) (*c
}
tb.Cleanup(func() { syncerListener.Close() })

s := syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{
s := syncer.New(syncerListener, cm, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: "localhost:1234",
Expand Down
4 changes: 2 additions & 2 deletions syncer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ func (p *Peer) SendCheckpoint(index types.ChainIndex, timeout time.Duration) (ty
}

// RelayV2Header relays a v2 block header to the peer.
func (p *Peer) RelayV2Header(h types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: h}, timeout)
func (p *Peer) RelayV2Header(bh types.BlockHeader, timeout time.Duration) error {
return p.callRPC(&gateway.RPCRelayV2Header{Header: bh}, timeout)
}

// RelayV2BlockOutline relays a v2 block outline to the peer.
Expand Down
10 changes: 6 additions & 4 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,9 +533,9 @@ func (s *Syncer) peerLoop(ctx context.Context) error {

ctx, cancel := context.WithTimeout(ctx, s.config.ConnectTimeout)
if _, err := s.Connect(ctx, p); err != nil {
log.Debug("connected to peer", zap.String("peer", p))
} else {
log.Debug("failed to connect to peer", zap.String("peer", p), zap.Error(err))
} else {
log.Debug("connected to peer", zap.String("peer", p))
}
cancel()
lastTried[p] = time.Now()
Expand Down Expand Up @@ -723,10 +723,12 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) {
}

// BroadcastHeader broadcasts a header to all peers.
func (s *Syncer) BroadcastHeader(h types.BlockHeader) { s.relayHeader(h, nil) }
func (s *Syncer) BroadcastHeader(bh types.BlockHeader) { s.relayHeader(bh, nil) }

// BroadcastV2Header broadcasts a v2 header to all peers.
func (s *Syncer) BroadcastV2Header(h types.BlockHeader) { s.relayV2Header(h, nil) }
func (s *Syncer) BroadcastV2Header(bh types.BlockHeader) {
s.relayV2Header(bh, nil)
}

// BroadcastV2BlockOutline broadcasts a v2 block outline to all peers.
func (s *Syncer) BroadcastV2BlockOutline(b gateway.V2BlockOutline) { s.relayV2BlockOutline(b, nil) }
Expand Down
4 changes: 2 additions & 2 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ func TestSyncer(t *testing.T) {
}
defer l2.Close()

s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{
s1 := syncer.New(l1, cm1, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l1.Addr().String(),
}, syncer.WithLogger(log.Named("syncer1")))
defer s1.Close()
go s1.Run(context.Background())

s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{
s2 := syncer.New(l2, cm2, testutil.NewEphemeralPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l2.Addr().String(),
Expand Down
24 changes: 12 additions & 12 deletions testutil/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"go.sia.tech/coreutils/syncer"
)

// A MemPeerStore is an in-memory implementation of a PeerStore.
type MemPeerStore struct {
// A EphemeralPeerStore is an in-memory implementation of a PeerStore.
type EphemeralPeerStore struct {
n8maninger marked this conversation as resolved.
Show resolved Hide resolved
mu sync.Mutex
peers map[string]syncer.PeerInfo
}

// AddPeer adds a peer to the store. If the peer already exists, nil should
// be returned.
func (ps *MemPeerStore) AddPeer(addr string) error {
func (ps *EphemeralPeerStore) AddPeer(addr string) error {
ps.mu.Lock()
defer ps.mu.Unlock()
if _, ok := ps.peers[addr]; ok {
Expand All @@ -26,7 +26,7 @@ func (ps *MemPeerStore) AddPeer(addr string) error {
}

// Peers returns the set of known peers.
func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) {
func (ps *EphemeralPeerStore) Peers() ([]syncer.PeerInfo, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peers []syncer.PeerInfo
Expand All @@ -38,7 +38,7 @@ func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) {

// PeerInfo returns the metadata for the specified peer or ErrPeerNotFound
// if the peer wasn't found in the store.
func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {
func (ps *EphemeralPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
p, ok := ps.peers[addr]
Expand All @@ -50,7 +50,7 @@ func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {

// UpdatePeerInfo updates the metadata for the specified peer. If the peer
// is not found, the error should be ErrPeerNotFound.
func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error {
func (ps *EphemeralPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error {
ps.mu.Lock()
defer ps.mu.Unlock()
p := ps.peers[addr]
Expand All @@ -61,18 +61,18 @@ func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) e

// Ban temporarily bans one or more IPs. The addr should either be a single
// IP with port (e.g. 1.2.3.4:5678) or a CIDR subnet (e.g. 1.2.3.4/16).
func (ps *MemPeerStore) Ban(addr string, duration time.Duration, reason string) error {
func (ps *EphemeralPeerStore) Ban(addr string, duration time.Duration, reason string) error {
return nil
}

// Banned returns false
func (ps *MemPeerStore) Banned(addr string) (bool, error) { return false, nil }
func (ps *EphemeralPeerStore) Banned(addr string) (bool, error) { return false, nil }

var _ syncer.PeerStore = (*MemPeerStore)(nil)
var _ syncer.PeerStore = (*EphemeralPeerStore)(nil)

// NewMemPeerStore returns a new MemPeerStore.
func NewMemPeerStore() *MemPeerStore {
return &MemPeerStore{
// NewEphemeralPeerStore returns a new EphemeralPeerStore.
func NewEphemeralPeerStore() *EphemeralPeerStore {
return &EphemeralPeerStore{
peers: make(map[string]syncer.PeerInfo),
}
}
Loading