From ac6044f2db460a1fb906c994ab1140e88970d958 Mon Sep 17 00:00:00 2001 From: laizy Date: Wed, 14 Dec 2022 13:03:08 +0800 Subject: [PATCH] fix bloom index start height and error recover (#1419) --- core/store/ledgerstore/block_store.go | 100 +++++++++++++------------ core/store/ledgerstore/bloombits.go | 85 ++++++++------------- core/store/ledgerstore/ledger_store.go | 40 +--------- http/ethrpc/backend/backend.go | 25 ++++++- 4 files changed, 111 insertions(+), 139 deletions(-) diff --git a/core/store/ledgerstore/block_store.go b/core/store/ledgerstore/block_store.go index c468a824b..7e370237c 100644 --- a/core/store/ledgerstore/block_store.go +++ b/core/store/ledgerstore/block_store.go @@ -38,7 +38,7 @@ type BlockStore struct { enableCache bool //Is enable lru cache dbDir string //The path of store file cache *BlockCache //The cache of block, if have. - indexer bloomIndexer // Background processor generating the index data content + bloomCache map[uint32]*types2.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index filterStart uint32 // Start block that filter supported store *leveldbstore.LevelDBStore //block store handler } @@ -64,56 +64,17 @@ func NewBlockStore(dbDir string, enableCache bool) (*BlockStore, error) { enableCache: enableCache, store: store, cache: cache, + bloomCache: make(map[uint32]*types2.Bloom, 4096*2), } - _, curBlockHeight, err := blockStore.GetCurrentBlock() - if err != nil { - if err != scom.ErrNotFound { - return nil, fmt.Errorf("get current block: %s", err.Error()) - } - curBlockHeight = 0 - } - - indexer := NewBloomIndexer(store, curBlockHeight/BloomBitsBlocks) - - start, err := indexer.GetFilterStart() - if err != nil { - if err != scom.ErrNotFound { - return nil, fmt.Errorf("get filter start: %s", err.Error()) - } - - var tmp uint32 - if curBlockHeight < config.GetAddDecimalsHeight() { - tmp = config.GetAddDecimalsHeight() - } else { - tmp = curBlockHeight - } - err = indexer.PutFilterStart(tmp) - if err != nil { - return nil, fmt.Errorf("put filter start: %s", err.Error()) - } - start = tmp - } - - blockStore.indexer = indexer - blockStore.filterStart = start - return blockStore, nil } -func (this *BlockStore) PutFilterStart(height uint32) error { - return this.indexer.PutFilterStart(height) -} - //NewBatch start a commit batch func (this *BlockStore) NewBatch() { this.store.NewBatch() } -func (this *BlockStore) GetIndexer() bloomIndexer { - return this.indexer -} - //SaveBlock persist block to store func (this *BlockStore) SaveBlock(block *types.Block) error { if this.enableCache { @@ -400,16 +361,28 @@ func (this *BlockStore) SaveBlockHash(height uint32, blockHash common.Uint256) { //SaveBloomData persist block bloom data to store func (this *BlockStore) SaveBloomData(height uint32, bloom types2.Bloom) { - this.Process(height, bloom) - if height != 0 && height%BloomBitsBlocks == 0 { - this.indexer.BatchPut() + if height < this.filterStart { + return } key := this.genBloomKey(height) this.store.BatchPut(key, bloom.Bytes()) + this.bloomCache[height] = &bloom + this.cleanStaleBloomData(height) + + if (height+1)%BloomBitsBlocks == 0 { + var blooms []types2.Bloom + for i := 0; i < BloomBitsBlocks; i++ { + blooms = append(blooms, *this.bloomCache[height+uint32(i)+1-BloomBitsBlocks]) + } + section := height / BloomBitsBlocks + PutBloomIndex(this.store, blooms, section) + } } -func (this *BlockStore) Process(height uint32, bloom types2.Bloom) { - this.indexer.Process(height, bloom) +func (this *BlockStore) cleanStaleBloomData(curHeight uint32) { + if curHeight > BloomBitsBlocks*2 { + delete(this.bloomCache, curHeight-BloomBitsBlocks*2) + } } //GetBloomData return bloom data by block height @@ -638,3 +611,38 @@ func (this *BlockStore) PruneBlock(hash common.Uint256) []common.Uint256 { this.store.BatchDelete(key) return txHashes } + +func (this *BlockStore) LoadBloomBits() error { + _, curBlockHeight, err := this.GetCurrentBlock() + if err != nil { + if err != scom.ErrNotFound { + return fmt.Errorf("get current block: %s", err.Error()) + } + curBlockHeight = 0 + } + + initStart := (curBlockHeight + 4095) / 4096 + if curBlockHeight < config.GetAddDecimalsHeight() { + initStart = config.GetAddDecimalsHeight() / 4096 * 4096 + } + + start, err := GetOrSetFilterStart(this.store, initStart) + if err != nil { + return err + } + this.filterStart = start + + if curBlockHeight < this.filterStart { + return nil + } + + loadStart := curBlockHeight - curBlockHeight%BloomBitsBlocks + for i := loadStart; i <= curBlockHeight; i++ { + bloom, err := this.GetBloomData(i) + if err != nil { + return fmt.Errorf("LoadBloom error %s", err) + } + this.bloomCache[i] = &bloom + } + return nil +} diff --git a/core/store/ledgerstore/bloombits.go b/core/store/ledgerstore/bloombits.go index 5c9478ebf..9cb79b583 100644 --- a/core/store/ledgerstore/bloombits.go +++ b/core/store/ledgerstore/bloombits.go @@ -19,8 +19,8 @@ package ledgerstore import ( "encoding/binary" + "fmt" "io" - "time" "github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/core/bloombits" @@ -30,82 +30,63 @@ import ( "github.com/ontio/ontology/core/store/leveldbstore" ) -const ( - // bloomServiceThreads is the number of goroutines used globally by an Ethereum - // instance to service bloombits lookups for all running filters. - BloomServiceThreads = 16 - - // bloomFilterThreads is the number of goroutines used locally per filter to - // multiplex requests onto the global servicing goroutines. - BloomFilterThreads = 3 - - // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service - // in a single batch. - BloomRetrievalBatch = 16 - - // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests - // to accumulate request an entire batch (avoiding hysteresis). - BloomRetrievalWait = time.Duration(0) - - // BloomBitsBlocks is the number of blocks a single bloom bit section vector - // contains on the server side. - BloomBitsBlocks uint32 = 4096 -) +// BloomBitsBlocks is the number of blocks a single bloom bit section vector +// contains on the server side. +const BloomBitsBlocks = 4096 var ( bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint32 big endian) + hash -> bloom bits ) -// bloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index -// for the Ethereum header bloom filters, permitting blazing fast filtering. -type bloomIndexer struct { - store *leveldbstore.LevelDBStore // database instance to write index data and metadata into - gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index - section uint32 // Section is the section number being processed currently -} - -func NewBloomIndexer(store *leveldbstore.LevelDBStore, section uint32) bloomIndexer { +func PutBloomIndex(store *leveldbstore.LevelDBStore, blooms []types.Bloom, section uint32) { gen, err := bloombits.NewGenerator(uint(BloomBitsBlocks)) if err != nil { panic(err) // never fired since BloomBitsBlocks is multiple of 8 } - b := bloomIndexer{ - store: store, + for i, b := range blooms { + err := gen.AddBloom(uint(i), b) + if err != nil { + panic(err) // never fired + } } - b.gen, b.section = gen, section - return b -} -// Process implements core.ChainIndexerBackend, adding a new header's bloom into -// the index. -func (b *bloomIndexer) Process(height uint32, bloom types.Bloom) { - b.gen.AddBloom(uint(height-b.section*BloomBitsBlocks), bloom) -} - -// BatchPut implements core.ChainIndexerBackend, finalizing the bloom section and -// writing it out into the database. -func (b *bloomIndexer) BatchPut() { for i := 0; i < types.BloomBitLength; i++ { - bits, err := b.gen.Bitset(uint(i)) + bits, err := gen.Bitset(uint(i)) if err != nil { panic(err) // never fired since idx is always less than 8 and section should be right } value := bitutil.CompressBytes(bits) - b.store.BatchPut(bloomBitsKey(uint(i), b.section), value) + store.BatchPut(bloomBitsKey(uint(i), section), value) } - b.section++ } -func (this *bloomIndexer) PutFilterStart(height uint32) error { +func PutFilterStart(db *leveldbstore.LevelDBStore, height uint32) error { key := genFilterStartKey() sink := common2.NewZeroCopySink(nil) sink.WriteUint32(height) - return this.store.Put(key, sink.Bytes()) + return db.Put(key, sink.Bytes()) +} + +func GetOrSetFilterStart(db *leveldbstore.LevelDBStore, def uint32) (uint32, error) { + start, err := GetFilterStart(db) + if err != nil { + if err != scom.ErrNotFound { + return 0, fmt.Errorf("get filter start: %s", err.Error()) + } + + err = PutFilterStart(db, def) + if err != nil { + return 0, fmt.Errorf("put filter start: %s", err.Error()) + } + start = def + } + + return start, nil } -func (this *bloomIndexer) GetFilterStart() (uint32, error) { +func GetFilterStart(db *leveldbstore.LevelDBStore) (uint32, error) { key := genFilterStartKey() - data, err := this.store.Get(key) + data, err := db.Get(key) if err != nil { return 0, err } diff --git a/core/store/ledgerstore/ledger_store.go b/core/store/ledgerstore/ledger_store.go index ca694e6c1..61469dd3f 100644 --- a/core/store/ledgerstore/ledger_store.go +++ b/core/store/ledgerstore/ledger_store.go @@ -96,7 +96,6 @@ type LedgerStoreImp struct { currBlockHeight uint32 //Current block height currBlockHash common.Uint256 //Current block hash headerCache map[common.Uint256]*types.Header //BlockHash => Header - bloomCache map[uint32]*types3.Bloom //bloomCache for bloom index, delete cached bloom after calculating bloom index headerIndex map[uint32]common.Uint256 //Header index, Mapping header height => block hash vbftPeerInfoMap map[uint32]map[string]uint32 //key:block height,value:peerInfo lock sync.RWMutex @@ -112,7 +111,6 @@ func NewLedgerStore(dataDir string, stateHashHeight uint32) (*LedgerStoreImp, er ledgerStore := &LedgerStoreImp{ headerIndex: make(map[uint32]common.Uint256), headerCache: make(map[common.Uint256]*types.Header, 0), - bloomCache: make(map[uint32]*types3.Bloom, 4096), vbftPeerInfoMap: make(map[uint32]map[string]uint32), savingBlockSemaphore: make(chan bool, 1), stateHashCheckHeight: stateHashHeight, @@ -158,10 +156,6 @@ func (this *LedgerStoreImp) InitLedgerStoreWithGenesisBlock(genesisBlock *types. if err != nil { return fmt.Errorf("blockStore.ClearAll error %s", err) } - err = this.blockStore.PutFilterStart(0) - if err != nil { - return fmt.Errorf("blockStore.PutFilterStart error %s", err) - } err = this.stateStore.ClearAll() if err != nil { return fmt.Errorf("stateStore.ClearAll error %s", err) @@ -280,41 +274,13 @@ func (this *LedgerStoreImp) init() error { if err != nil { return fmt.Errorf("recoverStore error %s", err) } - err = this.loadBloomBits() + err = this.blockStore.LoadBloomBits() if err != nil { return fmt.Errorf("loadBloomBits error %s", err) } return nil } -func (this *LedgerStoreImp) loadBloomBits() error { - _, currentBlockHeight, err := this.blockStore.GetCurrentBlock() - if err != nil { - if err != scom.ErrNotFound { - return fmt.Errorf("LoadCurrentBlock error %s", err) - } - return nil - } - - if currentBlockHeight < this.blockStore.filterStart { - return nil - } - - start := currentBlockHeight - currentBlockHeight%BloomBitsBlocks - if start < this.blockStore.filterStart { - start = this.blockStore.filterStart - } - - for i := start; i <= currentBlockHeight; i++ { - bloom, err := this.blockStore.GetBloomData(i) - if err != nil { - return fmt.Errorf("LoadBloom error %s", err) - } - this.blockStore.Process(i, bloom) - } - return nil -} - func (this *LedgerStoreImp) loadCurrentBlock() error { currentBlockHash, currentBlockHeight, err := this.blockStore.GetCurrentBlock() if err != nil { @@ -786,9 +752,7 @@ func (this *LedgerStoreImp) saveBlockToBlockStore(block *types.Block, bloom type if err != nil { return fmt.Errorf("SaveBlock height %d hash %s error %s", blockHeight, blockHash.ToHexString(), err) } - if blockHeight >= config.GetAddDecimalsHeight() { - this.blockStore.SaveBloomData(blockHeight, bloom) - } + this.blockStore.SaveBloomData(blockHeight, bloom) return nil } diff --git a/http/ethrpc/backend/backend.go b/http/ethrpc/backend/backend.go index 406385af4..fe5d57365 100644 --- a/http/ethrpc/backend/backend.go +++ b/http/ethrpc/backend/backend.go @@ -20,6 +20,7 @@ package backend import ( "context" + "time" "github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/core/bloombits" @@ -28,6 +29,24 @@ import ( "github.com/ontio/ontology/http/base/actor" ) +const ( + // bloomServiceThreads is the number of goroutines used globally by an Ethereum + // instance to service bloombits lookups for all running filters. + BloomServiceThreads = 16 + + // bloomFilterThreads is the number of goroutines used locally per filter to + // multiplex requests onto the global servicing goroutines. + BloomFilterThreads = 3 + + // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service + // in a single batch. + BloomRetrievalBatch = 16 + + // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests + // to accumulate request an entire batch (avoiding hysteresis). + BloomRetrievalWait = time.Duration(0) +) + type BloomBackend struct { bloomRequests chan chan *bloombits.Retrieval closeBloomHandler chan struct{} @@ -46,8 +65,8 @@ func (b *BloomBackend) Close() { } func (b *BloomBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - for i := 0; i < ledgerstore.BloomFilterThreads; i++ { - go session.Multiplex(ledgerstore.BloomRetrievalBatch, ledgerstore.BloomRetrievalWait, b.bloomRequests) + for i := 0; i < BloomFilterThreads; i++ { + go session.Multiplex(BloomRetrievalBatch, BloomRetrievalWait, b.bloomRequests) } } @@ -58,7 +77,7 @@ func (b *BloomBackend) BloomStatus() (uint32, uint32) { // startBloomHandlers starts a batch of goroutines to accept bloom bit database // retrievals from possibly a range of filters and serving the data to satisfy. func (b *BloomBackend) StartBloomHandlers(sectionSize uint32, db *leveldbstore.LevelDBStore) error { - for i := 0; i < ledgerstore.BloomServiceThreads; i++ { + for i := 0; i < BloomServiceThreads; i++ { go func() { for { select {