Skip to content

Commit

Permalink
Revert "pevm: refactor txdag reader (bnb-chain#212)"
Browse files Browse the repository at this point in the history
This reverts commit aad4474.
  • Loading branch information
sunny2022da committed Nov 2, 2024
1 parent 5b5696a commit d8a70a4
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 428 deletions.
190 changes: 157 additions & 33 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package core

import (
"bufio"
"bytes"
"encoding/hex"
"errors"
Expand Down Expand Up @@ -95,8 +96,7 @@ var (
triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil)
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)

txDAGGenerateTimer = metrics.NewRegisteredTimer("chain/block/txdag/gen", nil)
txDAGReaderChanGauge = metrics.NewRegisteredGauge("chain/block/txdag/reader/chan", nil)
txDAGGenerateTimer = metrics.NewRegisteredTimer("chain/block/txdag/gen", nil)

parallelTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/txs", nil)
parallelConflictTxNumMeter = metrics.NewRegisteredMeter("chain/parallel/conflicttxs", nil)
Expand Down Expand Up @@ -891,10 +891,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
bc.futureBlocks.Purge()

if bc.txDAGReader != nil {
err := bc.txDAGReader.Reset(head)
if err != nil {
log.Error("reset txDAG reader fail", "err", err)
}
bc.txDAGReader.Reset(head)
}

// Clear safe block, finalized block if needed
Expand Down Expand Up @@ -2751,38 +2748,33 @@ func (bc *BlockChain) SetupTxDAGGeneration(output string, readFile bool) {
// startup with latest block
curHeader := bc.CurrentHeader()
if curHeader != nil && bc.txDAGReader != nil {
err := bc.txDAGReader.InitAndStartReadingLock(curHeader.Number.Uint64())
if err != nil {
log.Error("load TxDAG from file err", "err", err, "output", output, "block", curHeader.Number, "latest", bc.txDAGReader.Latest())
} else {
log.Info("load TxDAG from file", "output", output, "block", curHeader.Number, "latest", bc.txDAGReader.Latest())
}
bc.txDAGReader.TxDAG(curHeader.Number.Uint64())
log.Info("load TxDAG from file", "output", output, "block", curHeader.Number, "latest", bc.txDAGReader.Latest())
}
return
} else {
// write handler
go func() {
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Error("OpenFile when open the txDAG output file", "file", output, "err", err)
}

// write handler
go func() {
writeHandle, err := os.OpenFile(output, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Error("OpenFile when open the txDAG output file", "file", output, "err", err)
return
}
bc.txDAGWriteCh = make(chan TxDAGOutputItem, 10000)
defer writeHandle.Close()
for {
select {
case <-bc.quit:
return
}
bc.txDAGWriteCh = make(chan TxDAGOutputItem, 10000)
defer writeHandle.Close()
for {
select {
case <-bc.quit:
return
case item := <-bc.txDAGWriteCh:
if err := writeTxDAGToFile(writeHandle, item); err != nil {
log.Error("encode TxDAG err in OutputHandler", "err", err)
continue
}
case item := <-bc.txDAGWriteCh:
if err := writeTxDAGToFile(writeHandle, item); err != nil {
log.Error("encode TxDAG err in OutputHandler", "err", err)
continue
}
}
}()
}

}
}()
}

type TxDAGOutputItem struct {
Expand All @@ -2803,3 +2795,135 @@ func writeTxDAGToFile(writeHandle *os.File, item TxDAGOutputItem) error {
_, err = writeHandle.Write(buf.Bytes())
return err
}

var TxDAGCacheSize = uint64(10000)

type TxDAGFileReader struct {
output string
file *os.File
scanner *bufio.Scanner
cache map[uint64]types.TxDAG
latest uint64
lock sync.RWMutex
}

func NewTxDAGFileReader(output string) (*TxDAGFileReader, error) {
reader := &TxDAGFileReader{output: output}
err := reader.openFile(output)
if err != nil {
return nil, err
}
return reader, nil
}

func (t *TxDAGFileReader) Close() {
t.lock.Lock()
defer t.lock.Unlock()
t.closeFile()
}

func (t *TxDAGFileReader) openFile(output string) error {
file, err := os.Open(output)
if err != nil {
return err
}
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 5*1024*1024), 5*1024*1024)
t.file = file
t.scanner = scanner
return nil
}

func (t *TxDAGFileReader) closeFile() {
if t.scanner != nil {
t.scanner = nil
}
if t.file != nil {
t.file.Close()
t.file = nil
}
}

func (t *TxDAGFileReader) Latest() uint64 {
t.lock.RLock()
defer t.lock.RUnlock()
return t.latest
}

func (t *TxDAGFileReader) TxDAG(expect uint64) types.TxDAG {
t.lock.Lock()
defer t.lock.Unlock()

if t.cache != nil && t.latest >= expect {
return t.cache[expect]
}
if t.scanner == nil {
return nil
}

logTime := time.Now()
t.cache = make(map[uint64]types.TxDAG, TxDAGCacheSize)
for t.scanner.Scan() {
num, dag, err := readTxDAGItemFromLine(t.scanner.Text())
if err != nil {
log.Error("query TxDAG error", "latest", t.latest, "err", err)
continue
}
if time.Since(logTime) > 10*time.Second {
logTime = time.Now()
log.Info("try load TxDAG from file", "num", num, "expect", expect, "cached", len(t.cache))
}
// skip lower blocks
if expect > num {
continue
}
t.cache[num] = dag
t.latest = num
if uint64(len(t.cache)) >= TxDAGCacheSize {
break
}
}
if t.scanner.Err() != nil {
log.Error("scan TxDAG file got err", "expect", expect, "err", t.scanner.Err())
}

if time.Since(logTime) > 10*time.Second {
log.Info("try load TxDAG from file", "expect", expect, "cached", len(t.cache))
}
return t.cache[expect]
}

func (t *TxDAGFileReader) Reset(number uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
if t.latest-TxDAGCacheSize <= number {
return nil
}
t.closeFile()
if err := t.openFile(t.output); err != nil {
return err
}
t.latest = 0
t.cache = nil
return nil
}

func readTxDAGItemFromLine(line string) (uint64, types.TxDAG, error) {
tokens := strings.Split(line, ",")
if len(tokens) != 2 {
return 0, nil, errors.New("txDAG output contain wrong size")
}
num, err := strconv.Atoi(tokens[0])
if err != nil {
return 0, nil, err
}
enc, err := hex.DecodeString(tokens[1])
if err != nil {
return 0, nil, err
}
txDAG, err := types.DecodeTxDAG(enc)
if err != nil {
return 0, nil, err
}
return uint64(num), txDAG, nil
}
46 changes: 19 additions & 27 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4339,33 +4339,39 @@ func TestTxDAGFile_ReadWrite(t *testing.T) {
0: types.NewEmptyTxDAG(),
1: makeEmptyPlainTxDAG(1),
2: makeEmptyPlainTxDAG(2, types.NonDependentRelFlag),
}
writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
require.NoError(t, err)
for num, dag := range except {
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
}
writeFile.Close()

except2 := map[uint64]types.TxDAG{
3: types.NewEmptyTxDAG(),
4: makeEmptyPlainTxDAG(4, types.NonDependentRelFlag, types.ExcludedTxFlag),
5: makeEmptyPlainTxDAG(5, types.NonDependentRelFlag, types.ExcludedTxFlag),
}
writeFile, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
writeFile, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
require.NoError(t, err)
for i := uint64(0); i < 6; i++ {
if i == 3 {
writeFile.WriteString("num,tag\n")
for num, dag := range except2 {
if num == 5 {
writeFile.WriteString("num,txdag\n")
continue
}
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: i, txDAG: except[i]}))
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: dag}))
}
writeFile.Sync()
writeFile.Close()

reader, err := NewTxDAGFileReader(path)
require.NoError(t, err)
err = reader.InitAndStartReadingLock(0)
require.NoError(t, err)
for reader.latest < 5 {
time.Sleep(10 * time.Millisecond)
}
for i := 0; i < 6; i++ {
for i := 0; i < 5; i++ {
num := uint64(i)
if except[num] != nil {
require.Equal(t, except[num], reader.TxDAG(num), "num:%d", num)
require.Equal(t, except[num], reader.TxDAG(num))
continue
}
require.Equal(t, except2[num], reader.TxDAG(num))
}
}

Expand All @@ -4386,39 +4392,25 @@ func TestTxDAGFile_LargeRead(t *testing.T) {
for num := uint64(0); num < totalSize; num++ {
require.NoError(t, writeTxDAGToFile(writeFile, TxDAGOutputItem{blockNumber: num, txDAG: except[num]}))
}
writeFile.Sync()
writeFile.Close()

reader, err := NewTxDAGFileReader(path)
require.NoError(t, err)
err = reader.InitAndStartReadingLock(0)
require.NoError(t, err)

for i := uint64(0); i < totalSize; i++ {
for reader.latest < i || reader.latest == 0 {
time.Sleep(10 * time.Millisecond)
}
require.Equal(t, except[i], reader.TxDAG(i), i)
}

// test reset to genesis
err = reader.Reset(0)
require.NoError(t, err)

for i := uint64(0); i < totalSize; i++ {
for reader.latest < i || reader.latest == 0 {
time.Sleep(10 * time.Millisecond)
}
require.Equal(t, except[i], reader.TxDAG(i), i)
}

// test reset skip
err = reader.Reset(totalSize - TxDAGCacheSize)
require.NoError(t, err)
for i := totalSize - TxDAGCacheSize; i < totalSize; i++ {
for reader.latest < i || reader.latest == 0 {
time.Sleep(10 * time.Millisecond)
}
require.Equal(t, except[i], reader.TxDAG(i), i)
}
}
Expand Down
Loading

0 comments on commit d8a70a4

Please sign in to comment.