From 237a1adbaad02a13ef54ff3bef9962b13f2c2dc3 Mon Sep 17 00:00:00 2001 From: lukechampine Date: Fri, 8 Nov 2024 21:35:19 -0500 Subject: [PATCH 1/4] chain,syncer: Add support for block pruning --- chain/db.go | 125 +++++++++++++++++++++++++++++++---------------- chain/manager.go | 12 ++++- chain/options.go | 7 +++ miner.go | 8 +-- syncer/peer.go | 4 +- syncer/syncer.go | 10 ++-- 6 files changed, 114 insertions(+), 52 deletions(-) diff --git a/chain/db.go b/chain/db.go index 9f1eda0..466acfa 100644 --- a/chain/db.go +++ b/chain/db.go @@ -13,43 +13,32 @@ import ( ) type supplementedBlock struct { - Block types.Block + Header *types.BlockHeader + Block *types.Block Supplement *consensus.V1BlockSupplement } func (sb supplementedBlock) EncodeTo(e *types.Encoder) { - e.WriteUint8(2) - (types.V2Block)(sb.Block).EncodeTo(e) - e.WriteBool(sb.Supplement != nil) - if sb.Supplement != nil { - sb.Supplement.EncodeTo(e) - } + e.WriteUint8(3) + types.EncodePtr(e, sb.Header) + types.EncodePtr(e, (*types.V2Block)(sb.Block)) + types.EncodePtr(e, sb.Supplement) } func (sb *supplementedBlock) DecodeFrom(d *types.Decoder) { - if v := d.ReadUint8(); v != 2 { - d.SetErr(fmt.Errorf("incompatible version (%d)", v)) - } - (*types.V2Block)(&sb.Block).DecodeFrom(d) - if d.ReadBool() { - sb.Supplement = new(consensus.V1BlockSupplement) - sb.Supplement.DecodeFrom(d) - } -} - -// helper type for decoding just the header information from a block -type supplementedHeader struct { - ParentID types.BlockID - Timestamp time.Time -} - -func (sh *supplementedHeader) DecodeFrom(d *types.Decoder) { - if v := d.ReadUint8(); v != 2 { + switch v := d.ReadUint8(); v { + case 2: + sb.Header = nil + sb.Block = new(types.Block) + (*types.V2Block)(sb.Block).DecodeFrom(d) + types.DecodePtr(d, &sb.Supplement) + case 3: + types.DecodePtr(d, &sb.Header) + types.DecodePtrCast[types.V2Block](d, &sb.Block) + types.DecodePtr(d, &sb.Supplement) + default: d.SetErr(fmt.Errorf("incompatible version (%d)", v)) } - sh.ParentID.DecodeFrom(d) - _ = d.ReadUint64() // nonce - sh.Timestamp = d.ReadTime() } type versionedState struct { @@ -304,21 +293,62 @@ func (db *DBStore) putState(cs consensus.State) { db.bucket(bStates).put(cs.Index.ID[:], versionedState{cs}) } -func (db *DBStore) getBlock(id types.BlockID) (b types.Block, bs *consensus.V1BlockSupplement, _ bool) { +func (db *DBStore) getBlock(id types.BlockID) (bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement, _ bool) { var sb supplementedBlock ok := db.bucket(bBlocks).get(id[:], &sb) - return sb.Block, sb.Supplement, ok + if sb.Header == nil { + sb.Header = new(types.BlockHeader) + *sb.Header = sb.Block.Header() + } + return *sb.Header, sb.Block, sb.Supplement, ok } -func (db *DBStore) putBlock(b types.Block, bs *consensus.V1BlockSupplement) { - id := b.ID() - db.bucket(bBlocks).put(id[:], supplementedBlock{b, bs}) +func (db *DBStore) putBlock(bh types.BlockHeader, b *types.Block, bs *consensus.V1BlockSupplement) { + id := bh.ID() + db.bucket(bBlocks).put(id[:], supplementedBlock{&bh, b, bs}) } -func (db *DBStore) getBlockHeader(id types.BlockID) (parentID types.BlockID, timestamp time.Time, _ bool) { - var sh supplementedHeader - ok := db.bucket(bBlocks).get(id[:], &sh) - return sh.ParentID, sh.Timestamp, ok +func (db *DBStore) getAncestorInfo(id types.BlockID) (parentID types.BlockID, timestamp time.Time, ok bool) { + ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) { + v := d.ReadUint8() + if v != 2 && v != 3 { + d.SetErr(fmt.Errorf("incompatible 
version (%d)", v)) + } + // kinda cursed; don't worry about it + if v == 3 { + if !d.ReadBool() { + d.ReadBool() + } + } + parentID.DecodeFrom(d) + _ = d.ReadUint64() // nonce + timestamp = d.ReadTime() + })) + return +} + +func (db *DBStore) getBlockHeader(id types.BlockID) (bh types.BlockHeader, ok bool) { + ok = db.bucket(bBlocks).get(id[:], types.DecoderFunc(func(d *types.Decoder) { + v := d.ReadUint8() + if v != 2 && v != 3 { + d.SetErr(fmt.Errorf("incompatible version (%d)", v)) + return + } + if v == 3 { + bhp := &bh + types.DecodePtr(d, &bhp) + if bhp != nil { + return + } else if !d.ReadBool() { + d.SetErr(errors.New("neither header nor block present")) + return + } + } + var b types.Block + (*types.V2Block)(&b).DecodeFrom(d) + bh = b.Header() + })) + return } func (db *DBStore) treeKey(row, col uint64) []byte { @@ -628,9 +658,9 @@ func (db *DBStore) AncestorTimestamp(id types.BlockID) (t time.Time, ok bool) { } break } - ancestorID, _, _ = db.getBlockHeader(ancestorID) + ancestorID, _, _ = db.getAncestorInfo(ancestorID) } - _, t, ok = db.getBlockHeader(ancestorID) + _, t, ok = db.getAncestorInfo(ancestorID) return } @@ -646,12 +676,23 @@ func (db *DBStore) AddState(cs consensus.State) { // Block implements Store. func (db *DBStore) Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) { - return db.getBlock(id) + _, b, bs, ok := db.getBlock(id) + if !ok || b == nil { + return types.Block{}, nil, false + } + return *b, bs, ok } // AddBlock implements Store. func (db *DBStore) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) { - db.putBlock(b, bs) + db.putBlock(b.Header(), &b, bs) +} + +// PruneBlock implements Store. +func (db *DBStore) PruneBlock(id types.BlockID) { + if bh, _, _, ok := db.getBlock(id); ok { + db.putBlock(bh, nil, nil) + } } func (db *DBStore) shouldFlush() bool { @@ -743,7 +784,7 @@ func NewDBStore(db DB, n *consensus.Network, genesisBlock types.Block) (_ *DBSto dbs.putState(genesisState) bs := consensus.V1BlockSupplement{Transactions: make([]consensus.V1TransactionSupplement, len(genesisBlock.Transactions))} cs, cau := consensus.ApplyBlock(genesisState, genesisBlock, bs, time.Time{}) - dbs.putBlock(genesisBlock, &bs) + dbs.putBlock(genesisBlock.Header(), &genesisBlock, &bs) dbs.putState(cs) dbs.ApplyBlock(cs, cau) if err := dbs.Flush(); err != nil { diff --git a/chain/manager.go b/chain/manager.go index 679b281..53e47da 100644 --- a/chain/manager.go +++ b/chain/manager.go @@ -45,6 +45,7 @@ type Store interface { Block(id types.BlockID) (types.Block, *consensus.V1BlockSupplement, bool) AddBlock(b types.Block, bs *consensus.V1BlockSupplement) + PruneBlock(id types.BlockID) State(id types.BlockID) (consensus.State, bool) AddState(cs consensus.State) AncestorTimestamp(id types.BlockID) (time.Time, bool) @@ -74,12 +75,15 @@ func blockAndChild(s Store, id types.BlockID) (types.Block, *consensus.V1BlockSu // A Manager tracks multiple blockchains and identifies the best valid // chain. 
type Manager struct { - log *zap.Logger store Store tipState consensus.State onReorg map[[16]byte]func(types.ChainIndex) invalidBlocks map[types.BlockID]error + // configuration options + log *zap.Logger + pruneTarget uint64 + txpool struct { txns []types.Transaction v2txns []types.V2Transaction @@ -314,6 +318,12 @@ func (m *Manager) applyTip(index types.ChainIndex) error { m.store.ApplyBlock(cs, cau) m.applyPoolUpdate(cau, cs) m.tipState = cs + + if m.pruneTarget != 0 && cs.Index.Height > m.pruneTarget { + if index, ok := m.store.BestIndex(cs.Index.Height - m.pruneTarget); ok { + m.store.PruneBlock(index.ID) + } + } return nil } diff --git a/chain/options.go b/chain/options.go index f7fa10a..05d5f6d 100644 --- a/chain/options.go +++ b/chain/options.go @@ -11,3 +11,10 @@ func WithLog(l *zap.Logger) ManagerOption { m.log = l } } + +// WithPruneTarget sets the target number of blocks to store. +func WithPruneTarget(n uint64) ManagerOption { + return func(m *Manager) { + m.pruneTarget = n + } +} diff --git a/miner.go b/miner.go index db4358a..e028d9d 100644 --- a/miner.go +++ b/miner.go @@ -10,15 +10,17 @@ import ( // FindBlockNonce attempts to find a nonce for b that meets the PoW target. func FindBlockNonce(cs consensus.State, b *types.Block, timeout time.Duration) bool { - b.Nonce = 0 + bh := b.Header() + bh.Nonce = 0 factor := cs.NonceFactor() startBlock := time.Now() - for b.ID().CmpWork(cs.ChildTarget) < 0 { - b.Nonce += factor + for bh.ID().CmpWork(cs.ChildTarget) < 0 { + bh.Nonce += factor if time.Since(startBlock) > timeout { return false } } + b.Nonce = bh.Nonce return true } diff --git a/syncer/peer.go b/syncer/peer.go index 45a6d81..de98c86 100644 --- a/syncer/peer.go +++ b/syncer/peer.go @@ -182,8 +182,8 @@ func (p *Peer) SendCheckpoint(index types.ChainIndex, timeout time.Duration) (ty } // RelayV2Header relays a v2 block header to the peer. -func (p *Peer) RelayV2Header(h types.BlockHeader, timeout time.Duration) error { - return p.callRPC(&gateway.RPCRelayV2Header{Header: h}, timeout) +func (p *Peer) RelayV2Header(bh types.BlockHeader, timeout time.Duration) error { + return p.callRPC(&gateway.RPCRelayV2Header{Header: bh}, timeout) } // RelayV2BlockOutline relays a v2 block outline to the peer. diff --git a/syncer/syncer.go b/syncer/syncer.go index f1ac29b..6778d79 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -533,9 +533,9 @@ func (s *Syncer) peerLoop(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, s.config.ConnectTimeout) if _, err := s.Connect(ctx, p); err != nil { - log.Debug("connected to peer", zap.String("peer", p)) - } else { log.Debug("failed to connect to peer", zap.String("peer", p), zap.Error(err)) + } else { + log.Debug("connected to peer", zap.String("peer", p)) } cancel() lastTried[p] = time.Now() @@ -723,10 +723,12 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) { } // BroadcastHeader broadcasts a header to all peers. -func (s *Syncer) BroadcastHeader(h types.BlockHeader) { s.relayHeader(h, nil) } +func (s *Syncer) BroadcastHeader(bh types.BlockHeader) { s.relayHeader(bh, nil) } // BroadcastV2Header broadcasts a v2 header to all peers. -func (s *Syncer) BroadcastV2Header(h types.BlockHeader) { s.relayV2Header(h, nil) } +func (s *Syncer) BroadcastV2Header(bh types.BlockHeader) { + s.relayV2Header(bh, nil) +} // BroadcastV2BlockOutline broadcasts a v2 block outline to all peers. 
func (s *Syncer) BroadcastV2BlockOutline(b gateway.V2BlockOutline) { s.relayV2BlockOutline(b, nil) } From 1eba8522f72929d1c621e544fd4dfccf08fb6c79 Mon Sep 17 00:00:00 2001 From: lukechampine Date: Fri, 8 Nov 2024 21:35:48 -0500 Subject: [PATCH 2/4] testutil: Rename MemPeerStore to EphemeralPeerStore --- rhp/v4/rpc_test.go | 2 +- syncer/syncer_test.go | 4 ++-- testutil/syncer.go | 24 ++++++++++++------------ 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/rhp/v4/rpc_test.go b/rhp/v4/rpc_test.go index a869635..e264b1b 100644 --- a/rhp/v4/rpc_test.go +++ b/rhp/v4/rpc_test.go @@ -74,7 +74,7 @@ func startTestNode(tb testing.TB, n *consensus.Network, genesis types.Block) (*c } tb.Cleanup(func() { syncerListener.Close() }) - s := syncer.New(syncerListener, cm, testutil.NewMemPeerStore(), gateway.Header{ + s := syncer.New(syncerListener, cm, testutil.NewEphemeralPeerStore(), gateway.Header{ GenesisID: genesis.ID(), UniqueID: gateway.GenerateUniqueID(), NetAddress: "localhost:1234", diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index d3551a0..eb5d673 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -42,7 +42,7 @@ func TestSyncer(t *testing.T) { } defer l2.Close() - s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{ + s1 := syncer.New(l1, cm1, testutil.NewEphemeralPeerStore(), gateway.Header{ GenesisID: genesis.ID(), UniqueID: gateway.GenerateUniqueID(), NetAddress: l1.Addr().String(), @@ -50,7 +50,7 @@ func TestSyncer(t *testing.T) { defer s1.Close() go s1.Run(context.Background()) - s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{ + s2 := syncer.New(l2, cm2, testutil.NewEphemeralPeerStore(), gateway.Header{ GenesisID: genesis.ID(), UniqueID: gateway.GenerateUniqueID(), NetAddress: l2.Addr().String(), diff --git a/testutil/syncer.go b/testutil/syncer.go index 1aba4f6..caa621a 100644 --- a/testutil/syncer.go +++ b/testutil/syncer.go @@ -7,15 +7,15 @@ import ( "go.sia.tech/coreutils/syncer" ) -// A MemPeerStore is an in-memory implementation of a PeerStore. -type MemPeerStore struct { +// A EphemeralPeerStore is an in-memory implementation of a PeerStore. +type EphemeralPeerStore struct { mu sync.Mutex peers map[string]syncer.PeerInfo } // AddPeer adds a peer to the store. If the peer already exists, nil should // be returned. -func (ps *MemPeerStore) AddPeer(addr string) error { +func (ps *EphemeralPeerStore) AddPeer(addr string) error { ps.mu.Lock() defer ps.mu.Unlock() if _, ok := ps.peers[addr]; ok { @@ -26,7 +26,7 @@ func (ps *MemPeerStore) AddPeer(addr string) error { } // Peers returns the set of known peers. -func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) { +func (ps *EphemeralPeerStore) Peers() ([]syncer.PeerInfo, error) { ps.mu.Lock() defer ps.mu.Unlock() var peers []syncer.PeerInfo @@ -38,7 +38,7 @@ func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) { // PeerInfo returns the metadata for the specified peer or ErrPeerNotFound // if the peer wasn't found in the store. -func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { +func (ps *EphemeralPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { ps.mu.Lock() defer ps.mu.Unlock() p, ok := ps.peers[addr] @@ -50,7 +50,7 @@ func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { // UpdatePeerInfo updates the metadata for the specified peer. If the peer // is not found, the error should be ErrPeerNotFound. 
-func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error { +func (ps *EphemeralPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error { ps.mu.Lock() defer ps.mu.Unlock() p := ps.peers[addr] @@ -61,18 +61,18 @@ func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) e // Ban temporarily bans one or more IPs. The addr should either be a single // IP with port (e.g. 1.2.3.4:5678) or a CIDR subnet (e.g. 1.2.3.4/16). -func (ps *MemPeerStore) Ban(addr string, duration time.Duration, reason string) error { +func (ps *EphemeralPeerStore) Ban(addr string, duration time.Duration, reason string) error { return nil } // Banned returns false -func (ps *MemPeerStore) Banned(addr string) (bool, error) { return false, nil } +func (ps *EphemeralPeerStore) Banned(addr string) (bool, error) { return false, nil } -var _ syncer.PeerStore = (*MemPeerStore)(nil) +var _ syncer.PeerStore = (*EphemeralPeerStore)(nil) -// NewMemPeerStore returns a new MemPeerStore. -func NewMemPeerStore() *MemPeerStore { - return &MemPeerStore{ +// NewEphemeralPeerStore returns a new EphemeralPeerStore. +func NewEphemeralPeerStore() *EphemeralPeerStore { + return &EphemeralPeerStore{ peers: make(map[string]syncer.PeerInfo), } } From 50054723a0ee23c73831f484315eaa2905a2f1ec Mon Sep 17 00:00:00 2001 From: "knope-bot[bot]" <152252888+knope-bot[bot]@users.noreply.github.com> Date: Thu, 5 Dec 2024 01:47:02 +0000 Subject: [PATCH 3/4] Auto generate changeset --- .changeset/add_support_for_block_pruning.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .changeset/add_support_for_block_pruning.md diff --git a/.changeset/add_support_for_block_pruning.md b/.changeset/add_support_for_block_pruning.md new file mode 100644 index 0000000..9f37597 --- /dev/null +++ b/.changeset/add_support_for_block_pruning.md @@ -0,0 +1,17 @@ +--- +default: minor +--- + +# Add support for block pruning + +#116 by @lukechampine + +Been a long time coming. 😅 + +The strategy here is quite naive, but I think it will be serviceable. Basically, when we apply a block `N`, we delete block `N-P`. `P` is therefore the "prune target," i.e. the maximum number of blocks you want to store. + +In practice, this isn't exhaustive: it only deletes blocks from the best chain. It also won't dramatically shrink the size of an existing database. I think this is acceptable, because pruning is most important during the initial sync, and during the initial sync, you'll only be receiving blocks from one chain at a time. Also, we don't want to make pruning *too* easy; after all, we need a good percentage of nodes to be storing the full chain, so that others can sync to them. + +I tested this out locally with a prune target of 1000, and after syncing 400,000 blocks, my `consensus.db` was around 18 GB. This is disappointing; it should be much smaller. With some investigation, I found that the Bolt database was only storing ~5 GB of data (most of which was the accumulator tree, which we can't prune until after v2). I think this is a combination of a) Bolt grows the DB capacity aggressively in response to writes, and b) Bolt never shrinks the DB capacity. So it's possible that we could reduce this number by tweaking our DB batching parameters. Alternatively, we could provide a tool that copies the DB to a new file. Not the most user-friendly, but again, I think I'm okay with that for now. 
+ +Depends on https://github.com/SiaFoundation/core/pull/228 From a349c353bcb65244244b3615664f1219643e889e Mon Sep 17 00:00:00 2001 From: Luke Champine Date: Sat, 7 Dec 2024 09:10:26 -0500 Subject: [PATCH 4/4] Update add_support_for_block_pruning.md --- .changeset/add_support_for_block_pruning.md | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/.changeset/add_support_for_block_pruning.md b/.changeset/add_support_for_block_pruning.md index 9f37597..95fe1ad 100644 --- a/.changeset/add_support_for_block_pruning.md +++ b/.changeset/add_support_for_block_pruning.md @@ -1,17 +1,7 @@ ---- -default: minor ---- - -# Add support for block pruning - -#116 by @lukechampine - -Been a long time coming. 😅 +--- +default: minor +--- -The strategy here is quite naive, but I think it will be serviceable. Basically, when we apply a block `N`, we delete block `N-P`. `P` is therefore the "prune target," i.e. the maximum number of blocks you want to store. +# Add support for block pruning -In practice, this isn't exhaustive: it only deletes blocks from the best chain. It also won't dramatically shrink the size of an existing database. I think this is acceptable, because pruning is most important during the initial sync, and during the initial sync, you'll only be receiving blocks from one chain at a time. Also, we don't want to make pruning *too* easy; after all, we need a good percentage of nodes to be storing the full chain, so that others can sync to them. - -I tested this out locally with a prune target of 1000, and after syncing 400,000 blocks, my `consensus.db` was around 18 GB. This is disappointing; it should be much smaller. With some investigation, I found that the Bolt database was only storing ~5 GB of data (most of which was the accumulator tree, which we can't prune until after v2). I think this is a combination of a) Bolt grows the DB capacity aggressively in response to writes, and b) Bolt never shrinks the DB capacity. So it's possible that we could reduce this number by tweaking our DB batching parameters. Alternatively, we could provide a tool that copies the DB to a new file. Not the most user-friendly, but again, I think I'm okay with that for now. - -Depends on https://github.com/SiaFoundation/core/pull/228 +The chain manager can now automatically delete blocks after a configurable number of confirmations. Note that this does not apply retroactively.
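
A minimal usage sketch for the new option, assuming the existing coreutils constructors (`coreutils.OpenBoltChainDB`, `chain.Mainnet`, `chain.NewDBStore`, `chain.NewManager`) keep their signatures as of this series; only `chain.WithPruneTarget` is introduced here:

```go
package main

import (
	"log"

	"go.sia.tech/coreutils"
	"go.sia.tech/coreutils/chain"
)

func main() {
	// Open the Bolt-backed chain database.
	bdb, err := coreutils.OpenBoltChainDB("consensus.db")
	if err != nil {
		log.Fatal(err)
	}
	defer bdb.Close()

	// Initialize the store with the mainnet network parameters and genesis block.
	network, genesisBlock := chain.Mainnet()
	store, tipState, err := chain.NewDBStore(bdb, network, genesisBlock)
	if err != nil {
		log.Fatal(err)
	}

	// Retain roughly the most recent 1000 best-chain blocks: once the tip is
	// past height 1000, applying block N prunes the block at height N-1000,
	// keeping only its header.
	cm := chain.NewManager(store, tipState, chain.WithPruneTarget(1000))
	_ = cm
}
```

As the final changeset notes, pruning is applied only as new blocks arrive; enabling it on an existing node does not retroactively shrink the database.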