From d8a70a4531cc983ac5194aa1fd1b27a6cb57451a Mon Sep 17 00:00:00 2001 From: Sunny Date: Sat, 2 Nov 2024 16:23:58 +0800 Subject: [PATCH] Revert "pevm: refactor txdag reader (#212)" This reverts commit aad4474097a5a84b8c8e9b384d740879a9b85c7d. --- core/blockchain.go | 190 ++++++++++++++++++++++++------ core/blockchain_test.go | 46 +++----- core/txdag_reader.go | 242 -------------------------------------- core/txdag_reader_test.go | 126 -------------------- 4 files changed, 176 insertions(+), 428 deletions(-) delete mode 100644 core/txdag_reader.go delete mode 100644 core/txdag_reader_test.go diff --git a/core/blockchain.go b/core/blockchain.go index a50f1e3d4d..fb7df3be5c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -18,6 +18,7 @@ package core import ( + "bufio" "bytes" "encoding/hex" "errors" @@ -95,8 +96,7 @@ var ( triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil) innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil) - txDAGGenerateTimer = metrics.NewRegisteredTimer("chain/block/txdag/gen", nil) - txDAGReaderChanGauge = metrics.NewRegisteredGauge("chain/block/txdag/reader/chan", nil) + txDAGGenerateTimer = metrics.NewRegisteredTimer("chain/block/txdag/gen", nil) parallelTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/txs", nil) parallelConflictTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/conflicttxs", nil) @@ -891,10 +891,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha bc.futureBlocks.Purge() if bc.txDAGReader != nil { - err := bc.txDAGReader.Reset(head) - if err != nil { - log.Error("reset txDAG reader fail", "err", err) - } + bc.txDAGReader.Reset(head) } // Clear safe block, finalized block if needed @@ -2751,38 +2748,33 @@ func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) { // startup with latest block curHeader := bc.CurrentHeader() if curHeader != nil && bc.txDAGReader != nil { - err := bc.txDAGReader.InitAndStartReadingLock(curHeader.Number.Uint64()) - if err != nil { - log.Error("load TxDAG from file err", "err", err, "output", output, "block", curHeader.Number, "latest", bc.txDAGReader.Latest()) - } else { - log.Info("load TxDAG from file", "output", output, "block", curHeader.Number, "latest", bc.txDAGReader.Latest()) - } + bc.txDAGReader.TxDAG(curHeader.Number.Uint64()) + log.Info("load TxDAG from file", "output", output, "block", curHeader.Number, "latest", bc.txDAGReader.Latest()) } return - } else { - // write handler - go func() { - writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - log.Error("OpenFile when open the txDAG output file", "file", output, "err", err) + } + + // write handler + go func() { + writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Error("OpenFile when open the txDAG output file", "file", output, "err", err) + return + } + bc.txDAGWriteCh = make(chan TxDAGOutputItem, 10000) + defer writeHandle.Close() + for { + select { + case <-bc.quit: return - } - bc.txDAGWriteCh = make(chan TxDAGOutputItem, 10000) - defer writeHandle.Close() - for { - select { - case <-bc.quit: - return - case item := <-bc.txDAGWriteCh: - if err := writeTxDAGToFile(writeHandle, item); err != nil { - log.Error("encode TxDAG err in OutputHandler", "err", err) - continue - } + case item := <-bc.txDAGWriteCh: + if err := writeTxDAGToFile(writeHandle, item); err != nil { + log.Error("encode TxDAG err in OutputHandler", "err", err) + continue } } - }() - } - + } + }() } type TxDAGOutputItem struct { @@ -2803,3 +2795,135 @@ func writeTxDAGToFile(writeHandle *os.File, item TxDAGOutputItem) error { _, err = writeHandle.Write(buf.Bytes()) return err } + +var TxDAGCacheSize = uint64(10000) + +type TxDAGFileReader struct { + output string + file *os.File + scanner *bufio.Scanner + cache map[uint64]types.TxDAG + latest uint64 + lock sync.RWMutex +} + +func NewTxDAGFileReader(output string) (*TxDAGFileReader, error) { + reader := &TxDAGFileReader{output: output} + err := reader.openFile(output) + if err != nil { + return nil, err + } + return reader, nil +} + +func (t *TxDAGFileReader) Close() { + t.lock.Lock() + defer t.lock.Unlock() + t.closeFile() +} + +func (t *TxDAGFileReader) openFile(output string) error { + file, err := os.Open(output) + if err != nil { + return err + } + scanner := bufio.NewScanner(file) + scanner.Buffer(make([]byte, 5*1024*1024), 5*1024*1024) + t.file = file + t.scanner = scanner + return nil +} + +func (t *TxDAGFileReader) closeFile() { + if t.scanner != nil { + t.scanner = nil + } + if t.file != nil { + t.file.Close() + t.file = nil + } +} + +func (t *TxDAGFileReader) Latest() uint64 { + t.lock.RLock() + defer t.lock.RUnlock() + return t.latest +} + +func (t *TxDAGFileReader) TxDAG(expect uint64) types.TxDAG { + t.lock.Lock() + defer t.lock.Unlock() + + if t.cache != nil && t.latest >= expect { + return t.cache[expect] + } + if t.scanner == nil { + return nil + } + + logTime := time.Now() + t.cache = make(map[uint64]types.TxDAG, TxDAGCacheSize) + for t.scanner.Scan() { + num, dag, err := readTxDAGItemFromLine(t.scanner.Text()) + if err != nil { + log.Error("query TxDAG error", "latest", t.latest, "err", err) + continue + } + if time.Since(logTime) > 10*time.Second { + logTime = time.Now() + log.Info("try load TxDAG from file", "num", num, "expect", expect, "cached", len(t.cache)) + } + // skip lower blocks + if expect > num { + continue + } + t.cache[num] = dag + t.latest = num + if uint64(len(t.cache)) >= TxDAGCacheSize { + break + } + } + if t.scanner.Err() != nil { + log.Error("scan TxDAG file got err", "expect", expect, "err", t.scanner.Err()) + } + + if time.Since(logTime) > 10*time.Second { + log.Info("try load TxDAG from file", "expect", expect, "cached", len(t.cache)) + } + return t.cache[expect] +} + +func (t *TxDAGFileReader) Reset(number uint64) error { + t.lock.Lock() + defer t.lock.Unlock() + if t.latest-TxDAGCacheSize <= number { + return nil + } + t.closeFile() + if err := t.openFile(t.output); err != nil { + return err + } + t.latest = 0 + t.cache = nil + return nil +} + +func readTxDAGItemFromLine(line string) (uint64, types.TxDAG, error) { + tokens := strings.Split(line, ",") + if len(tokens) != 2 { + return 0, nil, errors.New("txDAG output contain wrong size") + } + num, err := strconv.Atoi(tokens[0]) + if err != nil { + return 0, nil, err + } + enc, err := hex.DecodeString(tokens[1]) + if err != nil { + return 0, nil, err + } + txDAG, err := types.DecodeTxDAG(enc) + if err != nil { + return 0, nil, err + } + return uint64(num), txDAG, nil +} diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 2ebe7113ec..afc664171d 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -4339,33 +4339,39 @@ func TestTxDAGFile_ReadWrite(t *testing.T) { 0: types.NewEmptyTxDAG(), 1: makeEmptyPlainTxDAG(1), 2: makeEmptyPlainTxDAG(2, types.NonDependentRelFlag), + } + writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + require.NoError(t, err) + for num, dag := range except { + require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag})) + } + writeFile.Close() + + except2 := map[uint64]types.TxDAG{ 3: types.NewEmptyTxDAG(), 4: makeEmptyPlainTxDAG(4, types.NonDependentRelFlag, types.ExcludedTxFlag), 5: makeEmptyPlainTxDAG(5, types.NonDependentRelFlag, types.ExcludedTxFlag), } - writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + writeFile, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) require.NoError(t, err) - for i := uint64(0); i < 6; i++ { - if i == 3 { - writeFile.WriteString("num,tag\n") + for num, dag := range except2 { + if num == 5 { + writeFile.WriteString("num,txdag\n") + continue } - require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: i, txDAG: except[i]})) + require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag})) } - writeFile.Sync() writeFile.Close() reader, err := NewTxDAGFileReader(path) require.NoError(t, err) - err = reader.InitAndStartReadingLock(0) - require.NoError(t, err) - for reader.latest < 5 { - time.Sleep(10 * time.Millisecond) - } - for i := 0; i < 6; i++ { + for i := 0; i < 5; i++ { num := uint64(i) if except[num] != nil { - require.Equal(t, except[num], reader.TxDAG(num), "num:%d", num) + require.Equal(t, except[num], reader.TxDAG(num)) + continue } + require.Equal(t, except2[num], reader.TxDAG(num)) } } @@ -4386,29 +4392,18 @@ func TestTxDAGFile_LargeRead(t *testing.T) { for num := uint64(0); num < totalSize; num++ { require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: except[num]})) } - writeFile.Sync() writeFile.Close() reader, err := NewTxDAGFileReader(path) require.NoError(t, err) - err = reader.InitAndStartReadingLock(0) - require.NoError(t, err) - for i := uint64(0); i < totalSize; i++ { - for reader.latest < i || reader.latest == 0 { - time.Sleep(10 * time.Millisecond) - } require.Equal(t, except[i], reader.TxDAG(i), i) } // test reset to genesis err = reader.Reset(0) require.NoError(t, err) - for i := uint64(0); i < totalSize; i++ { - for reader.latest < i || reader.latest == 0 { - time.Sleep(10 * time.Millisecond) - } require.Equal(t, except[i], reader.TxDAG(i), i) } @@ -4416,9 +4411,6 @@ func TestTxDAGFile_LargeRead(t *testing.T) { err = reader.Reset(totalSize - TxDAGCacheSize) require.NoError(t, err) for i := totalSize - TxDAGCacheSize; i < totalSize; i++ { - for reader.latest < i || reader.latest == 0 { - time.Sleep(10 * time.Millisecond) - } require.Equal(t, except[i], reader.TxDAG(i), i) } } diff --git a/core/txdag_reader.go b/core/txdag_reader.go deleted file mode 100644 index 957f9d60b6..0000000000 --- a/core/txdag_reader.go +++ /dev/null @@ -1,242 +0,0 @@ -package core - -import ( - "bufio" - "encoding/hex" - "errors" - "os" - "strconv" - "strings" - "sync" - "time" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" -) - -var TxDAGCacheSize = uint64(300) - -type TxDAGFileReader struct { - output string - file *os.File - scanner *bufio.Scanner - dagChan chan *TxDAGOutputItem - chanFirstBlockNumber int64 - latest uint64 - lock sync.RWMutex - isInit bool - closeChan chan struct{} -} - -func NewTxDAGFileReader(output string) (*TxDAGFileReader, error) { - reader := &TxDAGFileReader{ - output: output, - dagChan: make(chan *TxDAGOutputItem, TxDAGCacheSize), - closeChan: make(chan struct{}, 1), - chanFirstBlockNumber: -1, - } - err := reader.openFile(output) - if err != nil { - return nil, err - } - return reader, nil -} - -func (t *TxDAGFileReader) Close() { - t.lock.Lock() - defer t.lock.Unlock() - t.closeChan <- struct{}{} - t.closeFile() -} - -func (t *TxDAGFileReader) openFile(output string) error { - file, err := os.Open(output) - if err != nil { - return err - } - scanner := bufio.NewScanner(file) - scanner.Buffer(make([]byte, 5*1024*1024), 5*1024*1024) - t.file = file - t.scanner = scanner - return nil -} - -func (t *TxDAGFileReader) closeFile() { - if t.scanner != nil { - t.scanner = nil - } - if t.file != nil { - t.file.Close() - t.file = nil - } -} - -func (t *TxDAGFileReader) Latest() uint64 { - t.lock.RLock() - defer t.lock.RUnlock() - return t.latest -} -func (t *TxDAGFileReader) InitAndStartReadingLock(startBlockNum uint64) error { - t.lock.Lock() - defer t.lock.Unlock() - return t.initAndStartReading(startBlockNum) -} -func (t *TxDAGFileReader) initAndStartReading(startBlockNum uint64) error { - if t.isInit { - return nil - } - if t.scanner == nil { - return errors.New("TxDAG reader init fail,missing scanner") - } - if startBlockNum > 0 { - //We move the scanner to the position of startBlockNum-1 so that we can start reading data from startBlockNum next. - startBlockNum = startBlockNum - 1 - for t.scanner != nil && t.scanner.Scan() { - text := t.scanner.Text() - blockNum, err := readTxDAGBlockNumFromLine(text) - if err != nil { - log.Error("TxDAG reader init fail at readTxDAGBlockNumFromLine", "err", err, "text", text) - return err - } - if startBlockNum > blockNum { - continue - } - t.latest = blockNum - break - } - if t.scanner != nil && t.scanner.Err() != nil { - log.Error("TxDAG reader init, scan TxDAG file got err", "err", t.scanner.Err(), "startBlockNum", startBlockNum, "latest", t.latest) - return t.scanner.Err() - } - } - log.Info("TxDAG reader init done", "startBlockNum", startBlockNum, "latest", t.latest) - go t.loopReadDAGIntoChan() - t.isInit = true - return nil -} - -func (t *TxDAGFileReader) loopReadDAGIntoChan() { - start := time.Now() - - for t.scanner != nil && t.scanner.Scan() { - select { - case <-t.closeChan: - close(t.dagChan) - log.Info("TxDAG reader is closed. Exiting...", "latest", t.latest) - return - default: - text := t.scanner.Text() - num, dag, err := readTxDAGItemFromLine(text) - if err != nil { - log.Error("query TxDAG error", "latest", t.latest, "err", err) - continue - } - t.dagChan <- &TxDAGOutputItem{blockNumber: num, txDAG: dag} - t.latest = num - if t.chanFirstBlockNumber == -1 { - t.chanFirstBlockNumber = int64(num) - } - if time.Since(start) > 1*time.Minute { - log.Debug("TxDAG reader dagChan report", "dagChanSize", len(t.dagChan), "latest", t.latest, "chanFirstBlockNumber", t.chanFirstBlockNumber) - txDAGReaderChanGauge.Update(int64(len(t.dagChan))) - start = time.Now() - } - } - } - if t.scanner != nil && t.scanner.Err() != nil { - log.Error("scan TxDAG file got err", "latest", t.latest, "err", t.scanner.Err()) - } else { - log.Info("TxDAG reader done. Exiting...", "latest", t.latest) - } -} - -func (t *TxDAGFileReader) TxDAG(expect uint64) types.TxDAG { - t.lock.Lock() - defer t.lock.Unlock() - - if !t.isInit { - log.Error("TxDAG reader not init yet") - return nil - } - - if t.scanner == nil { - return nil - } - - if t.chanFirstBlockNumber > int64(expect) { - log.Debug("expect less than chanFirstBlockNumber,skip", "expect", expect, "chanFirstBlockNumber", t.chanFirstBlockNumber) - return nil - } - - for { - select { - case dag := <-t.dagChan: - if dag == nil { - return nil - } - t.chanFirstBlockNumber = int64(dag.blockNumber + 1) - if dag.blockNumber < expect { - continue - } else if dag.blockNumber > expect { - log.Warn("dag.blockNumber > expect", "dag.blockNumber", dag.blockNumber, "expect", expect, "chanFirstBlockNumber", t.chanFirstBlockNumber) - return nil - } else { - return dag.txDAG - } - default: - return nil - } - } -} - -func (t *TxDAGFileReader) Reset(number uint64) error { - t.lock.Lock() - defer t.lock.Unlock() - t.closeChan <- struct{}{} - t.closeFile() - if err := t.openFile(t.output); err != nil { - return err - } - t.latest = 0 - t.chanFirstBlockNumber = -1 - t.isInit = false - t.closeChan = make(chan struct{}, 1) - t.dagChan = make(chan *TxDAGOutputItem, TxDAGCacheSize) - err := t.initAndStartReading(number) - if err != nil { - return err - } - return nil -} - -func readTxDAGBlockNumFromLine(line string) (uint64, error) { - tokens := strings.Split(line, ",") - if len(tokens) != 2 { - return 0, errors.New("txDAG output contain wrong size") - } - num, err := strconv.Atoi(tokens[0]) - if err != nil { - return 0, err - } - return uint64(num), nil -} - -func readTxDAGItemFromLine(line string) (uint64, types.TxDAG, error) { - tokens := strings.Split(line, ",") - if len(tokens) != 2 { - return 0, nil, errors.New("txDAG output contain wrong size") - } - num, err := strconv.Atoi(tokens[0]) - if err != nil { - return 0, nil, err - } - enc, err := hex.DecodeString(tokens[1]) - if err != nil { - return 0, nil, err - } - txDAG, err := types.DecodeTxDAG(enc) - if err != nil { - return 0, nil, err - } - return uint64(num), txDAG, nil -} diff --git a/core/txdag_reader_test.go b/core/txdag_reader_test.go deleted file mode 100644 index 66e66872ed..0000000000 --- a/core/txdag_reader_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package core - -import ( - "os" - "path/filepath" - "testing" - "time" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestNewTxDAGFileReader(t *testing.T) { - path := filepath.Join(os.TempDir(), "test.csv") - defer func() { - os.Remove(path) - }() - except := map[uint64]types.TxDAG{ - 0: types.NewEmptyTxDAG(), - 1: makeEmptyPlainTxDAG(1), - 2: makeEmptyPlainTxDAG(2, types.NonDependentRelFlag), - 3: types.NewEmptyTxDAG(), - 4: makeEmptyPlainTxDAG(4, types.NonDependentRelFlag, types.ExcludedTxFlag), - 5: makeEmptyPlainTxDAG(5, types.NonDependentRelFlag, types.ExcludedTxFlag), - } - writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - require.NoError(t, err) - for i := uint64(0); i < 6; i++ { - require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: i, txDAG: except[i]})) - } - writeFile.Sync() - writeFile.Close() - - reader, err := NewTxDAGFileReader(path) - if err != nil { - t.Error("newReaderErr", "err", err) - return - } - err = reader.InitAndStartReadingLock(2) - if err != nil { - t.Error("newReaderErr", "err", err) - return - } - //Check the initialization status - assert.Equal(t, true, reader.isInit) - assert.NotNil(t, reader.scanner) - //Waiting for the first data to enter the channel - for reader.chanFirstBlockNumber == -1 { - time.Sleep(10 * time.Millisecond) - } - //The starting point is 2, so 1 should not exist, 2 should exist, and txCount==2 - dag1 := reader.TxDAG(1) - assert.Nil(t, dag1) - dag2 := reader.TxDAG(2) - assert.NotNil(t, dag2) - assert.Equal(t, 2, dag2.TxCount()) - //Waiting to process to 5 - for reader.latest < 5 { - time.Sleep(10 * time.Millisecond) - } - //There are 9 transactions in 20 - dag5 := reader.TxDAG(5) - assert.NotNil(t, dag5) - assert.Equal(t, 5, dag5.TxCount()) - //Already read 5, data less than 5 cannot be read. - dag3 := reader.TxDAG(3) - assert.Nil(t, dag3) - err = reader.Reset(2) - if err != nil { - t.Error("resetErr", "err", err) - return - } - //Check the initialization status again after reset - assert.Equal(t, true, reader.isInit) - assert.NotNil(t, reader.scanner) - //Waiting for 3 to be read - for reader.latest < 3 { - time.Sleep(10 * time.Millisecond) - } - //11 should no longer be nil, because after reset, we haven't read it yet. - dag3 = reader.TxDAG(3) - assert.NotNil(t, dag3) - assert.Equal(t, 0, dag3.TxCount()) - //1000 should not exist - dag1000 := reader.TxDAG(1000) - assert.Nil(t, dag1000) -} - -func TestTxDAGFileReader_Close(t *testing.T) { - path := filepath.Join(os.TempDir(), "test.csv") - defer func() { - os.Remove(path) - }() - except := map[uint64]types.TxDAG{ - 0: types.NewEmptyTxDAG(), - 1: makeEmptyPlainTxDAG(1), - 2: makeEmptyPlainTxDAG(2, types.NonDependentRelFlag), - 3: types.NewEmptyTxDAG(), - 4: makeEmptyPlainTxDAG(4, types.NonDependentRelFlag, types.ExcludedTxFlag), - 5: makeEmptyPlainTxDAG(5, types.NonDependentRelFlag, types.ExcludedTxFlag), - } - writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - require.NoError(t, err) - for i := uint64(0); i < 6; i++ { - require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: i, txDAG: except[i]})) - } - writeFile.Sync() - writeFile.Close() - - reader, err := NewTxDAGFileReader(path) - if err != nil { - t.Error("newReaderErr", "err", err) - return - } - err = reader.InitAndStartReadingLock(2) - if err != nil { - t.Error("newReaderErr", "err", err) - return - } - assert.Equal(t, true, reader.isInit) - assert.NotNil(t, reader.scanner) - reader.Close() - assert.Nil(t, reader.scanner) - assert.Nil(t, reader.file) -}