Merge pull request #2442 from OffchainLabs/recreate-state-tests-improvements

State recreation tests improvements
PlasmaPower authored Oct 8, 2024
2 parents cfb211f + d48f24a commit 9eee233
Showing 1 changed file with 57 additions and 54 deletions.
111 changes: 57 additions & 54 deletions system_tests/recreatestate_rpc_test.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"runtime"
"strings"
"sync"
"testing"
@@ -338,6 +339,7 @@ func TestRecreateStateForRPCBlockNotFoundWhileRecreating(t *testing.T) {
}

func testSkippingSavingStateAndRecreatingAfterRestart(t *testing.T, cacheConfig *gethexec.CachingConfig, txCount int) {
t.Parallel()
maxRecreateStateDepth := int64(30 * 1000 * 1000)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -451,20 +453,26 @@ func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
cacheConfig.SnapshotCache = 0 // disable snapshots
cacheConfig.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are

runTestCase := func(t *testing.T, cacheConfig gethexec.CachingConfig, txes int) {
t.Run(fmt.Sprintf("skip-blocks-%d-skip-gas-%d-txes-%d", cacheConfig.MaxNumberOfBlocksToSkipStateSaving, cacheConfig.MaxAmountOfGasToSkipStateSaving, txes), func(t *testing.T) {
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, txes)
})
}

// test defaults
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

cacheConfig.MaxNumberOfBlocksToSkipStateSaving = 127
cacheConfig.MaxAmountOfGasToSkipStateSaving = 0
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

cacheConfig.MaxNumberOfBlocksToSkipStateSaving = 0
cacheConfig.MaxAmountOfGasToSkipStateSaving = 15 * 1000 * 1000
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

cacheConfig.MaxNumberOfBlocksToSkipStateSaving = 127
cacheConfig.MaxAmountOfGasToSkipStateSaving = 15 * 1000 * 1000
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 512)
runTestCase(t, cacheConfig, 512)

// lower the number of blocks kept in triegc below 100, so that testSkippingSavingStateAndRecreatingAfterRestart can check for nonexistence (it doesn't check the last BlockCount blocks, as some of them may be persisted on node shutdown)
cacheConfig.BlockCount = 16
@@ -480,21 +488,16 @@ func TestSkippingSavingStateAndRecreatingAfterRestart(t *testing.T) {
cacheConfig.MaxAmountOfGasToSkipStateSaving = skipGas
// #nosec G115
cacheConfig.MaxNumberOfBlocksToSkipStateSaving = uint32(skipBlocks)
testSkippingSavingStateAndRecreatingAfterRestart(t, &cacheConfig, 100)
runTestCase(t, cacheConfig, 100)
}
}
}

func TestGettingStateForRPCFullNode(t *testing.T) {
func testGettingState(t *testing.T, execConfig *gethexec.Config) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
execConfig := ExecConfigDefaultTest(t)
execConfig.Caching.SnapshotCache = 0 // disable snapshots
execConfig.Caching.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
execConfig.Sequencer.MaxBlockSpeed = 0
execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 16)
execNode, _ := builder.L2.ExecNode, builder.L2.Client
execNode := builder.L2.ExecNode
defer cancelNode()
bc := execNode.Backend.ArbInterface().BlockChain()
api := execNode.Backend.APIBackend()
@@ -521,18 +524,40 @@ func TestGettingStateForRPCFullNode(t *testing.T) {
blockCountRequiredToFlushDirties := builder.execConfig.Caching.BlockCount
makeSomeTransfers(t, ctx, builder, blockCountRequiredToFlushDirties)

// force garbage collection to check that it doesn't break anything
runtime.GC()

exists = state.Exist(addr)
err = state.Error()
Require(t, err)
if !exists {
Fatal(t, "User2 address does not exist in the state")
}

// force garbage collection of the StateDB object, which should cause the state finalizer to run
state = nil
runtime.GC()
_, err = bc.StateAt(header.Root)
if err == nil {
Fatal(t, "StateAndHeaderByNumber didn't failed as expected")
}
expectedErr := &trie.MissingNodeError{}
if !errors.As(err, &expectedErr) {
Fatal(t, "StateAndHeaderByNumber failed with unexpected error:", err)
}
}
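
The `state = nil` / `runtime.GC()` sequence above works because the StateDB is expected to carry a finalizer that releases its hold on the state's trie nodes once the object is collected, which is why the subsequent bc.StateAt call should fail with trie.MissingNodeError. A minimal sketch of that finalizer pattern (illustrative stand-in types, not Nitro's actual StateDB; not part of the changed file):

package main

import (
	"fmt"
	"runtime"
)

// sketchState stands in for a StateDB-like object: while it is reachable its
// root's trie nodes stay pinned in memory, and a finalizer unpins them once
// the object has been garbage collected.
type sketchState struct{ root string }

func main() {
	pinned := true
	released := make(chan struct{})

	s := &sketchState{root: "0xabc"}
	runtime.SetFinalizer(s, func(*sketchState) {
		pinned = false
		close(released)
	})
	fmt.Println("pinned while reachable:", pinned) // true

	// Drop the last reference and force GC, mirroring `state = nil` plus
	// runtime.GC() in the test; finalizer timing is not guaranteed, so poll.
	s = nil
	for {
		runtime.GC()
		select {
		case <-released:
			fmt.Println("pinned after GC:", pinned) // false: the root could now be evicted
			return
		default:
			runtime.Gosched()
		}
	}
}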

func TestGettingStateForRPCHybridArchiveNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func TestGettingState(t *testing.T) {
execConfig := ExecConfigDefaultTest(t)
execConfig.Caching.SnapshotCache = 0 // disable snapshots
execConfig.Caching.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
execConfig.Sequencer.MaxBlockSpeed = 0
execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
t.Run("full-node", func(t *testing.T) {
testGettingState(t, execConfig)
})

execConfig = ExecConfigDefaultTest(t)
execConfig.Caching.Archive = true
// For now Archive node should use HashScheme
execConfig.Caching.StateScheme = rawdb.HashScheme
@@ -542,42 +567,13 @@ func TestGettingStateForRPCHybridArchiveNode(t *testing.T) {
execConfig.Caching.BlockAge = 0 // use only Caching.BlockCount to keep only last N blocks in dirties cache, no matter how new they are
execConfig.Sequencer.MaxBlockSpeed = 0
execConfig.Sequencer.MaxTxDataSize = 150 // 1 test tx ~= 110
builder, cancelNode := prepareNodeWithHistory(t, ctx, execConfig, 16)
execNode, _ := builder.L2.ExecNode, builder.L2.Client
defer cancelNode()
bc := execNode.Backend.ArbInterface().BlockChain()
api := execNode.Backend.APIBackend()

header := bc.CurrentBlock()
if header == nil {
Fatal(t, "failed to get current block header")
}
// #nosec G115
state, _, err := api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
Require(t, err)
addr := builder.L2Info.GetAddress("User2")
exists := state.Exist(addr)
err = state.Error()
Require(t, err)
if !exists {
Fatal(t, "User2 address does not exist in the state")
}
// Get the state again to avoid caching
// #nosec G115
state, _, err = api.StateAndHeaderByNumber(ctx, rpc.BlockNumber(header.Number.Uint64()))
Require(t, err)

blockCountRequiredToFlushDirties := builder.execConfig.Caching.BlockCount
makeSomeTransfers(t, ctx, builder, blockCountRequiredToFlushDirties)

exists = state.Exist(addr)
err = state.Error()
Require(t, err)
if !exists {
Fatal(t, "User2 address does not exist in the state")
}
t.Run("archive-node", func(t *testing.T) {
testGettingState(t, execConfig)
})
}
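
The regression test below races readers against block insertion. Per its own comments, the write ordering it probes is: the block body entry appears first, then the new state root is referenced in core.BlockChain.writeBlockWithState, and only afterwards is the canonical hash written in writeHeadBlock. A simplified, self-contained sketch of that ordering and of the two outcomes the test accepts (toy types, not go-ethereum's real API; not part of the changed file):

package main

import "fmt"

// Toy stand-ins for the pieces the test cares about.
type block struct {
	number uint64
	hash   string
	root   string
}

type chain struct {
	bodies     map[string]bool   // "b"+number+hash -> block body written
	referenced map[string]bool   // state roots pinned in the trie database
	canonical  map[uint64]string // block number -> canonical hash
}

// insertBlock sketches the ordering described in the test comments:
// body first, then the state-root reference, then the canonical hash.
func (c *chain) insertBlock(b *block) {
	c.bodies[fmt.Sprintf("b%d%s", b.number, b.hash)] = true // (1) rawdb.blockBodyKey-style entry
	c.referenced[b.root] = true                             // (2) root referenced in writeBlockWithState
	c.canonical[b.number] = b.hash                          // (3) rawdb.WriteCanonicalHash in writeHeadBlock
}

// stateByNumberOrHash mimics the two outcomes the test accepts: an
// "ahead of current block" error before step (3), success afterwards.
func (c *chain) stateByNumberOrHash(number uint64, hash string) error {
	if c.canonical[number] != hash {
		return fmt.Errorf("requested block %d ahead of current block", number)
	}
	return nil
}

func main() {
	c := &chain{bodies: map[string]bool{}, referenced: map[string]bool{}, canonical: map[uint64]string{}}
	b := &block{number: 1, hash: "0xh1", root: "0xr1"}

	// A reader that found the body key between steps (1) and (3) sees the error;
	// once insertBlock has finished, the same call succeeds.
	fmt.Println(c.stateByNumberOrHash(b.number, b.hash)) // requested block 1 ahead of current block
	c.insertBlock(b)
	fmt.Println(c.stateByNumberOrHash(b.number, b.hash)) // <nil>
}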

// regression test for an issue caused by accessing the state of a block that has just been committed to TrieDB but not yet referenced in core.BlockChain.writeBlockWithState (here called the state of a "recent" block)
// before the corresponding fix, accessing a recent block's state caused premature garbage collection of the head block state
func TestStateAndHeaderForRecentBlock(t *testing.T) {
threads := 32
ctx, cancel := context.WithCancel(context.Background())
@@ -618,15 +614,22 @@ func TestStateAndHeaderForRecentBlock(t *testing.T) {
}()
api := builder.L2.ExecNode.Backend.APIBackend()
db := builder.L2.ExecNode.Backend.ChainDb()
i := 1

recentBlock := 1
var mtx sync.RWMutex
var wgCallers sync.WaitGroup
for j := 0; j < threads && ctx.Err() == nil; j++ {
wgCallers.Add(1)
// each thread attempts to get state for a block that is just being created (here called recent):
// 1. Before the state trie node is referenced in core.BlockChain.writeBlockWithState, the block body is written to the database with key prefix `b` followed by the block number and then the block hash (see: rawdb.blockBodyKey)
// 2. Each thread tries to read the block body entry to: a. extract the recent block hash b. congest resource usage to slow down execution of core.BlockChain.writeBlockWithState
// 3. After extracting the hash from the block body entry key, StateAndHeaderByNumberOrHash is called for the hash. It is expected to:
// a. either fail with "ahead of current block" if we made it before rawdb.WriteCanonicalHash is called in core.BlockChain.writeHeadBlock, which runs after writeBlockWithState finishes,
// b. or succeed if the canonical hash was already written for the block, meaning that writeBlockWithState was fully executed (among other things, the state root trie node was correctly referenced) - then recentBlock is advanced
go func() {
defer wgCallers.Done()
mtx.RLock()
blockNumber := i
blockNumber := recentBlock
mtx.RUnlock()
for blockNumber < 300 && ctx.Err() == nil {
prefix := make([]byte, 8)
@@ -645,8 +648,8 @@ func TestStateAndHeaderForRecentBlock(t *testing.T) {
_, _, err := api.StateAndHeaderByNumberOrHash(ctx, rpc.BlockNumberOrHash{BlockHash: &blockHash})
if err == nil {
mtx.Lock()
if blockNumber == i {
i++
if blockNumber == recentBlock {
recentBlock++
}
mtx.Unlock()
break
@@ -666,7 +669,7 @@ func TestStateAndHeaderForRecentBlock(t *testing.T) {
}
it.Release()
mtx.RLock()
blockNumber = i
blockNumber = recentBlock
mtx.RUnlock()
}
}()
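
For reference, the prefix scan in the loop above depends on the block body key layout the comment describes: the `b` prefix, an 8-byte big-endian block number, then the 32-byte block hash (rawdb.blockBodyKey). A small sketch of building that prefix and recovering the hash from a matching key, assuming exactly that layout (not part of the changed file):

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

// bodyKeyPrefix builds the iterator prefix for block bodies of a given number:
// 'b' followed by the 8-byte big-endian block number.
func bodyKeyPrefix(number uint64) []byte {
	encoded := make([]byte, 8)
	binary.BigEndian.PutUint64(encoded, number)
	return append([]byte("b"), encoded...)
}

// hashFromBodyKey recovers the block hash from a full block body key,
// i.e. the trailing 32 bytes after the 9-byte prefix.
func hashFromBodyKey(key []byte) (common.Hash, bool) {
	if len(key) != 1+8+32 || key[0] != 'b' {
		return common.Hash{}, false
	}
	return common.BytesToHash(key[9:]), true
}

func main() {
	hash := common.HexToHash("0x00000000000000000000000000000000000000000000000000000000deadbeef")
	key := append(bodyKeyPrefix(123), hash.Bytes()...)

	fmt.Printf("prefix: %x\n", bodyKeyPrefix(123))
	if recovered, ok := hashFromBodyKey(key); ok {
		fmt.Println("recovered hash matches:", recovered == hash) // true
	}
}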
