Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State recreation tests improvements #2442

Merged
merged 13 commits into from
Oct 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 57 additions & 54 deletions system_tests/recreatestate_rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -338,6 +339,7 @@ func TestRecreateStateForRPCBlockNotFoundWhileRecreating(t *testing.T) {
}

func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig *gethexec.CachingConfig, txCount int) {
t.Parallel()
maxRecreateStateDepth := int64(30 * 1000 * 1000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -451,20 +453,26 @@ func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
cacheConfig.SnapshotCache = 0 // disable snapshots
cacheConfig.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are

runTestCase := func(t *testing.T, cacheConfig gethexec.CachingConfig, txes int) {
t.Run(fmt.Sprintf("skip-blocks-%d-skip-gas-%d-txes-%d", cacheConfig.MaxNumberOfBlocksToSkipStateSaving, cacheConfig.MaxAmountOfGasToSkipStateSaving, txes), func(t *testing.T) {
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, txes)
})
}

// test defaults
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

cacheConfig.MaxNumberOfBlocksToSkipStateSaving = 127
cacheConfig.MaxAmountOfGasToSkipStateSaving = 0
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

cacheConfig.MaxNumberOfBlocksToSkipStateSaving = 0
cacheConfig.MaxAmountOfGasToSkipStateSaving = 15 * 1000 * 1000
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

cacheConfig.MaxNumberOfBlocksToSkipStateSaving = 127
cacheConfig.MaxAmountOfGasToSkipStateSaving = 15 * 1000 * 1000
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

// lower number of blocks in triegc below 100 blocks, to be able to check for nonexistence in testSkippingSavingStateAndRecreatingAfterRestart (it doesn't check last BlockCount blocks as some of them may be persisted on node shutdown)
cacheConfig.BlockCount = 16
Expand All @@ -480,21 +488,16 @@ func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
cacheConfig.MaxAmountOfGasToSkipStateSaving = skipGas
// #nosec G115
cacheConfig.MaxNumberOfBlocksToSkipStateSaving = uint32(skipBlocks)
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 100)
runTestCase(t, cacheConfig, 100)
}
}
}

func TestGettingStateForRPCFullNode(t *testing.T) {
func testGettingState(t *testing.T, execConfig *gethexec.Config) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
execConfig := ExecConfigDefaultTest(t)
execConfig.Caching.SnapshotCache = 0 // disable snapshots
execConfig.Caching.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
execConfig.Sequencer.MaxBlockSpeed = 0
execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 16)
execNode, _ := builder.L2.ExecNode, builder.L2.Client
execNode := builder.L2.ExecNode
defer cancelNode()
bc := execNode.Backend.ArbInterface().BlockChain()
api := execNode.Backend.APIBackend()
Expand All @@ -521,18 +524,40 @@ func TestGettingStateForRPCFullNode(t *testing.T) {
blockCountRequiredToFlushDirties := builder.execConfig.Caching.BlockCount
makeSomeTransfers(t, ctx, builder, blockCountRequiredToFlushDirties)

// force garbage collection to check if it won't break anything
runtime.GC()

exists = state.Exist(addr)
err = state.Error()
Require(t, err)
if !exists {
Fatal(t, "User2 address does not exist in the state")
}

// force garbage collection of StateDB object, which should cause the state finalizer to run
state = nil
runtime.GC()
_, err = bc.StateAt(header.Root)
if err == nil {
magicxyyz marked this conversation as resolved.
Show resolved Hide resolved
Fatal(t, "StateAndHeaderByNumber didn't failed as expected")
}
expectedErr := &trie.MissingNodeError{}
if !errors.As(err, &expectedErr) {
Fatal(t, "StateAndHeaderByNumber failed with unexpected error:", err)
}
}

func TestGettingStateForRPCHybridArchiveNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func TestGettingState(t *testing.T) {
execConfig := ExecConfigDefaultTest(t)
execConfig.Caching.SnapshotCache = 0 // disable snapshots
execConfig.Caching.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
execConfig.Sequencer.MaxBlockSpeed = 0
execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
t.Run("full-node", func(t *testing.T) {
testGettingState(t, execConfig)
})

execConfig = ExecConfigDefaultTest(t)
execConfig.Caching.Archive = true
// For now Archive node should use HashScheme
execConfig.Caching.StateScheme = rawdb.HashScheme
Expand All @@ -542,42 +567,13 @@ func TestGettingStateForRPCHybridArchiveNode(t *testing.T) {
execConfig.Caching.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
execConfig.Sequencer.MaxBlockSpeed = 0
execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 16)
execNode, _ := builder.L2.ExecNode, builder.L2.Client
defer cancelNode()
bc := execNode.Backend.ArbInterface().BlockChain()
api := execNode.Backend.APIBackend()

header := bc.CurrentBlock()
if header == nil {
Fatal(t, "failed to get current block header")
}
// #nosec G115
state, _, err := api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
Require(t, err)
addr := builder.L2Info.GetAddress("User2")
exists := state.Exist(addr)
err = state.Error()
Require(t, err)
if !exists {
Fatal(t, "User2 address does not exist in the state")
}
// Get the state again to avoid caching
// #nosec G115
state, _, err = api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
Require(t, err)

blockCountRequiredToFlushDirties := builder.execConfig.Caching.BlockCount
makeSomeTransfers(t, ctx, builder, blockCountRequiredToFlushDirties)

exists = state.Exist(addr)
err = state.Error()
Require(t, err)
if !exists {
Fatal(t, "User2 address does not exist in the state")
}
t.Run("archive-node", func(t *testing.T) {
testGettingState(t, execConfig)
})
}

// regression test for issue caused by accessing block state that has just been committed to TrieDB but not yet referenced in core.BlockChain.writeBlockWithState (here called state of "recent" block)
// before the corresponding fix, access to the recent block state caused premature garbage collection of the head block state
func TestStateAndHeaderForRecentBlock(t *testing.T) {
threads := 32
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -618,15 +614,22 @@ func TestStateAndHeaderForRecentBlock(t *testing.T) {
}()
api := builder.L2.ExecNode.Backend.APIBackend()
db := builder.L2.ExecNode.Backend.ChainDb()
i := 1

recentBlock := 1
var mtx sync.RWMutex
var wgCallers sync.WaitGroup
for j := 0; j < threads && ctx.Err() == nil; j++ {
wgCallers.Add(1)
// each thread attempts to get state for a block that is just being created (here called recent):
// 1. Before state trie node is referenced in core.BlockChain.writeBlockWithState, block body is written to database with key prefix `b` followed by block number and then block hash (see: rawdb.blockBodyKey)
// 2. Each thread tries to read the block body entry to: a. extract recent block hash b. congest resource usage to slow down execution of core.BlockChain.writeBlockWithState
// 3. After extracting the hash from block body entry key, StateAndHeaderByNumberOrHash is called for the hash. It is expected that it will:
// a. either fail with "ahead of current block" if we made it before rawdb.WriteCanonicalHash is called in core.BlockChain.writeHeadBlock, which is called after writeBlockWithState finishes,
// b. or it will succeed if the canonical hash was written for the block, meaning that writeBlockWithState was fully executed (among other things, the state root trie node is correctly referenced) - then recentBlock is advanced
go func() {
defer wgCallers.Done()
mtx.RLock()
blockNumber := i
blockNumber := recentBlock
mtx.RUnlock()
for blockNumber < 300 && ctx.Err() == nil {
prefix := make([]byte, 8)
Expand All @@ -645,8 +648,8 @@ func TestStateAndHeaderForRecentBlock(t *testing.T) {
_, _, err := api.StateAndHeaderByNumberOrHash(ctx, rpc.BlockNumberOrHash{BlockHash: &blockHash})
if err == nil {
mtx.Lock()
if blockNumber == i {
i++
if blockNumber == recentBlock {
recentBlock++
}
mtx.Unlock()
break
Expand All @@ -666,7 +669,7 @@ func TestStateAndHeaderForRecentBlock(t *testing.T) {
}
it.Release()
mtx.RLock()
blockNumber = i
blockNumber = recentBlock
mtx.RUnlock()
}
}()
Expand Down
Loading